Code
graph LR
A[ERDDAP] --> B[CSV on Google Drive]
B --> C[DuckDB Wrangling]
C --> D[Load Prior ichthyo Tables]
D --> E[Cross-Reference Tows/Nets]
E --> F[Parquet Export]
F --> G[GCS Archive]

Source: NOAA CoastWatch ERDDAP
Counts and sizes of invertebrates (primarily squids) captured in CalCOFI ichthyoplankton nets, processed by SWFSC. The two ERDDAP datasets share the same sampling metadata columns; invcnt adds count/abundance fields, invsiz adds a size measurement field.
# Load local development versions of the CalCOFI helper packages.
# NOTE(review): original line read `swfscdevtools::load_all(...)` — a stray
# "swfsc" prefix fused onto `devtools::load_all` during document flattening;
# restored to match the companion calcofi4r line below.
devtools::load_all(here::here("../calcofi4db"))
devtools::load_all(here::here("../calcofi4r"))

# Attach required packages, installing from GitHub (user/repo) or CRAN
# as needed; quiet suppresses startup messages.
librarian::shelf(
  CalCOFI/calcofi4db,
  CalCOFI/calcofi4r,
  DBI,
  dplyr,
  DT,
  fs,
  glue,
  here,
  janitor,
  jsonlite,
  readr,
  rerddap,
  sf,
  stringr,
  tibble,
  tidyr,
  quiet = TRUE)   # TRUE, not T: T/F are reassignable and unsafe

# global options: suppress readr column-type messages; horizontal scroll
# for wide DT tables
options(readr.show_col_types = FALSE)
options(DT.options = list(scrollX = TRUE))
# define paths and dataset identifiers
dataset_name <- "SWFSC CalCOFI Invertebrates"
provider     <- "swfsc"
dataset      <- "inverts"
dir_label    <- glue("{provider}_{dataset}")
dir_data     <- "~/My Drive/projects/calcofi/data-public"
dir_csv      <- path_expand(glue("{dir_data}/{provider}/{dataset}"))
overwrite    <- TRUE
dir_parquet  <- here(glue("data/parquet/{dir_label}"))
db_path      <- here(glue("data/wrangling/{dir_label}.duckdb"))

# start fresh: remove any previous wrangling database and parquet outputs
if (overwrite) {
  if (file_exists(db_path)) {
    file_delete(db_path)
  }
  if (dir_exists(dir_parquet)) {
    dir_delete(dir_parquet)
  }
}
dir_create(dirname(db_path))

# open the local DuckDB wrangling database and enable the spatial extension
con <- get_duckdb_con(db_path)
load_duckdb_extension(con, "spatial")

# Fetch both datasets and save as CSVs in Google Drive. Skips download if
# CSV already exists (delete to re-download).
# Download each ERDDAP table dataset to CSV, skipping any that already
# exist locally (delete the CSV to force a re-download).
erddap_url <- "https://coastwatch.pfeg.noaa.gov/erddap"
dir_create(dir_csv)

# counts (invcnt) and sizes (invsiz) share sampling-metadata columns
erddap_datasets <- list(
  erdCalCOFIinvcnt = "erdCalCOFIinvcnt",
  erdCalCOFIinvsiz = "erdCalCOFIinvsiz")

for (ds_name in names(erddap_datasets)) {
  csv_path <- path(dir_csv, glue("{ds_name}.csv"))
  if (!file_exists(csv_path)) {
    message(glue("downloading {ds_name} from ERDDAP..."))
    ds_info <- rerddap::info(erddap_datasets[[ds_name]], url = erddap_url)
    d_raw   <- rerddap::tabledap(ds_info)  # full table (no constraints)
    write_csv(as_tibble(d_raw), csv_path)
    message(glue(" wrote {nrow(d_raw)} rows to {csv_path}"))
  } else {
    message(glue(" {ds_name}: CSV exists, skipping download"))
  }
}

# read CSVs (and sync the archive copy) via the calcofi4db helper;
# returns a list with $source_files, $paths, redefinition tables, etc.
d <- read_csv_files(
  provider = provider,
  dataset = dataset,
  dir_data = dir_data,
  sync_archive = TRUE,
  metadata_dir = here("metadata"))
message(glue("loaded {nrow(d$source_files)} tables from {d$paths$dir_csv}"))
message(glue("total rows: {sum(d$source_files$nrow)}"))

# type_exceptions = "all": ERDDAP type names (String, int, float) don't
# match R column classes (character, integer, numeric) — accept all
# Verify downloaded data against the redefinition metadata; halts the
# workflow on failure. type_exceptions = "all" accepts every type mismatch
# (ERDDAP vs readr naming; see comment above).
integrity <- check_data_integrity(
  d = d,
  dataset_name = dataset_name,
  halt_on_fail = TRUE,
  type_exceptions = "all")
render_integrity_message(integrity)

# Rendered note from a prior run: 45 type mismatch(es) were found but
# accepted as known exceptions (e.g., readr infers types differently from
# redefinition metadata; resolved during ingestion via flds_redefine.csv
# type_new column).
# display source file, table-redefinition, and field-redefinition summaries
show_source_files(d)
show_tables_redefine(d)
show_fields_redefine(d)

# ingest CSVs into DuckDB; replace tables on a fresh run, append otherwise
tbl_stats <- ingest_dataset(
  con = con,
  d = d,
  mode = if (overwrite) "replace" else "append",
  verbose = TRUE)
tbl_stats |>
  datatable(rownames = FALSE, filter = "top")

