Ingest Underway CUFES Fish Eggs

Published

2026-06-08

1 Overview

Source: NOAA CoastWatch ERDDAP dataset erdCalCOFIcufes — Continuous Underway Fish Egg Sampler (CUFES) egg counts (sardine, anchovy, jack mackerel, hake, squid, other) plus underway environmental measurements. Provider: swfsc (Noelle Bowlin); coverage 1996–present.

2 Setup

Code
# Load the CalCOFI helper packages from the paths given relative to the
# project root, then attach the remaining dependencies.
devtools::load_all(here::here("../calcofi4db"))
devtools::load_all(here::here("../calcofi4r"))
librarian::shelf(
  CalCOFI/calcofi4db, CalCOFI/calcofi4r,
  DBI, dplyr, DT, fs, glue, here, jsonlite, knitr, lubridate, purrr,
  readr, sf, stringr, tibble, tidyr, units,
  quiet = TRUE)
# Spell out TRUE/FALSE: T and F are ordinary variables that can be reassigned.
options(readr.show_col_types = FALSE)
options(DT.options = list(scrollX = TRUE))
source(here("libs/ingest.R"))

# Provider, dataset and owned tables are read from this notebook's metadata.
cc <- read_calcofi_meta(here("ingest_swfsc_cufes.qmd"))
provider     <- cc$provider
dataset      <- cc$dataset
tables_owned <- cc$tables_owned

# Paths derived from the "{provider}_{dataset}" label.
dir_label   <- glue("{provider}_{dataset}")
dir_parquet <- here(glue("data/parquet/{dir_label}"))
db_path     <- here(glue("data/wrangling/{dir_label}.duckdb"))

# NOTE(review): `overwrite` is not assigned in the visible code; presumably it
# comes from libs/ingest.R or a hidden chunk -- confirm.
if (overwrite && file_exists(db_path)) file_delete(db_path)
dir_create(dirname(db_path))
con <- get_duckdb_con(db_path)
load_duckdb_extension(con, "spatial")

# Shared measurement-type registry; extended later with the CUFES egg types.
meas_type_csv <- here("metadata/measurement_type.csv")
d_meas_type   <- read_csv(meas_type_csv)

3 Download from ERDDAP

Code
# Local cache location for the raw ERDDAP export.
dir_dl  <- here(glue("data/cache/{dir_label}")); dir_create(dir_dl)
cufes_csv <- file.path(dir_dl, "erdCalCOFIcufes.csv")
erddap_url <- "https://coastwatch.pfeg.noaa.gov/erddap/tabledap/erdCalCOFIcufes.csv"
if (overwrite || !file_exists(cufes_csv)) {
  # Download to a temporary sibling and promote it only after the transfer
  # succeeded. Otherwise an interrupted download would be left at `cufes_csv`
  # and silently reused on every later run (the file_exists() check above).
  tmp_csv <- paste0(cufes_csv, ".part")
  status <- tryCatch(
    # mode = "wb" keeps the bytes unchanged on every platform.
    download.file(erddap_url, tmp_csv, mode = "wb", quiet = TRUE),
    error = function(e) {
      if (file_exists(tmp_csv)) file_delete(tmp_csv)
      stop("Failed to download ", erddap_url, ": ", conditionMessage(e),
           call. = FALSE)
    })
  # download.file() returns 0 on success; also reject a missing or empty file.
  if (status != 0 || !file_exists(tmp_csv) || file_size(tmp_csv) == 0) {
    if (file_exists(tmp_csv)) file_delete(tmp_csv)
    stop("Download of ", erddap_url, " did not complete (status ", status, ")",
         call. = FALSE)
  }
  if (file_exists(cufes_csv)) file_delete(cufes_csv)
  file_move(tmp_csv, cufes_csv)
}
# ERDDAP CSV: row 1 = column names, row 2 = units, rows 3+ = data
hdr   <- read_csv(cufes_csv, n_max = 0) |> names()
d_raw <- read_csv(cufes_csv, skip = 2, col_names = hdr)
cat(glue("Downloaded {format(nrow(d_raw), big.mark=',')} CUFES samples"), "\n")
Downloaded 49,572 CUFES samples 
Code
# Sync the download cache directory to the archive prefix for this
# provider/dataset, skipping macOS Finder metadata files.
sync_to_gcs(
  local_dir  = dir_dl,
  gcs_prefix = glue("archive/{provider}/{dataset}"),
  bucket     = "calcofi-files-public",
  exclude    = c(".DS_Store"))
# A tibble: 1 × 4
  file                action      size reason  
  <chr>               <chr>      <dbl> <chr>   
1 erdCalCOFIcufes.csv uploaded 9131252 new file

