Ingest SWFSC Invertebrates

Published

2026-04-02

1 Overview

Source: NOAA CoastWatch ERDDAP

Counts and sizes of invertebrates (primarily squids) captured in CalCOFI ichthyoplankton nets, processed by SWFSC. The two ERDDAP datasets share the same sampling metadata columns; invcnt adds count/abundance fields, invsiz adds a size measurement field.

  • Provider: swfsc
  • Coverage: 1978–2023 (counts), 1980–2023 (sizes)
  • Spatial: Pacific coast, 0–51°N

1.1 Data Flow

Code
graph LR
  A[ERDDAP] --> B[CSV on Google Drive]
  B --> C[DuckDB Wrangling]
  C --> D[Load Prior ichthyo Tables]
  D --> E[Cross-Reference Tows/Nets]
  E --> F[Parquet Export]
  F --> G[GCS Archive]

2 Setup

Code
# Load in-development versions of the CalCOFI helper packages from
# sibling repositories (overrides any installed copies)
for (pkg_dir in c("../calcofi4db", "../calcofi4r")) {
  devtools::load_all(here::here(pkg_dir))
}

# Attach (installing if missing) all required packages; the
# `CalCOFI / pkg` entries are GitHub user/repo references.
librarian::shelf(
  CalCOFI / calcofi4db,
  CalCOFI / calcofi4r,
  DBI,
  dplyr,
  DT,
  fs,
  glue,
  here,
  janitor,
  jsonlite,
  readr,
  rerddap,
  sf,
  stringr,
  tibble,
  tidyr,
  quiet = TRUE  # was `T`: always spell out TRUE/FALSE (T/F are reassignable)
)
# suppress readr's column-type spec messages (was `F`; same fix as above)
options(readr.show_col_types = FALSE)
# enable horizontal scrolling in all DT tables rendered below
options(DT.options = list(scrollX = TRUE))

# ---- paths & run parameters ----
dataset_name <- "SWFSC CalCOFI Invertebrates"
provider     <- "swfsc"
dataset      <- "inverts"
dir_label    <- glue("{provider}_{dataset}")
dir_data     <- "~/My Drive/projects/calcofi/data-public"
dir_csv      <- path_expand(glue("{dir_data}/{provider}/{dataset}"))
overwrite    <- TRUE
dir_parquet  <- here(glue("data/parquet/{dir_label}"))
db_path      <- here(glue("data/wrangling/{dir_label}.duckdb"))

# when overwriting, start clean: remove any previous wrangling
# database and parquet outputs before re-ingesting
if (overwrite) {
  if (file_exists(db_path)) file_delete(db_path)
  if (dir_exists(dir_parquet)) dir_delete(dir_parquet)
}

# open (creating as needed) the DuckDB wrangling database and load
# the spatial extension for geometry support
dir_create(dirname(db_path))
con <- get_duckdb_con(db_path)
load_duckdb_extension(con, "spatial")

3 Download from ERDDAP

Fetch both datasets and save as CSVs in Google Drive. Skips download if CSV already exists (delete to re-download).

Code
# ERDDAP server and the two target dataset IDs (counts & sizes);
# list names intentionally mirror the dataset IDs
erddap_url <- "https://coastwatch.pfeg.noaa.gov/erddap"
dir_create(dir_csv)

erddap_datasets <- list(
  erdCalCOFIinvcnt = "erdCalCOFIinvcnt",
  erdCalCOFIinvsiz = "erdCalCOFIinvsiz"
)

# cached fetch: each dataset is downloaded once and saved as CSV;
# delete the CSV to force a re-download
for (ds_name in names(erddap_datasets)) {
  csv_path <- path(dir_csv, glue("{ds_name}.csv"))

  if (file_exists(csv_path)) {
    message(glue("  {ds_name}: CSV exists, skipping download"))
    next
  }

  message(glue("downloading {ds_name} from ERDDAP..."))
  ds_info <- rerddap::info(erddap_datasets[[ds_name]], url = erddap_url)
  d_raw   <- rerddap::tabledap(ds_info)
  write_csv(as_tibble(d_raw), csv_path)
  message(glue("  wrote {nrow(d_raw)} rows to {csv_path}"))
}