# record dataset-level metadata as its own table
d_dataset <- read_csv(here("metadata/dataset.csv"))
dbWriteTable(con, "dataset", d_dataset, overwrite = TRUE)

# Load reference tables from ichthyo ingest for cross-referencing.
# Load shared reference tables produced by the earlier ichthyo ingest so
# invertebrate records can be cross-referenced to tows/nets.
prior <- load_prior_tables(
  parquet_dir = here("data/parquet/swfsc_ichthyo"),
  con = con,
  tables = c("ship", "cruise", "site", "tow", "net", "grid"))
message(glue(
  "loaded {nrow(prior)} prior tables: {paste(prior$table, collapse=', ')}"))
prior |>
  datatable(caption = "Prior tables loaded from ichthyo ingest")

# preview the first 100 rows of each ingested invertebrate table
for (tbl in c("invert_count", "invert_size")) {
  cat(glue("\n### {tbl}\n\n"))
  dbGetQuery(con, glue("SELECT * FROM {tbl} LIMIT 100")) |>
    datatable(
      caption = glue("{tbl} (first 100 rows)"),
      rownames = FALSE,
      filter = "top") |>
    print()  # explicit print needed inside a for loop for htmlwidgets
}
# Rendered output headings from a prior run:
#   ### invert_count
#   ### invert_size
# Run release-readiness checks; show non-passing checks as warnings on
# success, or list blocking errors on failure.
validation <- validate_for_release(con)
if (validation$passed) {
  message("validation passed!")
  if (nrow(validation$checks) > 0) {
    validation$checks |>
      filter(status != "pass") |>
      datatable(caption = "Validation Warnings")
  }
} else {
  cat("validation FAILED:\n")
  cat(paste("-", validation$errors, collapse = "\n"))
}
# Rendered output from a prior run:
#   validation FAILED:
#   - Table 'site' has 2084 NULL values in required column 'grid_key'
# Apply the type_new column from flds_redefine.csv to coerce DuckDB column
# types (resolves the ERDDAP-vs-readr type mismatches accepted earlier).
type_changes <- enforce_column_types(
  con = con,
  d_flds_rd = d$d_flds_rd)
if (nrow(type_changes) > 0) {
  type_changes |>
    datatable(caption = "Column type changes applied")
}

# collect mismatches for manifest (inverts inherit ship from ichthyo tow/net)
mismatches <- list(
  ships       = collect_ship_mismatches(con, "invert_count"),
  cruise_keys = collect_cruise_key_mismatches(con, "invert_count"))
# Export every DuckDB table to parquet, keeping provenance columns and
# recording the collected mismatches in the manifest.
parquet_stats <- write_parquet_outputs(
  con = con,
  output_dir = dir_parquet,
  tables = dbListTables(con),
  strip_provenance = FALSE,
  mismatches = mismatches)
parquet_stats |>
  mutate(file = basename(path)) |>
  select(-path) |>
  datatable(caption = "Parquet export statistics")

# write metadata.json alongside the parquet outputs, setting DuckDB column
# comments from the redefinition tables
metadata_path <- build_metadata_json(
  con = con,
  d_tbls_rd = d$d_tbls_rd,
  d_flds_rd = d$d_flds_rd,
  output_dir = dir_parquet,
  tables = dbListTables(con),
  set_comments = TRUE,
  provider = provider,
  dataset = dataset,
  # plain string: original wrapped this constant in glue() with nothing
  # to interpolate
  workflow_url = "https://calcofi.io/workflows/ingest_swfsc_inverts.html")
# write relationships.json (same schema as ichthyo): primary keys per table
# plus foreign-key edges forming the chain
#   ichthyo -> net -> tow -> site -> cruise -> ship, and ichthyo -> species
ichthyo_rels <- list(
  primary_keys = list(
    cruise  = "cruise_key",
    ship    = "ship_key",
    site    = "site_uuid",
    tow     = "tow_uuid",
    net     = "net_uuid",
    species = "species_id",
    ichthyo = "ichthyo_uuid",
    lookup  = "lookup_id"),
  foreign_keys = list(
    list(table = "ichthyo", column = "net_uuid",   ref_table = "net",     ref_column = "net_uuid"),
    list(table = "ichthyo", column = "species_id", ref_table = "species", ref_column = "species_id"),
    list(table = "net",     column = "tow_uuid",   ref_table = "tow",     ref_column = "tow_uuid"),
    list(table = "tow",     column = "site_uuid",  ref_table = "site",    ref_column = "site_uuid"),
    list(table = "site",    column = "cruise_key", ref_table = "cruise",  ref_column = "cruise_key"),
    list(table = "cruise",  column = "ship_key",   ref_table = "ship",    ref_column = "ship_key")))
build_relationships_json(
  rels = ichthyo_rels,
  output_dir = dir_parquet,
  provider = provider,
  dataset = dataset)
# Rendered output from a prior run (returned path):
#   [1] "/Users/bbest/Github/CalCOFI/workflows/data/parquet/swfsc_inverts/relationships.json"
# sync parquet outputs to the Google Cloud Storage ingest prefix
gcs_ingest_prefix <- glue("ingest/{dir_label}")
gcs_bucket <- "calcofi-db"
sync_results <- sync_to_gcs(
  local_dir = dir_parquet,
  gcs_prefix = gcs_ingest_prefix,
  bucket = gcs_bucket)

# close the local DuckDB connection and report final locations
close_duckdb(con)
message("local wrangling database connection closed")
message(glue("parquet outputs written to: {dir_parquet}"))
message(glue("GCS outputs at: gs://{gcs_bucket}/{gcs_ingest_prefix}/"))