Ingest CalCOFI Lobster Phyllosoma

Published

2026-06-08

1 Overview

Source: EDI package knb-lter-cce.188.4 — counts of spiny lobster (Panulirus interruptus) phyllosoma larvae by stage from CalCOFI net tows, 1951–2008 (PI: Koslow). Provider: calcofi.

2 Setup

Code
# Load the CalCOFI helper packages from sibling checkouts, then attach the
# remaining dependencies.
devtools::load_all(here::here("../calcofi4db"))
devtools::load_all(here::here("../calcofi4r"))
librarian::shelf(
  CalCOFI/calcofi4db, CalCOFI/calcofi4r,
  DBI, dplyr, DT, fs, glue, here, janitor, jsonlite, knitr, lubridate, purrr,
  readr, sf, stringr, tibble, tidyr, units, quiet = TRUE)

# Spell out TRUE/FALSE: `T` and `F` are ordinary variables that can be reassigned.
options(readr.show_col_types = FALSE)
options(DT.options = list(scrollX = TRUE))
source(here("libs/ingest.R"))

# Workflow metadata read from ingest_calcofi_phyllosoma.qmd.
cc <- read_calcofi_meta(here("ingest_calcofi_phyllosoma.qmd"))
provider     <- cc$provider
dataset      <- cc$dataset
tables_owned <- cc$tables_owned

# Output locations are all derived from "<provider>_<dataset>".
dir_label   <- glue("{provider}_{dataset}")
dir_parquet <- here(glue("data/parquet/{dir_label}"))
db_path     <- here(glue("data/wrangling/{dir_label}.duckdb"))

# NOTE(review): `overwrite` is not defined in the visible code — presumably it
# is set by libs/ingest.R or a chunk/parameter not shown here; confirm.
if (overwrite && file_exists(db_path)) {
  file_delete(db_path)
}
dir_create(dirname(db_path))
con <- get_duckdb_con(db_path)
load_duckdb_extension(con, "spatial")

# Shared measurement-type registry; read here, extended later in the workflow.
meas_type_csv <- here("metadata/measurement_type.csv")
d_meas_type   <- read_csv(meas_type_csv)

3 Download from EDI

Code
# Cache the EDI source CSV locally, then read every column as character so
# that type coercion happens explicitly downstream.
dir_dl <- here(glue("data/cache/{dir_label}"))
dir_create(dir_dl)
phyllo_csv <- file.path(dir_dl, "phyllosoma.csv")
edi_url <- "https://pasta.lternet.edu/package/data/eml/knb-lter-cce/188/4/2e66465fa17f78cdc15a2bfc9dce652d"
if (overwrite || !file_exists(phyllo_csv)) {
  # Download to a temporary file and only copy it into the cache on success:
  # writing straight to `phyllo_csv` would leave a truncated file behind after
  # a failed transfer, and later runs would accept it as a valid cache.
  tmp_csv <- tempfile(fileext = ".csv")
  tryCatch({
    # mode = "wb" keeps the bytes identical to the published file on every OS.
    status <- download.file(edi_url, tmp_csv, quiet = TRUE, mode = "wb")
    if (status != 0) {
      stop("Download failed (status ", status, "): ", edi_url, call. = FALSE)
    }
    file_copy(tmp_csv, phyllo_csv, overwrite = TRUE)
  }, finally = {
    if (file_exists(tmp_csv)) file_delete(tmp_csv)
  })
}
d_raw <- read_csv(phyllo_csv, col_types = cols(.default = "c")) |>
  clean_names()
cat(glue("Downloaded {nrow(d_raw)} phyllosoma tows"), "\n")
Downloaded 1859 phyllosoma tows 
Code
# Sync the cached source download to the calcofi-files-public bucket under
# archive/<provider>/<dataset>, leaving out .DS_Store files.
gcs_archive_prefix <- glue("archive/{provider}/{dataset}")
sync_to_gcs(
  local_dir  = dir_dl,
  gcs_prefix = gcs_archive_prefix,
  bucket     = "calcofi-files-public",
  exclude    = ".DS_Store")
# A tibble: 1 × 4
  file           action    size reason        
  <chr>          <chr>    <dbl> <chr>         
1 phyllosoma.csv skipped 196700 checksum match

4 Build Tow + Measurement Tables

Code
# Coerce character input to double; values that do not parse become NA and the
# coercion warnings are suppressed.
parse_dbl <- function(x) suppressWarnings(as.numeric(x))