4 Read CSVs

Code
# Read all CSVs for this provider/dataset along with their redefinition
# metadata; sync_archive = TRUE presumably mirrors the raw CSVs to the
# archive location — confirm behavior in calcofi4db::read_csv_files
d <- read_csv_files(
  provider = provider,
  dataset = dataset,
  dir_data = dir_data,
  sync_archive = TRUE,
  metadata_dir = here("metadata")
)

# report table count and total row count of what was loaded
message(glue("loaded {nrow(d$source_files)} tables from {d$paths$dir_csv}"))
message(glue("total rows: {sum(d$source_files$nrow)}"))

5 Check Data Integrity

Code
# Verify the loaded CSVs agree with the redefinition metadata before
# ingesting; halt_on_fail = TRUE stops the workflow on any unaccepted
# mismatch.
# type_exceptions = "all": ERDDAP type names (String, int, float) don't
# match R column classes (character, integer, numeric) — accept all
integrity <- check_data_integrity(
  d = d,
  dataset_name = dataset_name,
  halt_on_fail = TRUE,
  type_exceptions = "all"
)

# render the pass/fail summary (seen in section 5.1 below) into the report
render_integrity_message(integrity)

5.1 ✅ Data Integrity Check Passed: SWFSC CalCOFI Invertebrates

5.1.1 Passed with Accepted Exceptions

45 type mismatch(es) were found but accepted as known exceptions (e.g., readr infers types differently from redefinition metadata; resolved during ingestion via flds_redefine.csv type_new column).


6 Source Files

Code
# display the inventory of source CSV files read above
show_source_files(d)

7 Tables & Fields

Code
# display the table-level redefinition metadata
show_tables_redefine(d)
Code
# display the field-level redefinition metadata
show_fields_redefine(d)

8 Load into Database

Code
# Load the CSV tables into DuckDB; "replace" drops any existing tables
# first, "append" adds to them (overwrite is TRUE for this run)
ingest_mode <- if (overwrite) "replace" else "append"
tbl_stats <- ingest_dataset(
  con     = con,
  d       = d,
  mode    = ingest_mode,
  verbose = TRUE
)

# interactive summary of per-table ingest statistics
datatable(tbl_stats, rownames = FALSE, filter = "top")

9 Load Dataset Metadata

Code
# register dataset-level metadata in the database, replacing any prior copy
d_dataset <- read_csv(here("metadata/dataset.csv"))
dbWriteTable(con, "dataset", d_dataset, overwrite = TRUE)

10 Load Prior Tables

Load reference tables from ichthyo ingest for cross-referencing.

Code
# Pull the shared reference tables written by the ichthyo ingest so the
# invertebrate records can be cross-referenced to the same tows/nets
prior <- load_prior_tables(
  parquet_dir = here("data/parquet/swfsc_ichthyo"),
  con         = con,
  tables      = c("ship", "cruise", "site", "tow", "net", "grid")
)

# one-line summary of which prior tables were loaded
message(glue(
  "loaded {nrow(prior)} prior tables: {paste(prior$table, collapse=', ')}"
))

datatable(prior, caption = "Prior tables loaded from ichthyo ingest")

11 Data Preview

Code
# Render an interactive preview (first 100 rows) of each invert table
for (tbl in c("invert_count", "invert_size")) {
  cat(glue("\n### {tbl}\n\n"))
  d_preview <- dbGetQuery(con, glue("SELECT * FROM {tbl} LIMIT 100"))
  widget <- datatable(
    d_preview,
    caption  = glue("{tbl} (first 100 rows)"),
    rownames = FALSE,
    filter   = "top"
  )
  # explicit print(): htmlwidgets do not auto-render inside a for loop
  print(widget)
}
### invert_count
### invert_size

12 Validate

Code
# Run release-readiness validation checks against the database
validation <- validate_for_release(con)

