Ingest NOAA CalCOFI Database

Overview

Goal: Generate the database from source files with workflow scripts to make updating easier and provenance fully transparent. This allows us to:

  • Rename tables and columns, control data types, and use Unicode encoding for a consistent database ingestion strategy, per Database naming conventions in Database – CalCOFI.io Docs.

  • Differentiate between raw and derived or updated tables and columns. For instance, the taxonomy for any given species can change over time, such as by the lumping or splitting of a given taxon, and by taxonomic authority (e.g., WoRMS, ITIS or GBIF). These taxonomic identifiers and the full taxonomic hierarchy should be updated regularly regardless of the source observational data, and can either be updated in the table directly or joined one-to-one with a separate table in a materialized view (so as not to slow down queries with a regular view); see the sketch below.
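
For example, a minimal sketch of such a materialized view, assuming hypothetical table names obs (observations keyed by aphia_id) and taxa_worms (a regularly refreshed WoRMS lookup), run over the DBI connection con established later in this workflow:

Code
# sketch only: table and column names here are illustrative, not part of this dataset
dbExecute(con, "
  CREATE MATERIALIZED VIEW IF NOT EXISTS obs_taxa AS
  SELECT o.*, t.scientific_name, t.kingdom, t.phylum, t.family, t.genus
  FROM obs o
  LEFT JOIN taxa_worms t USING (aphia_id)")
# refresh whenever the taxonomy lookup is updated
dbExecute(con, "REFRESH MATERIALIZED VIEW obs_taxa")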

This workflow processes the NOAA CalCOFI database CSV files and updates the PostgreSQL database. The workflow:

  1. Reads CSV files from the source directory
  2. Compares fields with version-controlled lookup tables of renames and descriptions
  3. Initializes or updates database tables
  4. Generates summary statistics
Code
graph TD
    A[Read CSV Files] --> B[Extract Column Names]
    B --> C[Compare with Lookups]
    C --> D[Process Field Descriptions]
    D --> E{Table Exists?}
    E -->|Yes| F[Update Table]
    E -->|No| G[Initialize Table]
    F --> H[Summary Stats]
    G --> H

Overview diagram of the CSV ingestion process into the database.

Code
librarian::shelf(
  DBI, dplyr, DT, glue, here, janitor, lubridate, purrr, readr, rlang, tibble, tidyr, uuid,
  quiet = T)
options(readr.show_col_types = F)

# Source database connection (provides `con`)
source(here("../apps_dev/libs/db.R"))

# define paths ---

# dataset with external data folder
dir_data <- "/Users/bbest/My Drive/projects/calcofi/data"
dataset  <- "noaa-calcofi-db"
dir_csv  <- glue("{dir_data}/{dataset}")
# input tables (version controlled)
tbls_in_csv <- here(glue("data/ingest/{dataset}/tbls_in.csv"))
flds_in_csv <- here(glue("data/ingest/{dataset}/flds_in.csv"))
# rename tables (version controlled)
tbls_rn_csv <- here(glue("data/ingest/{dataset}/tbls_rename.csv"))
flds_rn_csv <- here(glue("data/ingest/{dataset}/flds_rename.csv"))
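
Note: db.R is assumed to provide a live DBI connection named con to the CalCOFI PostgreSQL database. A minimal stand-in sketch, assuming credentials are supplied via environment variables (database name and host below are placeholders):

Code
# hypothetical equivalent of ../apps_dev/libs/db.R
con <- DBI::dbConnect(
  RPostgres::Postgres(),
  dbname   = "calcofi",    # placeholder database name
  host     = "localhost",  # placeholder host
  port     = 5432,
  user     = Sys.getenv("PGUSER"),
  password = Sys.getenv("PGPASSWORD"))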

1 Read CSV Files

Code
# read data, extract field headers
d <- tibble(
  csv = list.files(
    dir_csv, pattern = "\\.csv$", full.names = TRUE)) |> 
  mutate(
    tbl  = tools::file_path_sans_ext(basename(csv)),
    data = map(csv, read_csv),
    nrow = map_int(data, nrow),
    ncol = map_int(data, ncol),
    flds = map2(tbl, data, \(tbl, data){
      tibble(
        fld  = names(data),
        type = map_chr(fld, \(fld) class(data[[fld]])[1] )) })) |> 
  relocate(tbl)

if (!dir.exists(dirname(tbls_in_csv)))
  dir.create(dirname(tbls_in_csv), recursive = T)

d_tbls_in <- d |> 
  select(tbl, nrow, ncol)
write_csv(d_tbls_in, tbls_in_csv)

d_flds_in <- d |> 
  select(tbl, flds) |> 
  unnest(flds) 
write_csv(d_flds_in, flds_in_csv)

datatable(d_tbls_in, caption = "Tables to ingest.")
Code
datatable(d_flds_in, caption = "Fields to ingest.")

2 Show table and field renames

Code
if (!file.exists(flds_rn_csv)){
  d_tbls_in |> 
    select(
      tbl_old = tbl, 
      tbl_new = tbl) |> 
    write_csv(tbls_rn_csv)

  d_flds_in |> 
    group_by(tbl) |> 
    mutate(
      fld_new  = make_clean_names(fld),
      order    = 1:n(),
      comment  = "",
      notes    = "",
      mutation = "",
      type_new = map2_chr(tbl, fld, \(x, y){
        
        v <- d |> 
          filter(
            tbl == x) |> 
          pull(data) |> 
          pluck(1) |> 
          pull(y)
        
        cl <- class(v)[1]
        
        if (cl == "character"){
          if (all(!is.na(as.UUID(v))))
            return("uuid")
        } else {
          return("varchar")
        }
        
        # integer
        if (cl == "numeric" && all(v%%1==0)){
          # https://www.postgresql.org/docs/current/datatype-numeric.html
          if (
            min(v) >= -32768 & 
            max(v) <= 32767)
            return("smallint")
          if (
            min(v) >= -2147483648 & 
            max(v) <= 2147483647)
            return("integer")
          if (
            min(v) >= -9223372036854775808 & 
            max(v) <= 9223372036854775807)
            return("bigint")
        }
        
        if (cl == "POSIXct")
          return("timestamp")
        if (cl == "Date")
          return("date")
        
        return(cl)
        })) |> 
    ungroup() |> 
    select(
      tbl_old   = tbl,   tbl_new = tbl,
      fld_old   = fld,   fld_new,
      order_old = order, order_new = order,
      type_old  = type,  type_new,
      comment, notes, mutation) |> 
    write_csv(flds_rn_csv)
  
  stop(glue(
    "Please update the table and field rename tables before proceeding:
      {tbls_rn_csv}
      {flds_rn_csv}"))
}

d_tbls_rn <- read_csv(tbls_rn_csv)
d_flds_rn <- read_csv(flds_rn_csv)

d_tbls_rn |>
  mutate(
    is_equal = tbl_old == tbl_new) |> 
  datatable(
    caption = "Tables to rename.",
    options = list(
      columnDefs = list(list(
        targets = "is_equal", visible = F)))) |> 
  formatStyle(
    "tbl_new",
    backgroundColor = styleEqual(
      c(T,F), 
      c("lightgray","lightgreen")),
    valueColumns    = "is_equal")
Code
d_flds_rn |>
  mutate(
    tbl_is_equal   = tbl_old   == tbl_new,
    fld_is_equal   = fld_old   == fld_new,
    type_is_equal  = type_old  == type_new,
    order_is_equal = order_old == order_new,
    tbl = ifelse(
      tbl_is_equal,
      tbl_old,
      glue("{tbl_old} → {tbl_new}")),
    fld = ifelse(
      fld_is_equal,
      fld_old,
      glue("{fld_old} → {fld_new}")),
    type = ifelse(
      type_is_equal,
      type_old,
      glue("{type_old} → {type_new}")),
    order = ifelse(
      order_is_equal,
      order_old,
      glue("{order_old} → {order_new}"))) |> 
  select(
    -tbl_old,   -tbl_new,
    -fld_old,   -fld_new,
    -type_old,  -type_new,
    -order_old, -order_new) |> 
  relocate(tbl, fld, type, order) |> 
  datatable(
    caption = "Fields to rename.",
    rownames = F,
    options = list(
      colReorder = T,
      rowGroup = list(dataSrc = 0),
      pageLength = 50,
      columnDefs = list(list(
        targets = c(
          "tbl",
          "tbl_is_equal", "fld_is_equal", 
          "type_is_equal", "order_is_equal"), 
        visible = F))),
    extensions = c("ColReorder", "RowGroup", "Responsive")) |> 
  formatStyle(
    "fld",
    backgroundColor = styleEqual(
      c(T, F), 
      c("lightgray","lightgreen")),
    valueColumns    = "fld_is_equal") |> 
  formatStyle(
    "type",
    backgroundColor = styleEqual(
      c(T, F), 
      c("lightgray","lightgreen")),
    valueColumns    = "type_is_equal") |> 
  formatStyle(
    "order",
    backgroundColor = styleEqual(
      c(T, F), 
      c("lightgray","lightgreen")),
    valueColumns    = "order_is_equal")

3 Apply remappings to data

Code
d0 <- d
mutate_table <- function(tbl, data) {
  # message(glue("Processing table: {tbl}"))
  
  d_f <- d_flds_rn |>
    filter(tbl_old == tbl)
  
  # rename fields ----
  f_rn <- d_f |>
    select(fld_old, fld_new) |> 
    deframe()
  
  y <- rename_with(data, ~ f_rn[.x])
  
  # mutate fields ----
  d_m <- d_f |> 
    select(fld_new, mutation) |> 
    filter(!is.na(mutation))
  
  for (i in seq_len(nrow(d_m))) {
    fld <- d_m$fld_new[i]
    mx  <- d_m$mutation[i]
    
    # message(glue("mutating {tbl}.{fld}: `{mx}`"))
    fld_sym <- rlang::sym(fld)
    mx_expr <- rlang::parse_expr(mx)
    
    y <- y |>
      mutate(!!fld_sym := eval(mx_expr, envir = y))
  }
  
  # order fields ----
  flds_ordered <- d_f |>
    arrange(order_new) |> 
    pull(fld_new)
  y <- y |>
    relocate(all_of(flds_ordered))

  return(y)
}

d <- d |>
  left_join(
    d_tbls_rn,
    by = c("tbl" = "tbl_old")) |> 
  mutate(
    data_new = map2(tbl, data, mutate_table))
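
The mutation column in flds_rename.csv can hold an R expression that mutate_table() parses with rlang::parse_expr() and evaluates against the renamed table. A small illustration of the same mechanics, using a hypothetical field and datetime format:

Code
# hypothetical example: coerce a character time field to POSIXct via a mutation expression
y  <- tibble(time_tow = c("01/07/2015 06:30:00", "01/07/2015 08:45:00"))
mx <- 'lubridate::mdy_hms(time_tow, tz = "UTC")'
y |>
  mutate(time_tow = eval(rlang::parse_expr(mx), envir = y))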

4 Load Tables into Database

Code
schema    <- "dev"
overwrite <- T

tbl_to_db <- function(tbl){ # tbl = "eggs" # tbl = "ship"
  message(glue("{schema}.{tbl}  ~ {Sys.time()}" ))

  # Check if table exists
  tbl_exists <- dbExistsTable(
    con, Id(schema = schema, table = tbl))
  
  d_tbl <- d |> 
    filter(tbl_new == !!tbl) |> 
    pull(data_new) |> 
    pluck(1)
  
  v_fld_types <- d_flds_rn |> 
    filter(tbl_new == !!tbl) |> 
    arrange(order_new) |> 
    select(fld_new, type_new) |> 
    deframe()
  
  if (!tbl_exists | overwrite) {
    
    message("  loading table")
    dbWriteTable(
      con, 
      Id(schema = schema, table = tbl), 
      d_tbl, 
      field.types = v_fld_types, 
      append      = F,
      overwrite   = T)
    
  } else {
    
    message("  exists, skipping")
    
    # # Get existing data
    # existing_data <- dbGetQuery(
    #   con, sprintf("SELECT * FROM %s LIMIT 1", table_name))
    # 
    # # Compare schemas
    # new_cols <- setdiff(names(data), names(existing_data))
    # if (length(new_cols) > 0) {
    #   cat(sprintf("New columns found in %s: %s\n", table_name, paste(new_cols, collapse = ", ")))
    # }
    # 
    # # Update data - this is a simple example, you might want more sophisticated merge logic
    # dbWriteTable(con, table_name, data, overwrite = TRUE)
  }
  
  # Return summary stats
  return(tibble(
    tbl = tbl,
    nrow = nrow(d_tbl),
    ncol = ncol(d_tbl),
    dtime_ingested = Sys.time()))
}

# Process each table
tbl_stats <- map(d$tbl_new, tbl_to_db) |> 
  bind_rows()

# Display summary statistics
tbl_stats |> 
  datatable(rownames = FALSE, filter = "top")
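
The comment column captured in flds_rename.csv is not yet written to the database. One possible sketch (not part of the current workflow) for registering those field descriptions as PostgreSQL column comments:

Code
# sketch: apply field descriptions as column comments, assuming comment has been filled in
d_flds_rn |>
  filter(!is.na(comment), comment != "") |>
  pwalk(\(tbl_new, fld_new, comment, ...){
    dbExecute(con, glue(
      "COMMENT ON COLUMN {schema}.{tbl_new}.{fld_new} IS {dbQuoteString(con, comment)}")) })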

5 Data Quality Checks

Code
# Perform basic data quality checks
check_data_quality <- function(tbl) {
  # get row count
  row_count <- dbGetQuery(
    con, 
    glue("SELECT COUNT(*) AS n FROM {schema}.{tbl}"))$n |> 
    as.integer()
  
  # get column names from information_schema
  flds <- dbGetQuery(
    con,
    glue(
      "SELECT column_name 
       FROM   information_schema.columns
       WHERE  table_schema = '{schema}' AND table_name = '{tbl}'"))$column_name
  
  # count NULLs per column, keeping only columns with missing values
  null_counts <- map(flds, \(fld){
    n_null <- dbGetQuery(
      con,
      glue('SELECT COUNT(*) AS n FROM {schema}.{tbl} WHERE "{fld}" IS NULL'))$n |> 
      as.integer()
    tibble(
      column_name     = fld,
      null_count      = n_null,
      null_percentage = round(100 * n_null / row_count, 2)) }) |> 
    bind_rows() |> 
    filter(null_count > 0) |> 
    arrange(desc(null_count))
  
  return(list(
    row_count     = row_count,
    null_analysis = null_counts))
}

# Run quality checks for each table
quality_results <- map(d$tbl_new, check_data_quality) |> 
  set_names(d$tbl_new)

# Display quality check results
for (name in names(quality_results)) {
  cat(sprintf("\nQuality Check Results for %s:\n", name))
  cat(sprintf("Total Rows: %d\n", quality_results[[name]]$row_count))
  
  if (nrow(quality_results[[name]]$null_analysis) > 0) {
    cat("Columns with NULL values:\n")
    print(quality_results[[name]]$null_analysis)
  } else {
    cat("No NULL values found\n")
  }
}

6 Cleanup

Code
# Close database connection
dbDisconnect(con)

7 TODO