# Typed working copy of the raw rows, with a sequential tow_id in file order.
d <- d_raw |>
  mutate(tow_id = row_number(), .before = 1) |>
  mutate(
    date      = suppressWarnings(as.Date(tow_collection_day)),
    latitude  = parse_dbl(latitude_o),
    longitude = parse_dbl(longitude_o),
    line      = parse_dbl(station_line),
    station   = parse_dbl(station_number),
    # site_key is only built when both line and station are available
    site_key  = if_else(
      !is.na(line) & !is.na(station),
      sprintf("%05.1f %05.1f", line, station),
      NA_character_),
    # only a date is parsed above, so this is midnight of that date
    datetime_start_utc = as_datetime(date))

# Tow table: pick and rename the tow-level columns, then make the three
# numeric tow attributes doubles.
d_tow <- d |>
  select(
    tow_id,
    cruise_orig = year_month_of_tow,
    ship_name   = ship,
    sorting_lab,
    date, datetime_start_utc, line, station, site_key, latitude, longitude,
    max_tow_depth_m,
    volume_filtered    = volume_water_filtered_ml_1000m3,
    aliquot_pct        = aliquot_percent,
    aliquot_adjustment = aliquot_adjustment_value,
    study_flag) |>
  mutate(across(c(max_tow_depth_m, volume_filtered, aliquot_pct), parse_dbl))
dbWriteTable(con, "phyllosoma_tow", d_tow, overwrite = TRUE)

# Measurement table: one row per tow x count column (total + stages 1-11),
# keeping only values that parse as numbers.
stage_cols <- c("total_phyllosoma", paste0("stage_", 1:11))
new_names  <- c("total_phyllosoma", paste0("phyllosoma_stage_", 1:11))
stage_lookup <- setNames(stage_cols, new_names)  # new name -> source column
d_meas <- d |>
  select(tow_id, all_of(stage_lookup)) |>
  pivot_longer(
    cols      = all_of(new_names),
    names_to  = "measurement_type",
    values_to = "measurement_value") |>
  mutate(
    measurement_value = parse_dbl(measurement_value),
    measurement_qual  = NA_character_) |>
  filter(!is.na(measurement_value)) |>
  mutate(phyllosoma_measurement_id = row_number(), .before = 1)
dbWriteTable(con, "phyllosoma_measurement", d_meas, overwrite = TRUE)

n_rows_msg <- glue("phyllosoma_tow {nrow(d_tow)}, phyllosoma_measurement {nrow(d_meas)}")
cat(n_rows_msg, "\n")
phyllosoma_tow 1859, phyllosoma_measurement 22308 

5 Resolve Keys + Spatial

Code
# Bring the ship, cruise and grid tables from the swfsc_ichthyo parquet
# outputs into this database as views (grid carries geometry).
prior_parquet_dir <- here("data/parquet/swfsc_ichthyo")
load_prior_tables(
  con,
  parquet_dir = prior_parquet_dir,
  tables      = c("ship", "cruise", "grid"),
  geom_tables = "grid",
  as_view     = TRUE)
# A tibble: 3 × 3
  table   rows has_geom
  <chr>  <dbl> <lgl>   
1 cruise   691 FALSE   
2 grid     218 TRUE    
3 ship      48 FALSE   
Code
# source "Ship" is a NODC ship code (e.g. 31PT), not a name -> join on ship_nodc
d_ship <- dbGetQuery(con, "SELECT ship_key, ship_nodc FROM ship")
zt <- dbGetQuery(con, "SELECT tow_id, ship_name AS ship_nodc, date FROM phyllosoma_tow") |>
  # na_matches = "never": dplyr joins treat NA keys as equal by default, which
  #   would attach a ship with an unknown NODC code to tows with no ship code.
  # relationship = "many-to-one": fail loudly if a NODC code maps to several
  #   ships, instead of duplicating tow rows and making the UPDATE ambiguous.
  left_join(d_ship, by = "ship_nodc", na_matches = "never",
            relationship = "many-to-one") |>
  # candidate cruise key: "<YYYY-MM>-<ship_nodc>", only when both parts exist
  mutate(cruise_key = if_else(is.na(ship_nodc) | is.na(date), NA_character_,
                              as.character(glue("{format(date,'%Y-%m')}-{ship_nodc}"))))

# Keep only candidate keys that exist in the cruise table; others become NA.
valid_ck <- dbGetQuery(con, "SELECT DISTINCT cruise_key FROM cruise")$cruise_key
zt <- zt |> mutate(cruise_key = if_else(cruise_key %in% valid_ck, cruise_key, NA_character_))