4 Build Sample + Measurement Tables

Code
# Working copy of the raw table: add a sequential sample_id as the first
# column, parse start/stop times, and coerce the position and underway
# environment columns to numeric (coercion warnings are suppressed, so
# unparseable values become NA).
d <- d_raw |>
  mutate(sample_id = row_number(), .before = 1) |>
  mutate(
    datetime_start_utc = as_datetime(time),
    datetime_end_utc   = suppressWarnings(as_datetime(stop_time)),
    across(
      c(
        latitude, longitude, stop_latitude, stop_longitude,
        start_temperature, start_salinity, start_wind_speed,
        start_wind_direction, start_pump_speed,
        stop_temperature, stop_salinity, stop_wind_speed,
        stop_wind_direction, stop_pump_speed),
      \(x) suppressWarnings(as.numeric(x))))

# One row per CUFES sample: pick and rename the identifying, timing, position
# and underway-environment columns, then coerce sample_number to integer in
# place (coercion warnings suppressed).
d_sample <- d |>
  select(
    sample_id,
    cruise_orig = cruise,
    ship_name   = ship,
    ship_code,
    sample_number,
    datetime_start_utc, datetime_end_utc,
    latitude, longitude,
    latitude_stop  = stop_latitude,
    longitude_stop = stop_longitude,
    start_temperature, start_salinity, start_wind_speed,
    start_wind_direction, start_pump_speed,
    stop_temperature, stop_salinity, stop_wind_speed,
    stop_wind_direction, stop_pump_speed) |>
  mutate(sample_number = suppressWarnings(as.integer(sample_number)))
dbWriteTable(con, "cufes_sample", d_sample, overwrite = TRUE)

# Reshape the wide egg-count columns into a long measurement table:
# one row per (sample, egg type) with a non-missing count.
egg_cols <- c(
  "sardine_eggs", "anchovy_eggs", "jack_mackerel_eggs",
  "hake_eggs", "squid_eggs", "other_fish_eggs")
d_meas <- d |>
  select(sample_id, all_of(egg_cols)) |>
  pivot_longer(
    cols      = all_of(egg_cols),
    names_to  = "measurement_type",
    values_to = "measurement_value") |>
  filter(!is.na(measurement_value)) |>
  mutate(
    measurement_value = as.double(measurement_value),
    measurement_qual  = NA_character_) |>
  # sequential id assigned after missing counts are dropped
  mutate(cufes_measurement_id = row_number(), .before = 1)
dbWriteTable(con, "cufes_measurement", d_meas, overwrite = TRUE)
cat(
  glue("cufes_sample {nrow(d_sample)}, cufes_measurement {nrow(d_meas)}"),
  "\n")
cufes_sample 49572, cufes_measurement 284097 

5 Resolve Keys + Spatial

Code
# Expose the ship, cruise and grid tables from the swfsc_ichthyo parquet
# directory as views in this database; grid is flagged as a geometry table.
load_prior_tables(
  con,
  parquet_dir = here("data/parquet/swfsc_ichthyo"),
  tables      = c("ship", "cruise", "grid"),
  geom_tables = c("grid"),
  as_view     = TRUE)
# A tibble: 3 × 3
  table   rows has_geom
  <chr>  <dbl> <lgl>   
1 cruise   691 FALSE   
2 grid     218 TRUE    
3 ship      48 FALSE   
Code
# Ship lookup keyed on an upper-cased, whitespace-squished ship name.
d_ship <- dbGetQuery(con, "SELECT ship_key, ship_name, ship_nodc FROM ship") |>
  mutate(ship_name_norm = ship_name |> str_to_upper() |> str_squish())

# Normalise the CUFES ship names the same way (after stripping a leading
# "R/V" style prefix) and attach ship_key / ship_nodc.
zt <- dbGetQuery(con, "SELECT sample_id, ship_name, datetime_start_utc FROM cufes_sample") |>
  mutate(ship_name_norm = ship_name |> str_replace("^R/?V\\.?\\s+","") |> str_to_upper() |> str_squish()) |>
  left_join(d_ship |> select(ship_name_norm, ship_key, ship_nodc), by = "ship_name_norm")

# Guard against join fan-out: if two ship rows share a normalised name, the
# join duplicates samples and the UPDATE that consumes `zk` would pick an
# arbitrary ship_key / cruise_key per sample.
if (anyDuplicated(zt$sample_id) > 0) {
  stop("Ship name join is not one-to-one: ",
       "multiple `ship` rows match the same normalised ship name",
       call. = FALSE)
}

