---
title: "Ingest NOAA CalCOFI Database"
execute:
  echo: true
  warning: false
editor_options:
  chunk_output_type: console
format:
  html:
    code-fold: true
---
## Overview {-}

**Goal**: Generate the database from source files with workflow scripts to make
updating easier and provenance fully transparent. This allows us to:

- Rename tables and columns, control data types, and use Unicode encoding for a
  consistent database ingestion strategy, per the naming conventions in
  [Database – CalCOFI.io Docs](https://calcofi.io/docs/db.html).
- Differentiate between raw and derived or updated tables and columns. For
  instance, the taxonomy for any given species can change over time, such as the
  lumping or splitting of a given taxon, and by taxonomic authority (e.g., WoRMS,
  ITIS or GBIF). These taxonomic identifiers and the full taxonomic hierarchy
  should be updated regularly regardless of the source observational data, and can
  either be updated in the table directly or joined one-to-one with a separate
  table in a materialized view (so as not to slow down queries, as a regular view
  would); see the sketch after this list.
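For example, a separately maintained taxonomy table could be layered onto raw observations with a materialized view. The chunk below is only an illustrative sketch: the `obs` and `taxa` tables, the `sp_id` key, and the column names are assumptions, not objects created by this workflow.

```{r}
#| label: sketch-taxa-matview
#| eval: false

# Sketch: join raw observations one-to-one with a separately updated taxonomy
# table in a materialized view, so the join cost is paid at refresh time rather
# than at query time. Table and column names here are hypothetical.
dbExecute(con, "
  CREATE MATERIALIZED VIEW IF NOT EXISTS dev.obs_taxa AS
  SELECT
    o.*, t.worms_id, t.taxon_rank, t.scientific_name
  FROM dev.obs AS o
  JOIN dev.taxa AS t USING (sp_id)")

# after the taxonomy table is updated from the authority (e.g., WoRMS), refresh:
dbExecute(con, "REFRESH MATERIALIZED VIEW dev.obs_taxa")
```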
This workflow processes the NOAA CalCOFI database CSV files and updates the PostgreSQL database. The workflow:

1. Reads CSV files from the source directory
2. Compares them with lookup tables for field descriptions
3. Initializes or updates database tables
4. Generates summary statistics
```{mermaid}
%%| label: overview
%%| fig-cap: "Overview diagram of CSV ingestion process into the database."
%%| file: diagrams/ingest_noaa-calcofi-db_overview.mmd
```
```{r}
#| label: setup
librarian::shelf(
  DBI, dplyr, DT, glue, here, janitor, lubridate, purrr, readr, rlang, tibble,
  tidyr, uuid,
  quiet = T)
options(readr.show_col_types = F)

# Source database connection
source(here("../apps_dev/libs/db.R"))

# define paths ----

# dataset with external data folder
dir_data <- "/Users/bbest/My Drive/projects/calcofi/data"
dataset  <- "noaa-calcofi-db"
dir_csv  <- glue("{dir_data}/{dataset}")

# input tables (version controlled)
tbls_in_csv <- here(glue("data/ingest/{dataset}/tbls_in.csv"))
flds_in_csv <- here(glue("data/ingest/{dataset}/flds_in.csv"))

# rename tables (version controlled)
tbls_rn_csv <- here(glue("data/ingest/{dataset}/tbls_rename.csv"))
flds_rn_csv <- here(glue("data/ingest/{dataset}/flds_rename.csv"))
```
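The sourced `db.R` lives outside this document and is assumed to create the `con` database connection used throughout. A minimal sketch of what it might look like (database name, host, and credential handling are placeholders, not the actual contents of `db.R`):

```{r}
#| label: sketch-db-connection
#| eval: false

# sketch of the connection object that ../apps_dev/libs/db.R is assumed to
# provide; dbname, host and credential handling here are placeholders
library(DBI)
library(RPostgres)

con <- dbConnect(
  Postgres(),
  dbname   = "calcofi",                # hypothetical database name
  host     = "localhost",
  port     = 5432,
  user     = Sys.getenv("PGUSER"),
  password = Sys.getenv("PGPASSWORD"))
```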
## Read CSV Files
```{r}
#| label: read_csvs
# read data, extract field headers
d <- tibble(
  csv = list.files(
    dir_csv, pattern = "\\.csv$", full.names = TRUE)) |>
  mutate(
    tbl  = tools::file_path_sans_ext(basename(csv)),
    data = map(csv, read_csv),
    nrow = map_int(data, nrow),
    ncol = map_int(data, ncol),
    flds = map2(tbl, data, \(tbl, data){
      tibble(
        fld  = names(data),
        type = map_chr(fld, \(fld) class(data[[fld]])[1])) })) |>
  relocate(tbl)

if (!dir.exists(dirname(tbls_in_csv)))
  dir.create(dirname(tbls_in_csv), recursive = T)

d_tbls_in <- d |>
  select(tbl, nrow, ncol)
write_csv(d_tbls_in, tbls_in_csv)

d_flds_in <- d |>
  select(tbl, flds) |>
  unnest(flds)
write_csv(d_flds_in, flds_in_csv)

datatable(d_tbls_in, caption = "Tables to ingest.")
```
```{r}
#| label: dt_flds_in
datatable(d_flds_in, caption = "Fields to ingest.")
```
## Show table and field renames
```{r}
#| label: create_or_read_flds_rn_csv
if (!file.exists(flds_rn_csv)){

  d_tbls_in |>
    mutate(
      tbl_old = tbl,
      tbl_new = tbl) |>
    select(tbl_old, tbl_new) |>
    write_csv(tbls_rn_csv)

  d_flds_in |>
    group_by(tbl) |>
    mutate(
      fld_new  = make_clean_names(fld),
      order    = 1:n(),
      comment  = "",
      notes    = "",
      mutation = "",
      type_new = map2_chr(tbl, fld, \(x, y){
        v <- d |>
          filter(
            tbl == x) |>
          pull(data) |>
          pluck(1) |>
          pull(y)
        cl <- class(v)[1]

        # character: uuid if every value parses as a UUID, otherwise varchar
        if (cl == "character"){
          if (all(!is.na(as.UUID(v))))
            return("uuid")
          return("varchar")
        }

        # integer types, by range
        # https://www.postgresql.org/docs/current/datatype-numeric.html
        if (cl == "numeric" && all(v %% 1 == 0, na.rm = T)){
          if (
            min(v, na.rm = T) >= -32768 &
            max(v, na.rm = T) <=  32767)
            return("smallint")
          if (
            min(v, na.rm = T) >= -2147483648 &
            max(v, na.rm = T) <=  2147483647)
            return("integer")
          if (
            min(v, na.rm = T) >= -9223372036854775808 &
            max(v, na.rm = T) <=  9223372036854775807)
            return("bigint")
        }
        if (cl == "POSIXct")
          return("timestamp")
        if (cl == "Date")
          return("date")
        return(cl)
      })) |>
    ungroup() |>
    mutate(
      tbl_old   = tbl,
      tbl_new   = tbl,
      fld_old   = fld,
      order_old = order,
      order_new = order,
      type_old  = type) |>
    select(
      tbl_old, tbl_new,
      fld_old, fld_new,
      order_old, order_new,
      type_old, type_new,
      comment, notes, mutation) |>
    write_csv(flds_rn_csv)

  stop(glue(
    "Please update the table and field rename tables before proceeding:
      {tbls_rn_csv}
      {flds_rn_csv}"))
}
d_tbls_rn <- read_csv(tbls_rn_csv)
d_flds_rn <- read_csv(flds_rn_csv)

d_tbls_rn |>
  mutate(
    is_equal = tbl_old == tbl_new) |>
  datatable(
    caption = "Tables to rename.",
    options = list(
      columnDefs = list(list(
        targets = "is_equal", visible = F)))) |>
  formatStyle(
    "tbl_new",
    backgroundColor = styleEqual(
      c(T, F),
      c("lightgray", "lightgreen")),
    valueColumns = "is_equal")
```
```{r}
#| label: dt_flds_rn
d_flds_rn |>
  mutate(
    tbl_is_equal   = tbl_old   == tbl_new,
    fld_is_equal   = fld_old   == fld_new,
    type_is_equal  = type_old  == type_new,
    order_is_equal = order_old == order_new,
    tbl = ifelse(
      tbl_is_equal,
      tbl_old,
      glue("{tbl_old} → {tbl_new}")),
    fld = ifelse(
      fld_is_equal,
      fld_old,
      glue("{fld_old} → {fld_new}")),
    type = ifelse(
      type_is_equal,
      type_old,
      glue("{type_old} → {type_new}")),
    order = ifelse(
      order_is_equal,
      order_old,
      glue("{order_old} → {order_new}"))) |>
  select(
    -tbl_old, -tbl_new,
    -fld_old, -fld_new,
    -type_old, -type_new,
    -order_old, -order_new) |>
  relocate(tbl, fld, type, order) |>
  datatable(
    caption = "Fields to rename.",
    rownames = F,
    options = list(
      colReorder = T,
      rowGroup = list(dataSrc = 0),
      pageLength = 50,
      columnDefs = list(list(
        targets = c(
          "tbl",
          "tbl_is_equal", "fld_is_equal",
          "type_is_equal", "order_is_equal"),
        visible = F))),
    extensions = c("ColReorder", "RowGroup", "Responsive")) |>
  formatStyle(
    "fld",
    backgroundColor = styleEqual(
      c(T, F),
      c("lightgray", "lightgreen")),
    valueColumns = "fld_is_equal") |>
  formatStyle(
    "type",
    backgroundColor = styleEqual(
      c(T, F),
      c("lightgray", "lightgreen")),
    valueColumns = "type_is_equal") |>
  formatStyle(
    "order",
    backgroundColor = styleEqual(
      c(T, F),
      c("lightgray", "lightgreen")),
    valueColumns = "order_is_equal")
```
## Apply remappings to data
```{r}
#| label: make_data_new
d0 <- d

mutate_table <- function(tbl, data) {
  # message(glue("Processing table: {tbl}"))

  d_f <- d_flds_rn |>
    filter(tbl_old == tbl)

  # rename fields ----
  f_rn <- d_f |>
    select(fld_old, fld_new) |>
    deframe()
  y <- rename_with(data, ~ f_rn[.x])

  # mutate fields ----
  d_m <- d_f |>
    select(fld_new, mutation) |>
    filter(!is.na(mutation))
  for (i in seq_len(nrow(d_m))) {
    fld <- d_m$fld_new[i]
    mx  <- d_m$mutation[i]
    # message(glue("mutating {tbl}.{fld}: `{mx}`"))
    fld_sym <- rlang::sym(fld)
    mx_expr <- rlang::parse_expr(mx)
    y <- y |>
      mutate(!!fld_sym := eval(mx_expr, envir = y))
  }

  # order fields ----
  flds_ordered <- d_f |>
    arrange(order_new) |>
    pull(fld_new)
  y <- y |>
    relocate(all_of(flds_ordered))

  return(y)
}

d <- d |>
  left_join(
    d_tbls_rn,
    by = c("tbl" = "tbl_old")) |>
  mutate(
    data_new = map2(tbl, data, mutate_table))
```
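The `mutation` column of `flds_rename.csv` holds an R expression as text, which `mutate_table()` parses with `rlang::parse_expr()` and evaluates against the renamed table, so per-field conversions live in the version-controlled CSV rather than in code. A hypothetical example (the field name, data values, and expression are illustrative, not taken from the actual rename tables):

```{r}
#| label: sketch-mutation-expr
#| eval: false

# hypothetical flds_rename.csv row:
#   fld_old = "time_start", fld_new = "time_start",
#   mutation = "lubridate::mdy_hms(time_start, tz = 'UTC')"
# mutate_table() parses the string and evaluates it against the data:
y <- tibble(time_start = c("01/15/2020 06:30:00", "01/16/2020 07:00:00"))
mx_expr <- rlang::parse_expr("lubridate::mdy_hms(time_start, tz = 'UTC')")
mutate(y, time_start = eval(mx_expr, envir = y))
```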
## Load Tables into Database
```{r}
#| label: db-operations
schema    <- "dev"
overwrite <- T

tbl_to_db <- function(tbl){  # tbl = "eggs"  # tbl = "ship"

  message(glue("{schema}.{tbl} ~ {Sys.time()}"))

  # Check if table exists
  tbl_exists <- dbExistsTable(
    con, Id(schema = schema, table = tbl))

  d_tbl <- d |>
    filter(tbl_new == !!tbl) |>
    pull(data_new) |>
    pluck(1)

  v_fld_types <- d_flds_rn |>
    filter(tbl_new == !!tbl) |>
    arrange(order_new) |>
    select(fld_new, type_new) |>
    deframe()

  if (!tbl_exists | overwrite) {
    message("  loading table")
    dbWriteTable(
      con,
      Id(schema = schema, table = tbl),
      d_tbl,
      field.types = v_fld_types,
      append    = F,
      overwrite = T)
  } else {
    message("  exists, skipping")

    # # Get existing data
    # existing_data <- dbGetQuery(
    #   con, sprintf("SELECT * FROM %s LIMIT 1", table_name))
    #
    # # Compare schemas
    # new_cols <- setdiff(names(data), names(existing_data))
    # if (length(new_cols) > 0) {
    #   cat(sprintf("New columns found in %s: %s\n", table_name, paste(new_cols, collapse = ", ")))
    # }
    #
    # # Update data - this is a simple example, you might want more sophisticated merge logic
    # dbWriteTable(con, table_name, data, overwrite = TRUE)
  }

  # Return summary stats
  return(tibble(
    tbl            = tbl,
    nrow           = nrow(d_tbl),
    ncol           = ncol(d_tbl),
    dtime_ingested = Sys.time()))
}

# Process each table
tbl_stats <- map(d$tbl_new, tbl_to_db) |>
  bind_rows()

# Display summary statistics
tbl_stats |>
  datatable(rownames = FALSE, filter = "top")
```
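As a quick post-load sanity check (a sketch, assuming the tables were just written to the `dev` schema by the chunk above), database row counts can be compared against the in-memory source tables:

```{r}
#| label: sketch-check-load
#| eval: false

# compare database row counts against the source tables read from CSV
d_check <- d |>
  select(tbl_new, nrow) |>
  mutate(
    nrow_db = map_int(
      tbl_new,
      \(x) as.integer(dbGetQuery(
        con, glue("SELECT COUNT(*) AS n FROM {schema}.{x}"))$n)),
    rows_match = nrow == nrow_db)
d_check
```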
## Data Quality Checks
```{r}
#| label: quality-checks
#| eval: false
# Perform basic data quality checks
check_data_quality <- function(tbl) {

  # get row count
  row_count <- dbGetQuery(
    con,
    glue("SELECT COUNT(*) AS count FROM {schema}.{tbl}"))$count

  # get column names from information_schema
  flds <- dbGetQuery(
    con,
    glue("
      SELECT column_name
      FROM   information_schema.columns
      WHERE  table_schema = '{schema}'
        AND  table_name   = '{tbl}'"))$column_name

  # get null counts per column
  null_counts <- map(flds, \(fld){
    dbGetQuery(
      con,
      glue("
        SELECT
          '{fld}' AS column_name,
          COUNT(*) - COUNT({fld}) AS null_count,
          ROUND(100.0 * (COUNT(*) - COUNT({fld})) / COUNT(*), 2) AS null_percentage
        FROM {schema}.{tbl}")) }) |>
    bind_rows() |>
    filter(null_count > 0) |>
    arrange(desc(null_count))

  return(list(
    row_count     = row_count,
    null_analysis = null_counts))
}

# Run quality checks for each table
quality_results <- map(d$tbl_new, check_data_quality) |>
  set_names(d$tbl_new)

# Display quality check results
for (name in names(quality_results)) {
  cat(sprintf("\nQuality Check Results for %s:\n", name))
  cat(sprintf("Total Rows: %s\n", quality_results[[name]]$row_count))
  if (nrow(quality_results[[name]]$null_analysis) > 0) {
    cat("Columns with NULL values:\n")
    print(quality_results[[name]]$null_analysis)
  } else {
    cat("No NULL values found\n")
  }
}
```
## Cleanup
```{r}
#| label: cleanup
# Close database connection
dbDisconnect(con)
```
## TODO
- [ ] Check that all `type_new` values are valid for the database connection.
- [ ] Check that all fields in `flds_in` are present in `flds_rn`.
- [ ] Insert table and field SQL `COMMENT`s into the database, using the `comment` field + `workflow_md` (markdown with name and link) + source `tbl.fld` (see the sketch below).
- [ ] Review documentation for more comment descriptions at [Data > Data Formats | CalCOFI.org](https://calcofi.com/index.php?option=com_content&view=category&id=73&Itemid=993).
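A possible starting point for the `COMMENT` item above (a sketch only: the `workflow_md` link is hypothetical, and string escaping of the comment text is omitted):

```{r}
#| label: sketch-comments
#| eval: false

# sketch: apply column COMMENTs from the `comment` field of flds_rename.csv;
# workflow_md is a hypothetical markdown link to this workflow, and quoting of
# apostrophes in comment text is omitted for brevity
workflow_md <- "[ingest_noaa-calcofi-db](https://calcofi.io/workflows/ingest_noaa-calcofi-db.html)"

d_flds_rn |>
  filter(!is.na(comment), comment != "") |>
  pwalk(\(tbl_new, fld_new, tbl_old, fld_old, comment, ...){
    dbExecute(con, glue(
      "COMMENT ON COLUMN {schema}.{tbl_new}.{fld_new} IS
         '{comment} {workflow_md}; source: {tbl_old}.{fld_old}'")) })
```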