# Stage the resolved keys in a scratch table for the SQL UPDATE that follows.
dbWriteTable(con, "zk", zt |> select(tow_id, ship_key, cruise_key), overwrite = TRUE)
dbExecute(con, "ALTER TABLE phyllosoma_tow ADD COLUMN IF NOT EXISTS ship_key VARCHAR")
dbExecute(con, "ALTER TABLE phyllosoma_tow ADD COLUMN IF NOT EXISTS ship_key VARCHAR")
[1] 0
Code
# Add the cruise_key column to the tow table unless it is already present.
dbExecute(
  conn      = con,
  statement = "ALTER TABLE phyllosoma_tow ADD COLUMN IF NOT EXISTS cruise_key VARCHAR")
[1] 0
Code
# Copy ship_key and cruise_key from the scratch table zk onto the matching tows.
dbExecute(
  conn      = con,
  statement = "UPDATE phyllosoma_tow t SET ship_key=z.ship_key, cruise_key=z.cruise_key FROM zk z WHERE t.tow_id=z.tow_id")
[1] 1859
Code
# The scratch table zk is no longer needed once the keys are copied over.
dbExecute(conn = con, statement = "DROP TABLE zk")
[1] 0
Code
# Report how many tows received a ship key and a validated cruise key.
key_match_msg <- glue("ship {sum(!is.na(zt$ship_key))}/{nrow(zt)}; cruise_key {sum(!is.na(zt$cruise_key))}/{nrow(zt)}")
cat(key_match_msg, "\n")
ship 1816/1859; cruise_key 1634/1859 
Code
# Add point geometry to the tow table from its longitude/latitude columns,
# then assign grid keys and show the result of that assignment as a table.
add_point_geom(con, "phyllosoma_tow", lon_col = "longitude", lat_col = "latitude")
d_grid_assign <- assign_grid_key(con, "phyllosoma_tow")
datatable(d_grid_assign, caption = "Grid assignment")

6 Measurement Types + Finalize

Code
# Registry rows for this dataset's measurement types: the total count plus
# one row per stage (1-11).
phyllo_types <- tibble(
  measurement_type   = new_names,
  description        = c("Total phyllosoma larvae count", paste0("Phyllosoma stage ", 1:11, " count")),
  units              = "count",
  `_source_column`   = new_names,
  `_source_table`    = "phyllosoma_measurement",
  `_source_datasets` = "calcofi_phyllosoma",
  `_qual_column`     = NA_character_,
  `_prec_column`     = NA_character_)

# Append only the types missing from the shared registry, and rewrite the
# registry CSV only when something was actually added.
missing_types <- setdiff(phyllo_types$measurement_type, d_meas_type$measurement_type)
new_types <- phyllo_types |>
  filter(measurement_type %in% missing_types)
if (nrow(new_types) > 0) {
  d_meas_type <- bind_rows(d_meas_type, new_types)
  write_csv(d_meas_type, meas_type_csv)
}
dbWriteTable(con, "measurement_type", d_meas_type, overwrite = TRUE)

# Dataset table built from the ingest YAML found under the project root.
ingest_yaml <- read_ingest_yaml(here())
d_dataset   <- ingest_yaml_to_dataset_df(ingest_yaml)
dbWriteTable(con, "dataset", d_dataset, overwrite = TRUE)

# Keys used for the ERD here and for relationships.json later.
phyllo_rels <- list(
  primary_keys = list(
    phyllosoma_tow         = "tow_id",
    phyllosoma_measurement = "phyllosoma_measurement_id",
    measurement_type       = "measurement_type"),
  foreign_keys = list(
    list(
      table      = "phyllosoma_measurement",
      column     = "tow_id",
      ref_table  = "phyllosoma_tow",
      ref_column = "tow_id"),
    list(
      table      = "phyllosoma_measurement",
      column     = "measurement_type",
      ref_table  = "measurement_type",
      ref_column = "measurement_type")))

erd_tables <- c("phyllosoma_tow", "phyllosoma_measurement", "measurement_type", "dataset")
erd_colors <- list(
  lightblue   = c("phyllosoma_tow", "phyllosoma_measurement"),
  lightyellow = "measurement_type",
  white       = "dataset")
cc_erd(con, tables = erd_tables, rels = phyllo_rels, colors = erd_colors)