# Candidate cruise_key = "<YYYY-MM of sample start>-<ship_nodc>". The month is
# formatted explicitly in UTC so it does not depend on the timezone attribute
# of the timestamps returned by the database driver.
# NOTE(review): the key uses each sample's own month; samples taken in a later
# month than the one in the cruise's key will not match and end up NA below --
# confirm this is intended.
zt <- zt |>
  mutate(cruise_key = if_else(
    is.na(ship_nodc) | is.na(datetime_start_utc),
    NA_character_,
    as.character(glue("{format(datetime_start_utc, '%Y-%m', tz = 'UTC')}-{ship_nodc}"))))

# Keep only candidate keys that exist in the cruise table.
valid_ck <- dbGetQuery(con, "SELECT DISTINCT cruise_key FROM cruise")$cruise_key
zt <- zt |> mutate(cruise_key = if_else(cruise_key %in% valid_ck, cruise_key, NA_character_))

# Staging table consumed by the UPDATE on cufes_sample.
dbWriteTable(con, "zk", zt |> select(sample_id, ship_key, cruise_key), overwrite = TRUE)
# Add the ship_key column to cufes_sample unless it is already present.
dbExecute(
  con,
  "ALTER TABLE cufes_sample ADD COLUMN IF NOT EXISTS ship_key VARCHAR")
[1] 0
Code
# Add the cruise_key column to cufes_sample unless it is already present.
dbExecute(
  con,
  "ALTER TABLE cufes_sample ADD COLUMN IF NOT EXISTS cruise_key VARCHAR")
[1] 0
Code
# Copy ship_key and cruise_key from the zk staging table into cufes_sample,
# matching rows on sample_id.
dbExecute(
  con,
  "UPDATE cufes_sample s SET ship_key=z.ship_key, cruise_key=z.cruise_key FROM zk z WHERE s.sample_id=z.sample_id")
[1] 49572
Code
# The staging table is no longer needed once the keys are copied over.
dbExecute(
  con,
  "DROP TABLE zk")
[1] 0
Code
# Report how many samples received a ship_key and a cruise_key.
cat(
  glue(
    "ship match {sum(!is.na(zt$ship_key))}/{nrow(zt)}; ",
    "cruise_key {sum(!is.na(zt$cruise_key))}/{nrow(zt)}"),
  "\n")
ship match 49572/49572; cruise_key 38859/49572 
Code
# Add point geometry to cufes_sample from its longitude/latitude columns,
# then assign grid keys and show the returned table.
add_point_geom(
  con, "cufes_sample",
  lon_col = "longitude",
  lat_col = "latitude")
datatable(
  assign_grid_key(con, "cufes_sample"),
  caption = "Grid assignment")

6 Add Measurement Types

Code
# Registry rows for the six CUFES egg-count measurement types.
cufes_types <- tibble(
  measurement_type   = egg_cols,
  description        = c(
    "Sardine egg count",
    "Northern anchovy egg count",
    "Jack mackerel egg count",
    "Pacific hake egg count",
    "Squid egg count",
    "Other fish egg count"),
  units              = "count",
  `_source_column`   = egg_cols,
  `_source_table`    = "cufes_measurement",
  `_source_datasets` = "swfsc_cufes",
  `_qual_column`     = NA_character_,
  `_prec_column`     = NA_character_)

# Append only types not yet in the shared registry, and write the CSV back
# only when something was added.
new_types <- cufes_types |>
  filter(!measurement_type %in% d_meas_type$measurement_type)
if (nrow(new_types) > 0) {
  d_meas_type <- bind_rows(d_meas_type, new_types)
  write_csv(d_meas_type, meas_type_csv)
}
dbWriteTable(con, "measurement_type", d_meas_type, overwrite = TRUE)
cat(glue("added {nrow(new_types)} measurement types"), "\n")
added 6 measurement types 

7 Metadata, Schema, Validate, Outputs

Code
# Dataset table built from the ingest YAML found under the project root.
d_dataset <- ingest_yaml_to_dataset_df(read_ingest_yaml(here()))
dbWriteTable(con, "dataset", d_dataset, overwrite = TRUE)

# Primary and foreign keys of the CUFES tables; used for the ERD here and
# for relationships.json later.
cufes_rels <- list(
  primary_keys = list(
    cufes_sample      = "sample_id",
    cufes_measurement = "cufes_measurement_id",
    measurement_type  = "measurement_type"),
  foreign_keys = list(
    list(
      table      = "cufes_measurement",
      column     = "sample_id",
      ref_table  = "cufes_sample",
      ref_column = "sample_id"),
    list(
      table      = "cufes_measurement",
      column     = "measurement_type",
      ref_table  = "measurement_type",
      ref_column = "measurement_type")))