if (!validation$passed) {
  # report failures; the workflow continues so outputs can be inspected
  cat("validation FAILED:\n")
  cat(paste("-", validation$errors, collapse = "\n"))
} else {
  message("validation passed!")
  if (nrow(validation$checks) > 0) {
    # surface any warning-level (non-passing) checks in a table
    validation$checks |>
      filter(status != "pass") |>
      datatable(caption = "Validation Warnings")
  }
}
validation FAILED:
- Table 'site' has 2084 NULL values in required column 'grid_key'

13 Enforce Column Types

Code
# Cast database columns to the types declared in the redefinition
# metadata (resolving the ERDDAP-vs-readr mismatches accepted earlier)
type_changes <- enforce_column_types(con = con, d_flds_rd = d$d_flds_rd)

# show the applied casts, if any
if (nrow(type_changes) > 0) {
  datatable(type_changes, caption = "Column type changes applied")
}

14 Write Parquet

Code
# collect mismatches for manifest (inverts inherit ship from ichthyo tow/net)
mismatches <- list(
  ships       = collect_ship_mismatches(con, "invert_count"),
  cruise_keys = collect_cruise_key_mismatches(con, "invert_count"))

parquet_stats <- write_parquet_outputs(
  con = con,
  output_dir = dir_parquet,
  tables = dbListTables(con),
  strip_provenance = FALSE,
  mismatches = mismatches
)

parquet_stats |>
  mutate(file = basename(path)) |>
  select(-path) |>
  datatable(caption = "Parquet export statistics")

15 Write Metadata

Code
# Write metadata JSON for the parquet outputs; set_comments = TRUE
# presumably also sets COMMENTs on the DuckDB objects — confirm in
# calcofi4db::build_metadata_json
metadata_path <- build_metadata_json(
  con = con,
  d_tbls_rd = d$d_tbls_rd,
  d_flds_rd = d$d_flds_rd,
  output_dir = dir_parquet,
  tables = dbListTables(con),
  set_comments = TRUE,
  provider = provider,
  dataset = dataset,
  # link each table's metadata back to this rendered workflow page
  workflow_url = glue(
    "https://calcofi.io/workflows/ingest_swfsc_inverts.html"
  )
)

# write relationships.json (same schema as ichthyo): primary keys per
# table plus foreign-key links so downstream tools can join tables
# small constructor for one foreign-key entry
fk <- \(table, column, ref_table, ref_column) {
  list(
    table = table, column = column,
    ref_table = ref_table, ref_column = ref_column
  )
}

ichthyo_rels <- list(
  primary_keys = list(
    cruise  = "cruise_key",
    ship    = "ship_key",
    site    = "site_uuid",
    tow     = "tow_uuid",
    net     = "net_uuid",
    species = "species_id",
    ichthyo = "ichthyo_uuid",
    lookup  = "lookup_id"
  ),
  foreign_keys = list(
    fk("ichthyo", "net_uuid",   "net",     "net_uuid"),
    fk("ichthyo", "species_id", "species", "species_id"),
    fk("net",     "tow_uuid",   "tow",     "tow_uuid"),
    fk("tow",     "site_uuid",  "site",    "site_uuid"),
    fk("site",    "cruise_key", "cruise",  "cruise_key"),
    fk("cruise",  "ship_key",   "ship",    "ship_key")
  )
)

build_relationships_json(
  rels       = ichthyo_rels,
  output_dir = dir_parquet,
  provider   = provider,
  dataset    = dataset
)
[1] "/Users/bbest/Github/CalCOFI/workflows/data/parquet/swfsc_inverts/relationships.json"

16 Upload to GCS

Code
# Mirror the local parquet outputs to the Google Cloud Storage archive
gcs_bucket        <- "calcofi-db"
gcs_ingest_prefix <- glue("ingest/{dir_label}")

sync_results <- sync_to_gcs(
  local_dir  = dir_parquet,
  gcs_prefix = gcs_ingest_prefix,
  bucket     = gcs_bucket
)

17 Cleanup

Code
# release the DuckDB connection/file handle, then report where the
# outputs landed locally and in GCS
close_duckdb(con)
message("local wrangling database connection closed")
message(glue("parquet outputs written to: {dir_parquet}"))
message(glue("GCS outputs at: gs://{gcs_bucket}/{gcs_ingest_prefix}/"))