Code
# Create the output directory before relationships.json is written into it;
# it is otherwise only created later, just before the parquet export.
# NOTE(review): build_relationships_json may already create output_dir itself —
# dir_create() is harmless in that case.
dir_create(dir_parquet)
build_relationships_json(rels = phyllo_rels, output_dir = dir_parquet, provider = provider, dataset = dataset)
[1] "/Users/bbest/Github/CalCOFI/workflows/data/parquet/calcofi_phyllosoma/relationships.json"
Code
# Run all release checks in non-strict mode and print a one-line verdict.
# NOTE(review): the outcome is only reported — a FAILED validation does not
# stop the parquet export and GCS sync that follow; confirm this is intended.
results <- validate_for_release(con, checks = "all", strict = FALSE)
# Scalar condition: use if/else on isTRUE() rather than vectorised ifelse(),
# so a missing or NA `passed` is reported as FAILED instead of printing NA.
validation_status <- if (isTRUE(results$passed)) "PASSED" else "FAILED"
cat("Validation:", validation_status, "\n")
Validation: FAILED 
Code
# Export the four tables to parquet, sorting phyllosoma_measurement by
# measurement_type and keeping the provenance columns.
parquet_tables <- c("phyllosoma_tow", "phyllosoma_measurement", "measurement_type", "dataset")
dir_create(dir_parquet)
write_parquet_outputs(
  con              = con,
  output_dir       = dir_parquet,
  tables           = parquet_tables,
  sort_by          = list(phyllosoma_measurement = "measurement_type"),
  strip_provenance = FALSE)
# A tibble: 4 × 5
  table                   rows file_size path                        partitioned
  <chr>                  <dbl>     <dbl> <chr>                       <lgl>      
1 phyllosoma_tow          1859     34160 /Users/bbest/Github/CalCOF… FALSE      
2 phyllosoma_measurement 22308     79763 /Users/bbest/Github/CalCOF… FALSE      
3 measurement_type         124      5346 /Users/bbest/Github/CalCOF… FALSE      
4 dataset                    9      6862 /Users/bbest/Github/CalCOF… FALSE      
Code
# Table/field redefinitions and derived metadata live under
# metadata/<provider>/<dataset>/. The directory is derived from `provider` and
# `dataset` (as for questions.csv) instead of repeating the literal
# "calcofi/phyllosoma" path.
dir_meta <- here(glue("metadata/{provider}/{dataset}"))
d_tbls_rd <- read_csv(file.path(dir_meta, "tbls_redefine.csv"))
d_flds_rd <- read_csv(file.path(dir_meta, "flds_redefine.csv"))

# Write metadata.json for the two phyllosoma tables next to the parquet files.
build_metadata_json(
  con = con, d_tbls_rd = d_tbls_rd, d_flds_rd = d_flds_rd,
  metadata_derived_csv = file.path(dir_meta, "metadata_derived.csv"),
  output_dir = dir_parquet, tables = c("phyllosoma_tow", "phyllosoma_measurement"),
  set_comments = TRUE, provider = provider, dataset = dataset,
  workflow_url = cc$workflow_url, tables_owned = tables_owned)
[1] "/Users/bbest/Github/CalCOFI/workflows/data/parquet/calcofi_phyllosoma/metadata.json"
Code
# Sync the parquet output directory to the calcofi-db bucket under
# ingest/<provider>_<dataset>.
gcs_ingest_prefix <- glue("ingest/{dir_label}")
sync_to_gcs(
  local_dir  = dir_parquet,
  gcs_prefix = gcs_ingest_prefix,
  bucket     = "calcofi-db")
# A tibble: 7 × 4
  file                           action    size reason        
  <chr>                          <chr>    <dbl> <chr>         
1 dataset.parquet                skipped   6862 checksum match
2 manifest.json                  uploaded  2157 crc32c changed
3 measurement_type.parquet       skipped   5346 checksum match
4 metadata.json                  skipped   4712 checksum match
5 phyllosoma_measurement.parquet skipped  79763 checksum match
6 phyllosoma_tow.parquet         uploaded 34160 crc32c changed
7 relationships.json             skipped    579 checksum match

7 Questions for Data Providers

Code
# Show the open questions for the data providers, ordered by priority
# (blocker, high, normal) and then by id.
questions_csv   <- here(glue("metadata/{provider}/{dataset}/questions.csv"))
priority_levels <- c("blocker", "high", "normal")
d_questions <- read_csv(questions_csv) |>
  arrange(factor(priority, priority_levels), id) |>
  select(priority, question, context, status)
datatable(
  d_questions,
  caption  = "Questions (ranked)",
  options  = list(dom = "t"),
  rownames = FALSE)
Code
# Close the DuckDB connection, then report where the outputs were written.
close_duckdb(con)
done_msg <- glue("Parquet outputs written to: {dir_parquet}")
cat(done_msg, "\n")
Parquet outputs written to: /Users/bbest/Github/CalCOFI/workflows/data/parquet/calcofi_phyllosoma