# Entity-relationship diagram of the tables in this ingest.
cc_erd(
  con,
  tables = c("cufes_sample", "cufes_measurement", "measurement_type", "dataset"),
  rels   = cufes_rels,
  colors = list(
    lightblue   = c("cufes_sample", "cufes_measurement"),
    lightyellow = "measurement_type",
    white       = "dataset"))

Code
# Write the key relationships to the parquet output directory.
build_relationships_json(
  rels       = cufes_rels,
  output_dir = dir_parquet,
  provider   = provider,
  dataset    = dataset)
[1] "/Users/bbest/Github/CalCOFI/workflows/data/parquet/swfsc_cufes/relationships.json"
Code
# Run all release checks in non-strict mode and print a one-line status.
results <- validate_for_release(con, checks = "all", strict = FALSE)
# `results$passed` is a single flag, so use scalar if/else rather than the
# vectorised ifelse(); isTRUE() also reports FAILED when the flag is NA/NULL
# instead of printing an empty status.
# NOTE(review): the last rendered run printed FAILED, yet the following chunks
# still export and upload -- consider inspecting `results` or gating on it.
cat("Validation:", if (isTRUE(results$passed)) "PASSED" else "FAILED", "\n")
Validation: FAILED 
Code
# Export the four tables to parquet in the output directory; the measurement
# table is sorted by measurement_type and provenance columns are kept.
dir_create(dir_parquet)
write_parquet_outputs(
  con              = con,
  output_dir       = dir_parquet,
  tables           = c(
    "cufes_sample", "cufes_measurement", "measurement_type", "dataset"),
  sort_by          = list(cufes_measurement = "measurement_type"),
  strip_provenance = FALSE)
# A tibble: 4 × 5
  table               rows file_size path                            partitioned
  <chr>              <dbl>     <dbl> <chr>                           <lgl>      
1 cufes_sample       49572   3729643 /Users/bbest/Github/CalCOFI/wo… FALSE      
2 cufes_measurement 284097    524924 /Users/bbest/Github/CalCOFI/wo… FALSE      
3 measurement_type     112      5123 /Users/bbest/Github/CalCOFI/wo… FALSE      
4 dataset                8      6670 /Users/bbest/Github/CalCOFI/wo… FALSE      
Code
# Table and field redefinitions for this dataset, read from the metadata
# directory.
d_tbls_rd <- read_csv(here("metadata/swfsc/cufes/tbls_redefine.csv"))
d_flds_rd <- read_csv(here("metadata/swfsc/cufes/flds_redefine.csv"))

# Build metadata.json for the two CUFES tables in the parquet output directory.
build_metadata_json(
  con                  = con,
  d_tbls_rd            = d_tbls_rd,
  d_flds_rd            = d_flds_rd,
  metadata_derived_csv = here("metadata/swfsc/cufes/metadata_derived.csv"),
  output_dir           = dir_parquet,
  tables               = c("cufes_sample", "cufes_measurement"),
  set_comments         = TRUE,
  provider             = provider,
  dataset              = dataset,
  workflow_url         = cc$workflow_url,
  tables_owned         = tables_owned)
[1] "/Users/bbest/Github/CalCOFI/workflows/data/parquet/swfsc_cufes/metadata.json"
Code
# Sync the parquet output directory to the ingest prefix for this dataset.
sync_to_gcs(
  local_dir  = dir_parquet,
  gcs_prefix = glue("ingest/{dir_label}"),
  bucket     = "calcofi-db")
# A tibble: 7 × 4
  file                      action      size reason  
  <chr>                     <chr>      <dbl> <chr>   
1 cufes_measurement.parquet uploaded  524924 new file
2 cufes_sample.parquet      uploaded 3729643 new file
3 dataset.parquet           uploaded    6670 new file
4 manifest.json             uploaded    2255 new file
5 measurement_type.parquet  uploaded    5123 new file
6 metadata.json             uploaded    5663 new file
7 relationships.json        uploaded     557 new file

8 Questions for Data Providers

Code
# Show the open questions for the data provider, ordered by priority
# (blocker, then high, then normal) and then by id.
read_csv(here(glue("metadata/{provider}/{dataset}/questions.csv"))) |>
  arrange(
    factor(priority, levels = c("blocker","high","normal")),
    id) |>
  select(priority, question, context, status) |>
  datatable(
    caption  = "Questions (ranked)",
    options  = list(dom="t"),
    rownames = FALSE)
Code
# Close the database connection and report where the outputs were written.
close_duckdb(con)
cat(glue("Parquet outputs written to: {dir_parquet}"), "\n")
Parquet outputs written to: /Users/bbest/Github/CalCOFI/workflows/data/parquet/swfsc_cufes