Publish Ichthyoplankton & Bottle Data to OBIS

Published

2026-02-24

1 Overview

This workflow converts CalCOFI ichthyoplankton (fish eggs and larvae) and bottle hydrography data to an OBIS-compliant DarwinCore Archive for publication. The archive follows the Event Core structure with Occurrence and ExtendedMeasurementOrFact (eMoF) extensions.

Data source: merged DuckDB from merge_ichthyo_bottle.qmd, combining:

  • SWFSC ichthyoplankton tables (cruise, ship, site, tow, net, ichthyo, species, lookup)
  • CalCOFI.org bottle tables (casts, bottle, bottle_measurement, cast_condition, measurement_type)

References: CalCOFI Fish Eggs & Larvae | CalCOFI Bottle Database | OBIS Manual | DarwinCore Terms

Code
devtools::load_all(here::here("../calcofi4db"))
librarian::shelf(
  CalCOFI / calcofi4db,
  DBI,
  dplyr,
  DT,
  duckdb,
  EML,
  glue,
  here,
  lubridate,
  iobis / obistools,
  purrr,
  stringr,
  readr,
  tibble,
  tidyr,
  uuid,
  xml2,
  quiet = T
)
options(readr.show_col_types = F)

# calcofi namespace for deterministic UUID v5 generation
CALCOFI_NS <- "c0f1ca00-ca1c-5000-b000-1c4790000000"

# dataset metadata ----
dataset_title <- "CalCOFI Ichthyoplankton Tows & Bottle Hydrography"
dataset_abstract <- "Fish larvae and egg counts from CalCOFI ichthyoplankton nets (primarily vertical [Calvet or Pairovet], oblique [bongo or ring nets], and surface tows [Manta nets]) combined with CTD rosette bottle hydrography (temperature, salinity, dissolved oxygen, nutrients, chlorophyll, and carbon system parameters). Oblique tows are normally standardized to count per 10 m^2 of surface sampled. Surface tows are normally standardized to count per 1,000 m^3 strained. Bottle data include depth-resolved measurements from Niskin bottles on CTD casts at CalCOFI grid stations."
dataset_keywords <- c(
  "atmosphere",
  "biology",
  "biosphere",
  "calcofi",
  "earth science",
  "environment",
  "hydrography",
  "ichthyoplankton",
  "latitude",
  "longitude",
  "nutrients",
  "ocean",
  "time"
)
country_code <- "US"
water_body <- "California Current"

dataset_id <- glue(
  "calcofi.io_workflow_ichthyo-bottle_to_obis_{format(Sys.Date(), '%Y-%m-%d')}"
)

sampling_methods_description <- "Standard CalCOFI ichthyoplankton tows: The standard oblique tow uses a bongo net (71 cm diameter, 0.505 mm mesh) or 1-m ring net (prior to 1978) retrieved at a constant wire angle (45 degrees) from 210 m depth to surface. Surface tows use a Manta net; vertical tows use CalVET/Pairovet nets. Flowmeters measure volume filtered. Samples are preserved in 5% formalin. Fish eggs and larvae are sorted, identified to lowest taxon possible, enumerated, and measured. CTD bottle data: Seawater samples collected using Niskin bottles mounted on a CTD rosette cast to near-bottom depths. Temperature, salinity, and dissolved oxygen are measured by CTD sensors and calibrated against bottle samples. Nutrient concentrations (phosphate, silicate, nitrite, nitrate, ammonia) determined by autoanalyzer. Chlorophyll-a and phaeopigment measured fluorometrically."

study_extent_description <- "The study covers the California Current ecosystem, primarily off Southern California (standard lines 77-93) but historically extending from the border of Canada to the tip of Baja California. The time series for this dataset generally begins in 1949 for bottle data and 1951 for ichthyoplankton, continuing to the present with quarterly cruises."

sampling_description <- "Samples are collected at fixed stations along the CalCOFI grid. Oblique tows are standardized to counts per 10 m^2 of sea surface. Surface tows are standardized to counts per 1,000 m^3. Bottle hydrography data provide depth-resolved oceanographic context at the same grid stations."

funding_information <- "CalCOFI is a partnership between the NOAA National Marine Fisheries Service (NMFS), Scripps Institution of Oceanography (SIO), and California Department of Fish and Wildlife (CDFW)."

rights <- "The data may be used and redistributed for free but is not intended for legal use, since it may contain inaccuracies. Neither the data Contributor, ERD, NOAA, nor the United States Government, nor any of their employees or contractors, makes any warranty, express or implied, including warranties of merchantability and fitness for a particular purpose, or assumes any legal liability for the accuracy, completeness, or usefulness, of this information."

license <- "To the extent possible under law, the publisher has waived all rights to these data and has dedicated them to the Public Domain (CC0 1.0). Users may copy, modify, distribute and use the work, including for commercial purposes, without restriction."

license_xml <- c(
  "Public Domain (CC0 1.0)" = '<ulink url="http://creativecommons.org/publicdomain/zero/1.0/legalcode"><citetitle>Public Domain (CC0 1.0)</citetitle></ulink>'
)

intellectual_rights <- list(
  para = paste(rights, license, collapse = " ")
)

dir_out <- here("data/darwincore/ichthyo_bottle")

# get merged duckdb connection ----
db_path <- here("data/wrangling/merge_ichthyo_bottle.duckdb")
stopifnot(file.exists(db_path))
con <- dbConnect(duckdb(), dbdir = db_path, read_only = TRUE)

2 Event Hierarchy Design

The archive uses a sibling-event architecture: ichthyoplankton net tows and CTD bottle casts are both children of cruise events, linked spatially via locationID (CalCOFI line_station format) rather than nested within each other. This avoids falsely implying that bottle measurements were co-located with specific net tows — they happen at the same station but at different times and depths.

Code
graph TD
    A["Cruise Event<br/>(eventID = cruise_key)"] -->|parentEventID| B["Net Sample Event<br/>(eventID = net_uuid)"]
    A -->|parentEventID| C["CTD Cast Event<br/>(eventID = cast_uuid)"]
    B -->|eventID| D[Occurrence Records<br/>ichthyo tally]
    B -->|eventID| E["eMoF: Net Sample<br/>(std_haul_factor, prop_sorted,<br/>plankton biomass)"]
    D -->|occurrenceID| F["eMoF: Stage Abundance<br/>& Body Length"]
    C -->|eventID| G["eMoF: Bottle Measurements<br/>(temperature, salinity,<br/>O2, nutrients, ...)"]
    C -->|eventID| H["eMoF: Cast Conditions<br/>(wave, wind, weather, ...)"]

    B -.-|locationID| C

    A:::cruise
    B:::net
    C:::cast
    D:::occurrence
    E:::measurement
    F:::measurement
    G:::measurement
    H:::measurement

    classDef cruise fill:#e1f5ff,stroke:#0066cc,stroke-width:2px
    classDef net fill:#e1ffe1,stroke:#00cc66,stroke-width:2px
    classDef cast fill:#fff4e1,stroke:#ff9900,stroke-width:2px
    classDef occurrence fill:#f0e1ff,stroke:#6600cc,stroke-width:2px
    classDef measurement fill:#ffe1e1,stroke:#cc0000,stroke-width:2px

    subgraph Event Core
        A
        B
        C
    end

    subgraph Extensions
        D
        E
        F
        G
        H
    end
graph TD
    A["Cruise Event<br/>(eventID = cruise_key)"] -->|parentEventID| B["Net Sample Event<br/>(eventID = net_uuid)"]
    A -->|parentEventID| C["CTD Cast Event<br/>(eventID = cast_uuid)"]
    B -->|eventID| D[Occurrence Records<br/>ichthyo tally]
    B -->|eventID| E["eMoF: Net Sample<br/>(std_haul_factor, prop_sorted,<br/>plankton biomass)"]
    D -->|occurrenceID| F["eMoF: Stage Abundance<br/>& Body Length"]
    C -->|eventID| G["eMoF: Bottle Measurements<br/>(temperature, salinity,<br/>O2, nutrients, ...)"]
    C -->|eventID| H["eMoF: Cast Conditions<br/>(wave, wind, weather, ...)"]

    B -.-|locationID| C

    A:::cruise
    B:::net
    C:::cast
    D:::occurrence
    E:::measurement
    F:::measurement
    G:::measurement
    H:::measurement

    classDef cruise fill:#e1f5ff,stroke:#0066cc,stroke-width:2px
    classDef net fill:#e1ffe1,stroke:#00cc66,stroke-width:2px
    classDef cast fill:#fff4e1,stroke:#ff9900,stroke-width:2px
    classDef occurrence fill:#f0e1ff,stroke:#6600cc,stroke-width:2px
    classDef measurement fill:#ffe1e1,stroke:#cc0000,stroke-width:2px

    subgraph Event Core
        A
        B
        C
    end

    subgraph Extensions
        D
        E
        F
        G
        H
    end
Figure 1: Event hierarchy for CalCOFI ichthyoplankton & bottle data in DarwinCore Archive format.

Key design decisions:

  1. Collapsed hierarchy: cruise → net_sample / ctd_cast (no intermediate site/tow levels) to avoid event inheritance issues with NA values in the IPT
  2. locationID: both net_sample and ctd_cast events share locationID = "{line}_{station}" for spatial joining by OBIS consumers
  3. cast_uuid: deterministic UUID v5 minted from cast_id using the CalCOFI namespace — same approach as ichthyo_uuid
  4. Occurrence consolidation: one occurrence per life stage (egg or larva) per species per net sample
  5. eMoF structure: five sub-tables covering net-level measurements, stage abundance, body length, bottle measurements, and cast conditions

3 Data Extraction

Load all necessary tables from the merged DuckDB:

Code
# ichthyo chain ----
tbl_cruise <- tbl(con, "cruise")
tbl_ship <- tbl(con, "ship")
tbl_site <- tbl(con, "site")
tbl_tow <- tbl(con, "tow")
tbl_net <- tbl(con, "net")
tbl_ichthyo <- tbl(con, "ichthyo")
tbl_species <- tbl(con, "species")
tbl_lookup <- tbl(con, "lookup")

# bottle chain ----
tbl_casts <- tbl(con, "casts")
tbl_bottle <- tbl(con, "bottle")
tbl_bottle_meas <- tbl(con, "bottle_measurement")
tbl_cast_condition <- tbl(con, "cast_condition")
tbl_measurement_type <- tbl(con, "measurement_type")

# taxonomy ----
tbl_taxon <- tbl(con, "taxon")
tbl_taxa_rank <- tbl(con, "taxa_rank")

# quick counts
cat("Table row counts:\n")
Table row counts:
Code
for (tname in c(
  "cruise",
  "ship",
  "site",
  "tow",
  "net",
  "ichthyo",
  "species",
  "lookup",
  "casts",
  "bottle",
  "bottle_measurement",
  "cast_condition",
  "measurement_type",
  "taxon",
  "taxa_rank"
)) {
  n <- tbl(con, tname) |> summarize(n = n()) |> pull(n)
  cat(glue("  {tname}: {prettyNum(n, big.mark = ',')}"), "\n")
}
  cruise: 691 
  ship: 48 
  site: 61,104 
  tow: 75,506 
  net: 76,512 
  ichthyo: 830,873 
  species: 1,144 
  lookup: 26 
  casts: 35,644 
  bottle: 895,371 
  bottle_measurement: 11,135,600 
  cast_condition: 235,513 
  measurement_type: 47 
  taxon: 1,671 
  taxa_rank: 41 

4 Mint Globally Unique Identifiers

The merged DuckDB uses integer *_id columns for joins and SWFSC _source_uuid for provenance. For OBIS, we need globally unique, stable identifiers:

  • Net eventID: use net’s _source_uuid (original SWFSC UUID, unique per net)
  • Ichthyo occurrenceID / measurementID: mint deterministic UUID v5 from composite key
  • Cast eventID: mint deterministic UUID v5 from cast_id
  • Bottle/cast eMoF measurementID: mint deterministic UUID v5 from integer IDs
Code
# collect net table with source UUIDs for eventID ----
d_net <- tbl_net |> collect() |> rename(net_uuid = `_source_uuid`)
stopifnot(all(!duplicated(d_net$net_uuid)))
cat(glue("Net UUIDs: {nrow(d_net)} (all unique)"), "\n")
Net UUIDs: 76512 (all unique) 
Code
# collect and mint ichthyo UUIDs ----
d_ichthyo <- tbl_ichthyo |>
  collect() |>
  # join net_uuid for composite key
  left_join(d_net |> select(net_id, net_uuid), by = "net_id") |>
  mutate(
    ichthyo_uuid = UUIDfromName(
      CALCOFI_NS,
      paste0(
        net_uuid,
        "|",
        species_id,
        "|",
        life_stage,
        "|",
        ifelse(is.na(measurement_type), "", measurement_type),
        "|",
        ifelse(is.na(measurement_value), "", measurement_value)
      )
    )
  )
stopifnot(all(!duplicated(d_ichthyo$ichthyo_uuid)))
cat(glue("Ichthyo UUIDs: {nrow(d_ichthyo)} (all unique)"), "\n")
Ichthyo UUIDs: 830873 (all unique) 
Code
# collect casts and mint cast_uuid ----
d_casts <- tbl_casts |>
  collect() |>
  mutate(
    cast_uuid = UUIDfromName(
      CALCOFI_NS,
      paste0("cast:", cast_id)
    )
  )
stopifnot(all(!duplicated(d_casts$cast_uuid)))
cat(glue("Cast UUIDs: {nrow(d_casts)} (all unique)"), "\n")
Cast UUIDs: 35644 (all unique) 

5 Life Stage Vocabulary

Define mappings for egg and larva developmental stages to descriptive terms:

Code
# P06 measurement unit vocabulary (NERC) ----
measurement_unit_vocab <- tibble(
  measurementUnit = c(
    "individuals",
    "millimeters",
    "dimensionless",
    "cubic meters",
    "grams",
    "proportion",
    "m^3",
    "degC",
    "PSS-78",
    "ml/L",
    "umol/kg",
    "%",
    "kg/m3",
    "ug/L",
    "umol/L",
    "mgC/m3/hld",
    "degrees",
    "feet",
    "seconds",
    "knots",
    "millibars",
    "oktas",
    "meters",
    "Forel-Ule",
    "pH",
    "10^-8 m3/kg",
    "dyn m"
  ),
  measurementUnitID = c(
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # individuals
    "http://vocab.nerc.ac.uk/collection/P06/current/UXMM/", # millimeters
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # dimensionless
    "http://vocab.nerc.ac.uk/collection/P06/current/MCUB/", # cubic meters
    "http://vocab.nerc.ac.uk/collection/P06/current/UGRM/", # grams
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # proportion
    "http://vocab.nerc.ac.uk/collection/P06/current/MCUB/", # m^3
    "http://vocab.nerc.ac.uk/collection/P06/current/UPAA/", # degC
    "http://vocab.nerc.ac.uk/collection/P06/current/UGKG/", # PSS-78
    "http://vocab.nerc.ac.uk/collection/P06/current/UMLL/", # ml/L
    "http://vocab.nerc.ac.uk/collection/P06/current/UMKG/", # umol/kg
    "http://vocab.nerc.ac.uk/collection/P06/current/UPCT/", # %
    "http://vocab.nerc.ac.uk/collection/P06/current/UKGM/", # kg/m3
    "http://vocab.nerc.ac.uk/collection/P06/current/UGPL/", # ug/L
    "http://vocab.nerc.ac.uk/collection/P06/current/UPOX/", # umol/L
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # mgC/m3/hld (no std P06)
    "http://vocab.nerc.ac.uk/collection/P06/current/UAAA/", # degrees
    "http://vocab.nerc.ac.uk/collection/P06/current/UXFT/", # feet
    "http://vocab.nerc.ac.uk/collection/P06/current/UTBB/", # seconds
    "http://vocab.nerc.ac.uk/collection/P06/current/UKNT/", # knots
    "http://vocab.nerc.ac.uk/collection/P06/current/UPML/", # millibars
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # oktas (no std P06)
    "http://vocab.nerc.ac.uk/collection/P06/current/ULAA/", # meters
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # Forel-Ule (no std P06)
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # pH
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # 10^-8 m3/kg
    "http://vocab.nerc.ac.uk/collection/P06/current/UUUU/"
  )
) # dyn m

# NERC P01 vocabulary for bottle measurement types ----
bottle_p01_vocab <- tribble(
  ~measurement_type   , ~measurementTypeID                                         ,
  "temperature"       , "http://vocab.nerc.ac.uk/collection/P01/current/TEMPPR01/" ,
  "salinity"          , "http://vocab.nerc.ac.uk/collection/P01/current/PSALST01/" ,
  "oxygen_ml_l"       , "http://vocab.nerc.ac.uk/collection/P01/current/DOXYZZXX/" ,
  "oxygen_umol_kg"    , "http://vocab.nerc.ac.uk/collection/P01/current/DLOXYUMK/" ,
  "oxygen_saturation" , "http://vocab.nerc.ac.uk/collection/P01/current/OXYSSC01/" ,
  "sigma_theta"       , "http://vocab.nerc.ac.uk/collection/P01/current/SIGTEQ01/" ,
  "chlorophyll_a"     , "http://vocab.nerc.ac.uk/collection/P01/current/CPHLPR01/" ,
  "phaeopigment"      , "http://vocab.nerc.ac.uk/collection/P01/current/PHAEFLUO/" ,
  "phosphate"         , "http://vocab.nerc.ac.uk/collection/P01/current/PHOSZZXX/" ,
  "silicate"          , "http://vocab.nerc.ac.uk/collection/P01/current/SLCAZZXX/" ,
  "nitrite"           , "http://vocab.nerc.ac.uk/collection/P01/current/NTRIZZXX/" ,
  "nitrate"           , "http://vocab.nerc.ac.uk/collection/P01/current/NTRAZZXX/" ,
  "ammonia"           , "http://vocab.nerc.ac.uk/collection/P01/current/AMOAZZXX/" ,
  "c14_rep1"          , "http://vocab.nerc.ac.uk/collection/P01/current/OLOCSMP1/" ,
  "c14_rep2"          , "http://vocab.nerc.ac.uk/collection/P01/current/OLOCSMP1/" ,
  "c14_dark"          , "http://vocab.nerc.ac.uk/collection/P01/current/OLOCSMP1/" ,
  "c14_mean"          , "http://vocab.nerc.ac.uk/collection/P01/current/OLOCSMP1/" ,
  "light_pct"         , "http://vocab.nerc.ac.uk/collection/P01/current/IRRDSV01/" ,
  "dic_rep1"          , "http://vocab.nerc.ac.uk/collection/P01/current/DICATSWT/" ,
  "dic_rep2"          , "http://vocab.nerc.ac.uk/collection/P01/current/DICATSWT/" ,
  "alkalinity_rep1"   , "http://vocab.nerc.ac.uk/collection/P01/current/MDMAP014/" ,
  "alkalinity_rep2"   , "http://vocab.nerc.ac.uk/collection/P01/current/MDMAP014/" ,
  "ph_rep1"           , "http://vocab.nerc.ac.uk/collection/P01/current/PHXXZZXX/" ,
  "ph_rep2"           , "http://vocab.nerc.ac.uk/collection/P01/current/PHXXZZXX/" ,
  "r_depth"           , "http://vocab.nerc.ac.uk/collection/P01/current/ADEPZZ01/" ,
  "r_temperature"     , "http://vocab.nerc.ac.uk/collection/P01/current/TEMPPR01/" ,
  "r_salinity_sva"    , "http://vocab.nerc.ac.uk/collection/P01/current/SVELXXXX/" ,
  "r_dynamic_height"  , "http://vocab.nerc.ac.uk/collection/P01/current/DYNHTTAU/" ,
  "r_ammonium"        , "http://vocab.nerc.ac.uk/collection/P01/current/AMOAZZXX/" ,
  "r_oxygen_umol_kg"  , "http://vocab.nerc.ac.uk/collection/P01/current/DLOXYUMK/"
)

# NERC P01 vocabulary for cast condition types ----
cast_p01_vocab <- tribble(
  ~condition_type       , ~measurementTypeID                                         ,
  "wave_direction"      , "http://vocab.nerc.ac.uk/collection/P01/current/GWDRZZ01/" ,
  "wave_height"         , "http://vocab.nerc.ac.uk/collection/P01/current/GAVHZZ01/" ,
  "wave_period"         , "http://vocab.nerc.ac.uk/collection/P01/current/GAVTZZ01/" ,
  "wind_direction"      , "http://vocab.nerc.ac.uk/collection/P01/current/EWDAZZ01/" ,
  "wind_speed"          , "http://vocab.nerc.ac.uk/collection/P01/current/EWSBZZ01/" ,
  "barometric_pressure" , "http://vocab.nerc.ac.uk/collection/P01/current/CAPAZZ01/" ,
  "dry_air_temp"        , "http://vocab.nerc.ac.uk/collection/P01/current/CTMPZZ01/" ,
  "wet_air_temp"        , "http://vocab.nerc.ac.uk/collection/P01/current/CWETZZ01/" ,
  "weather_code"        , "http://vocab.nerc.ac.uk/collection/P01/current/WMOCWWXX/" ,
  "cloud_type"          , "http://vocab.nerc.ac.uk/collection/P01/current/WMOCLTXX/" ,
  "cloud_amount"        , "http://vocab.nerc.ac.uk/collection/P01/current/CLAMAMTS/" ,
  "visibility"          , "http://vocab.nerc.ac.uk/collection/P01/current/WMOVISXX/" ,
  "secchi_depth"        , "http://vocab.nerc.ac.uk/collection/P01/current/SECCSDNX/" ,
  "water_color"         , "http://vocab.nerc.ac.uk/collection/P01/current/CCOLXXXX/"
)

6 Build Event Hierarchy

6.1 Cruise Events

Aggregate over both net tows and casts to build cruise-level events:

Code
# collect joined net→tow→site for cruise summaries ----
d_net_site <- d_net |>
  left_join(
    tbl_tow |> collect() |> select(tow_id, site_id, time_start),
    by = "tow_id"
  ) |>
  left_join(
    tbl_site |>
      collect() |>
      select(site_id, cruise_key, longitude, latitude, line, station),
    by = "site_id"
  )

# net cruise summaries ----
d_net_cruise_summary <- d_net_site |>
  group_by(cruise_key) |>
  summarize(
    net_time_min = min(time_start, na.rm = TRUE),
    net_time_max = max(time_start, na.rm = TRUE),
    net_lon_min = min(longitude, na.rm = TRUE),
    net_lon_max = max(longitude, na.rm = TRUE),
    net_lat_min = min(latitude, na.rm = TRUE),
    net_lat_max = max(latitude, na.rm = TRUE),
    .groups = "drop"
  )

# cast cruise summaries ----
d_cast_cruise_summary <- d_casts |>
  group_by(cruise_key) |>
  summarize(
    cast_time_min = min(datetime_utc, na.rm = TRUE),
    cast_time_max = max(datetime_utc, na.rm = TRUE),
    cast_lon_min = min(lon_dec, na.rm = TRUE),
    cast_lon_max = max(lon_dec, na.rm = TRUE),
    cast_lat_min = min(lat_dec, na.rm = TRUE),
    cast_lat_max = max(lat_dec, na.rm = TRUE),
    .groups = "drop"
  )

# combine cruise metadata ----
d_cruise_events <- tbl_cruise |>
  left_join(tbl_ship, by = "ship_key") |>
  collect() |>
  left_join(d_net_cruise_summary, by = "cruise_key") |>
  left_join(d_cast_cruise_summary, by = "cruise_key") |>
  mutate(
    # date range: union of net tows and casts
    time_min = pmin(net_time_min, cast_time_min, na.rm = TRUE),
    time_max = pmax(net_time_max, cast_time_max, na.rm = TRUE),
    # bounding box: union of net sites and cast positions
    lon_min = pmin(net_lon_min, cast_lon_min, na.rm = TRUE),
    lon_max = pmax(net_lon_max, cast_lon_max, na.rm = TRUE),
    lat_min = pmin(net_lat_min, cast_lat_min, na.rm = TRUE),
    lat_max = pmax(net_lat_max, cast_lat_max, na.rm = TRUE)
  ) |>
  filter(!is.na(time_min) | !is.na(cast_time_min)) |>
  mutate(
    eventID = cruise_key,
    parentEventID = NA_character_,
    eventType = "cruise",
    eventDate = case_when(
      !is.na(time_min) & !is.na(time_max) ~
        paste0(
          format(time_min, "%Y-%m-%dT%H:%M:%SZ"),
          "/",
          format(time_max, "%Y-%m-%dT%H:%M:%SZ")
        ),
      TRUE ~ NA_character_
    ),
    footprintWKT = case_when(
      !is.na(lon_min) ~
        glue(
          "POLYGON(({lon_min} {lat_min}, {lon_max} {lat_min}, {lon_max} {lat_max}, {lon_min} {lat_max}, {lon_min} {lat_min}))"
        ),
      TRUE ~ NA_character_
    ),
    samplingProtocol = "Marine plankton survey cruise",
    habitat = "marine",
    eventRemarks = glue(
      "Cruise on {ship_name} from {as.Date(time_min)} to {as.Date(time_max)}"
    )
  ) |>
  select(
    eventID,
    parentEventID,
    eventType,
    eventDate,
    footprintWKT,
    samplingProtocol,
    habitat,
    eventRemarks
  )

cat(glue("Cruise events: {nrow(d_cruise_events)}"), "\n")
Cruise events: 687 

6.2 Net Sample Events

Collapsed net_sample events (cruise → net_sample) with coordinates, date, and sampling info:

Code
d_net_events <- d_net_site |>
  mutate(
    eventID = net_uuid,
    parentEventID = cruise_key,
    eventType = "net_sample",
    eventDate = format(time_start, "%Y-%m-%dT%H:%M:%SZ"),
    decimalLatitude = latitude,
    decimalLongitude = longitude,
    locationID = paste0(line, "_", station),
    samplingProtocol = "CalCOFI ichthyoplankton net tow",
    sampleSizeValue = vol_sampled_m3,
    sampleSizeUnit = "m^3",
    eventRemarks = glue(
      "Net: {side} side; ",
      "{prop_sorted} proportion sorted of {vol_sampled_m3} m3 sampled"
    )
  )

# keep full data for eMoF
d_net_events_full <- d_net_events

d_net_events <- d_net_events |>
  select(
    eventID,
    parentEventID,
    eventType,
    eventDate,
    decimalLatitude,
    decimalLongitude,
    locationID,
    samplingProtocol,
    sampleSizeValue,
    sampleSizeUnit,
    eventRemarks
  )

stopifnot(all(!duplicated(d_net_events$eventID)))
cat(glue("Net sample events: {nrow(d_net_events)}"), "\n")
Net sample events: 76512 

6.3 CTD Cast Events

One event per cast:

Code
d_cast_events <- d_casts |>
  mutate(
    eventID = cast_uuid,
    parentEventID = cruise_key,
    eventType = "ctd_cast",
    eventDate = format(datetime_utc, "%Y-%m-%dT%H:%M:%SZ"),
    decimalLatitude = lat_dec,
    decimalLongitude = lon_dec,
    locationID = paste0(rpt_line, "_", rpt_sta),
    samplingProtocol = "CTD rosette cast",
    sampleSizeValue = bottom_depth_m,
    sampleSizeUnit = "m",
    eventRemarks = glue(
      "CTD cast at station {rpt_line}_{rpt_sta}, bottom depth {bottom_depth_m} m"
    )
  ) |>
  select(
    eventID,
    parentEventID,
    eventType,
    eventDate,
    decimalLatitude,
    decimalLongitude,
    locationID,
    samplingProtocol,
    sampleSizeValue,
    sampleSizeUnit,
    eventRemarks
  )

stopifnot(all(!duplicated(d_cast_events$eventID)))
cat(glue("CTD cast events: {nrow(d_cast_events)}"), "\n")
CTD cast events: 35644 

6.4 Combine All Events

Code
d_event <- bind_rows(
  d_cruise_events,
  d_net_events,
  d_cast_events
) |>
  mutate(
    geodeticDatum = "WGS84",
    coordinateUncertaintyInMeters = 1000,
    countryCode = country_code,
    waterBody = water_body,
    datasetID = dataset_id
  )

cat("Event counts by type:\n")
Event counts by type:
Code
d_event |> count(eventType) |> print()
# A tibble: 3 × 2
  eventType      n
  <chr>      <int>
1 cruise       687
2 ctd_cast   35644
3 net_sample 76512
Code
# verify all child events have valid parentEventID
cruise_ids <- d_event |> filter(eventType == "cruise") |> pull(eventID)
child_events <- d_event |> filter(eventType != "cruise")
orphan_children <- child_events |> filter(!parentEventID %in% cruise_ids)
if (nrow(orphan_children) > 0) {
  warning(glue(
    "{nrow(orphan_children)} child events have no matching cruise parent"
  ))
} else {
  cat("All child events have valid cruise parents.\n")
}

7 Build Occurrence Records

From the consolidated ichthyo table — one occurrence per life stage per species per net sample (tally rows where measurement_type is NULL):

Code
d_occurrence <- d_ichthyo |>
  filter(is.na(measurement_type)) |>
  left_join(
    tbl_species |> collect(),
    by = "species_id"
  ) |>
  mutate(
    occurrenceID = ichthyo_uuid,
    eventID = net_uuid,
    scientificName = scientific_name,
    scientificNameID = ifelse(
      !is.na(worms_id) & worms_id > 0,
      paste0("urn:lsid:marinespecies.org:taxname:", worms_id),
      NA_character_
    ),
    kingdom = "Animalia",
    occurrenceStatus = ifelse(tally > 0, "present", "absent"),
    organismQuantity = tally,
    organismQuantityType = "individuals",
    lifeStage = life_stage,
    preparations = paste0(life_stage, " sample"),
    basisOfRecord = "HumanObservation",
    modified = Sys.Date()
  )

# verify all occurrences link to valid net_sample events
net_ids <- d_net_events |> pull(eventID)
orphan_occs <- d_occurrence |> filter(!eventID %in% net_ids)
if (nrow(orphan_occs) > 0) {
  warning(glue(
    "{nrow(orphan_occs)} occurrences have no matching net_sample event"
  ))
} else {
  cat("All occurrences link to valid net_sample events.\n")
}
All occurrences link to valid net_sample events.
Code
d_occurrence_out <- d_occurrence |>
  select(
    occurrenceID,
    eventID,
    scientificName,
    scientificNameID,
    kingdom,
    occurrenceStatus,
    organismQuantity,
    organismQuantityType,
    lifeStage,
    preparations,
    basisOfRecord,
    modified
  )

cat(glue("Occurrences: {nrow(d_occurrence_out)}"), "\n")
Occurrences: 463655 
Code
cat(glue("Unique species: {n_distinct(d_occurrence_out$scientificName)}"), "\n")
Unique species: 899 

8 Build ExtendedMeasurementOrFact Extension

8.1 10a. Net-level sample measurements

Code
d_emof_net <- d_net_events_full |>
  select(
    eventID,
    net_uuid,
    std_haul_factor,
    prop_sorted,
    smallplankton,
    totalplankton
  ) |>
  pivot_longer(
    cols = c(std_haul_factor, prop_sorted, smallplankton, totalplankton),
    names_to = "meas_type",
    values_to = "measurementValue"
  ) |>
  filter(!is.na(measurementValue)) |>
  mutate(
    measurementID = UUIDfromName(
      CALCOFI_NS,
      paste0("net:", net_uuid, ":", meas_type)
    ),
    occurrenceID = NA_character_,
    measurementType = case_when(
      meas_type == "std_haul_factor" ~ "standardized haul factor",
      meas_type == "prop_sorted" ~ "proportion of sample sorted",
      meas_type == "smallplankton" ~ "small plankton biomass",
      meas_type == "totalplankton" ~ "total plankton biomass"
    ),
    measurementTypeID = NA_character_,
    measurementUnit = case_when(
      meas_type == "std_haul_factor" ~ "dimensionless",
      meas_type == "prop_sorted" ~ "dimensionless",
      meas_type %in% c("smallplankton", "totalplankton") ~ "grams"
    ),
    measurementMethod = "https://oceanview.pfeg.noaa.gov/CalCOFI/calcofi_info.html",
    measurementRemarks = case_when(
      meas_type == "std_haul_factor" ~
        "Standardization factor for water filtered; abundances per 10 m^2 (Smith 1977)",
      meas_type == "prop_sorted" ~ "Fraction of total sample examined",
      TRUE ~ NA_character_
    )
  ) |>
  left_join(measurement_unit_vocab, by = "measurementUnit") |>
  select(
    eventID,
    occurrenceID,
    measurementID,
    measurementType,
    measurementTypeID,
    measurementValue,
    measurementUnit,
    measurementUnitID,
    measurementMethod,
    measurementRemarks
  )

cat(glue("eMoF net-level: {nrow(d_emof_net)}"), "\n")
eMoF net-level: 243598 

8.2 10b. Stage-specific abundance

From ichthyo rows where measurement_type = 'stage', linked to the matching tally row’s ichthyo_uuid as occurrenceID:

Code
# build lookup: tally occurrenceIDs with their eventID (net_uuid)
d_occ_lookup <- d_occurrence |>
  select(
    net_id,
    species_id,
    life_stage,
    occurrenceID = ichthyo_uuid,
    occ_eventID = eventID
  )

# collect lookup descriptions for stages
d_lookup <- tbl_lookup |>
  filter(lookup_type %in% c("egg_stage", "larva_stage")) |>
  collect()

# stage records — join to lookup via derived lookup_type
d_emof_stage <- d_ichthyo |>
  filter(measurement_type == "stage") |>
  mutate(
    lookup_type = paste0(life_stage, "_stage")
  ) |>
  left_join(
    d_lookup |> select(lookup_type, lookup_num, description),
    by = c("lookup_type", "measurement_value" = "lookup_num")
  ) |>
  left_join(d_occ_lookup, by = c("net_id", "species_id", "life_stage")) |>
  mutate(
    eventID = occ_eventID,
    measurementID = ichthyo_uuid,
    measurementType = "abundance by life stage",
    measurementTypeID = NA_character_,
    meas_val = tally,
    measurementValueID = NA_character_,
    measurementUnit = "individuals",
    measurementMethod = "https://oceanview.pfeg.noaa.gov/CalCOFI/calcofi_info.html",
    measurementRemarks = description
  ) |>
  left_join(measurement_unit_vocab, by = "measurementUnit") |>
  select(
    eventID,
    occurrenceID,
    measurementID,
    measurementType,
    measurementTypeID,
    measurementValue = meas_val,
    measurementValueID,
    measurementUnit,
    measurementUnitID,
    measurementMethod,
    measurementRemarks
  )

cat(glue("eMoF stage abundance: {nrow(d_emof_stage)}"), "\n")
eMoF stage abundance: 125347 

8.3 10c. Body length measurements

From ichthyo rows where measurement_type = 'size':

Code
d_emof_length <- d_ichthyo |>
  filter(measurement_type == "size") |>
  left_join(d_occ_lookup, by = c("net_id", "species_id", "life_stage")) |>
  filter(!is.na(measurement_value)) |>
  mutate(
    eventID = occ_eventID,
    measurementID = ichthyo_uuid,
    measurementType = "body length",
    measurementTypeID = NA_character_,
    meas_val = measurement_value,
    measurementValueID = NA_character_,
    measurementUnit = "millimeters",
    measurementMethod = "https://oceanview.pfeg.noaa.gov/CalCOFI/calcofi_info.html",
    measurementRemarks = paste0("Count: ", tally, "; Total length measurement")
  ) |>
  left_join(measurement_unit_vocab, by = "measurementUnit") |>
  select(
    eventID,
    occurrenceID,
    measurementID,
    measurementType,
    measurementTypeID,
    measurementValue = meas_val,
    measurementValueID,
    measurementUnit,
    measurementUnitID,
    measurementMethod,
    measurementRemarks
  )

cat(glue("eMoF body length: {nrow(d_emof_length)}"), "\n")
eMoF body length: 241871 

8.4 10d. Bottle measurements

From bottle_measurement joined through bottlecasts:

Code
# build cast_id → cast_uuid lookup
d_cast_uuid_lookup <- d_casts |> select(cast_id, cast_uuid)

# get measurement type descriptions and units
d_mtype <- tbl_measurement_type |>
  filter(dataset == "bottle") |>
  collect()

d_emof_bottle <- tbl_bottle_meas |>
  left_join(
    tbl_bottle |> select(bottle_id, cast_id, depth_m),
    by = "bottle_id"
  ) |>
  collect() |>
  inner_join(d_cast_uuid_lookup, by = "cast_id") |>
  left_join(
    d_mtype |> select(measurement_type, description, units),
    by = "measurement_type"
  ) |>
  left_join(bottle_p01_vocab, by = "measurement_type") |>
  mutate(
    eventID = cast_uuid,
    occurrenceID = NA_character_,
    measurementID = UUIDfromName(
      CALCOFI_NS,
      paste0("bm:", bottle_measurement_id)
    ),
    measurementType = description,
    meas_val = measurement_value,
    measurementUnit = units,
    measurementMethod = "https://calcofi.org/data/oceanographic-data/bottle-database/",
    measurementRemarks = paste0("depth_m: ", depth_m)
  ) |>
  left_join(measurement_unit_vocab, by = "measurementUnit") |>
  select(
    eventID,
    occurrenceID,
    measurementID,
    measurementType,
    measurementTypeID,
    measurementValue = meas_val,
    measurementUnit,
    measurementUnitID,
    measurementMethod,
    measurementRemarks
  )

cat(glue("eMoF bottle measurements: {nrow(d_emof_bottle)}"), "\n")
eMoF bottle measurements: 11135600 

8.5 10e. Cast conditions

From cast_condition joined to casts:

Code
d_emof_cast_cond <- tbl_cast_condition |>
  collect() |>
  inner_join(d_cast_uuid_lookup, by = "cast_id") |>
  left_join(
    tbl_measurement_type |>
      filter(dataset == "cast") |>
      collect() |>
      select(measurement_type, description, units),
    by = c("condition_type" = "measurement_type")
  ) |>
  left_join(cast_p01_vocab, by = "condition_type") |>
  mutate(
    eventID = cast_uuid,
    occurrenceID = NA_character_,
    measurementID = UUIDfromName(
      CALCOFI_NS,
      paste0("cc:", cast_condition_id)
    ),
    measurementType = description,
    measurementValue = condition_value,
    measurementUnit = ifelse(is.na(units) | units == "", NA_character_, units),
    measurementMethod = "https://calcofi.org/data/oceanographic-data/bottle-database/",
    measurementRemarks = NA_character_
  ) |>
  left_join(measurement_unit_vocab, by = "measurementUnit") |>
  select(
    eventID,
    occurrenceID,
    measurementID,
    measurementType,
    measurementTypeID,
    measurementValue,
    measurementUnit,
    measurementUnitID,
    measurementMethod,
    measurementRemarks
  )

cat(glue("eMoF cast conditions: {nrow(d_emof_cast_cond)}"), "\n")
eMoF cast conditions: 235513 

8.6 Combine All eMoF Records

Code
d_emof <- bind_rows(
  d_emof_net,
  d_emof_stage |> select(-measurementValueID),
  d_emof_length |> select(-measurementValueID),
  d_emof_bottle,
  d_emof_cast_cond
)

cat("eMoF counts by source:\n")
eMoF counts by source:
Code
cat(glue("  Net-level:       {nrow(d_emof_net)}"), "\n")
  Net-level:       243598 
Code
cat(glue("  Stage abundance: {nrow(d_emof_stage)}"), "\n")
  Stage abundance: 125347 
Code
cat(glue("  Body length:     {nrow(d_emof_length)}"), "\n")
  Body length:     241871 
Code
cat(glue("  Bottle meas:     {nrow(d_emof_bottle)}"), "\n")
  Bottle meas:     11135600 
Code
cat(glue("  Cast conditions: {nrow(d_emof_cast_cond)}"), "\n")
  Cast conditions: 235513 
Code
cat(glue("  Total:           {nrow(d_emof)}"), "\n")
  Total:           11981929 
Code
# verify all eMoF records link to a valid eventID or occurrenceID
event_ids <- d_event$eventID
occ_ids <- d_occurrence_out$occurrenceID

emof_with_event <- d_emof |> filter(!is.na(eventID))
emof_with_occ <- d_emof |> filter(!is.na(occurrenceID))
emof_orphan_event <- emof_with_event |> filter(!eventID %in% event_ids)
emof_orphan_occ <- emof_with_occ |> filter(!occurrenceID %in% occ_ids)

if (nrow(emof_orphan_event) > 0) {
  warning(glue("{nrow(emof_orphan_event)} eMoF records have invalid eventID"))
}
if (nrow(emof_orphan_occ) > 0) {
  warning(glue(
    "{nrow(emof_orphan_occ)} eMoF records have invalid occurrenceID"
  ))
}

9 Write DarwinCore Archive Files

Export the event core and extension files as CSV:

Code
dir.create(dir_out, recursive = TRUE, showWarnings = FALSE)

write_csv(d_event, file.path(dir_out, "event.csv"), na = "")
write_csv(d_occurrence_out, file.path(dir_out, "occurrence.csv"), na = "")
write_csv(d_emof, file.path(dir_out, "extendedMeasurementOrFact.csv"), na = "")

cat("Wrote CSV files to:", dir_out, "\n")
Wrote CSV files to: /Users/bbest/Github/CalCOFI/workflows/data/darwincore/ichthyo_bottle 

10 Create meta.xml

Generate meta.xml by mapping CSV column names to DarwinCore term URIs:

Code
# DwC term mappings ----
dwc_terms <- list(
  event = list(
    rowType = "http://rs.tdwg.org/dwc/terms/Event",
    idField = "eventID",
    terms = c(
      eventID = "http://rs.tdwg.org/dwc/terms/eventID",
      parentEventID = "http://rs.tdwg.org/dwc/terms/parentEventID",
      eventType = "http://rs.tdwg.org/dwc/terms/eventType",
      eventDate = "http://rs.tdwg.org/dwc/terms/eventDate",
      samplingProtocol = "http://rs.tdwg.org/dwc/terms/samplingProtocol",
      sampleSizeValue = "http://rs.tdwg.org/dwc/terms/sampleSizeValue",
      sampleSizeUnit = "http://rs.tdwg.org/dwc/terms/sampleSizeUnit",
      eventRemarks = "http://rs.tdwg.org/dwc/terms/eventRemarks",
      habitat = "http://rs.tdwg.org/dwc/terms/habitat",
      footprintWKT = "http://rs.tdwg.org/dwc/terms/footprintWKT",
      decimalLatitude = "http://rs.tdwg.org/dwc/terms/decimalLatitude",
      decimalLongitude = "http://rs.tdwg.org/dwc/terms/decimalLongitude",
      geodeticDatum = "http://rs.tdwg.org/dwc/terms/geodeticDatum",
      coordinateUncertaintyInMeters = "http://rs.tdwg.org/dwc/terms/coordinateUncertaintyInMeters",
      locationID = "http://rs.tdwg.org/dwc/terms/locationID",
      countryCode = "http://rs.tdwg.org/dwc/terms/countryCode",
      waterBody = "http://rs.tdwg.org/dwc/terms/waterBody",
      datasetID = "http://rs.tdwg.org/dwc/terms/datasetID"
    )
  ),
  occurrence = list(
    rowType = "http://rs.tdwg.org/dwc/terms/Occurrence",
    idField = "occurrenceID",
    coreIdField = "eventID",
    terms = c(
      occurrenceID = "http://rs.tdwg.org/dwc/terms/occurrenceID",
      eventID = "http://rs.tdwg.org/dwc/terms/eventID",
      scientificName = "http://rs.tdwg.org/dwc/terms/scientificName",
      scientificNameID = "http://rs.tdwg.org/dwc/terms/scientificNameID",
      kingdom = "http://rs.tdwg.org/dwc/terms/kingdom",
      occurrenceStatus = "http://rs.tdwg.org/dwc/terms/occurrenceStatus",
      organismQuantity = "http://rs.tdwg.org/dwc/terms/organismQuantity",
      organismQuantityType = "http://rs.tdwg.org/dwc/terms/organismQuantityType",
      lifeStage = "http://rs.tdwg.org/dwc/terms/lifeStage",
      preparations = "http://rs.tdwg.org/dwc/terms/preparations",
      basisOfRecord = "http://rs.tdwg.org/dwc/terms/basisOfRecord",
      modified = "http://purl.org/dc/terms/modified"
    )
  ),
  extendedmeasurementorfact = list(
    rowType = "http://rs.iobis.org/obis/terms/ExtendedMeasurementOrFact",
    idField = "measurementID",
    coreIdField = "eventID",
    terms = c(
      eventID = "http://rs.tdwg.org/dwc/terms/eventID",
      occurrenceID = "http://rs.tdwg.org/dwc/terms/occurrenceID",
      measurementID = "http://rs.tdwg.org/dwc/terms/measurementID",
      measurementType = "http://rs.tdwg.org/dwc/terms/measurementType",
      measurementTypeID = "http://rs.iobis.org/obis/terms/measurementTypeID",
      measurementValue = "http://rs.tdwg.org/dwc/terms/measurementValue",
      measurementUnit = "http://rs.tdwg.org/dwc/terms/measurementUnit",
      measurementUnitID = "http://rs.iobis.org/obis/terms/measurementUnitID",
      measurementMethod = "http://rs.tdwg.org/dwc/terms/measurementMethod",
      measurementRemarks = "http://rs.tdwg.org/dwc/terms/measurementRemarks"
    )
  )
)

# helper functions ----
create_field_elements <- function(csv_file, term_map, coreid_field = NULL) {
  col_names <- names(read_csv(csv_file, n_max = 0, show_col_types = FALSE))
  field_elements <- map(seq_along(col_names), function(i) {
    col <- col_names[i]
    if (!is.null(coreid_field) && col == coreid_field) {
      return(NULL)
    }
    term <- term_map$terms[[col]]
    if (!is.null(term)) {
      glue('    <field index="{i-1}" term="{term}"/>')
    } else {
      message(
        "Warning: Column '",
        col,
        "' in ",
        csv_file,
        " has no DwC mapping."
      )
      NULL
    }
  })
  paste(compact(field_elements), collapse = "\n")
}

get_coreid_index <- function(csv_file, coreid_field) {
  col_names <- names(read_csv(csv_file, n_max = 0, show_col_types = FALSE))
  if (length(coreid_field) > 1) {
    coreid_field <- intersect(coreid_field, col_names)[1]
  }
  which(col_names == coreid_field) - 1
}

# generate meta.xml ----
generate_meta_xml <- function(dir_out) {
  event_file <- file.path(dir_out, "event.csv")
  event_fields <- create_field_elements(event_file, dwc_terms$event)
  event_id_idx <- which(
    names(read_csv(event_file, n_max = 0, show_col_types = FALSE)) ==
      dwc_terms$event$idField
  ) -
    1

  occ_file <- file.path(dir_out, "occurrence.csv")
  occ_fields <- create_field_elements(
    occ_file,
    dwc_terms$occurrence,
    coreid_field = dwc_terms$occurrence$coreIdField
  )
  occ_coreid_idx <- get_coreid_index(occ_file, dwc_terms$occurrence$coreIdField)

  emof_file <- file.path(dir_out, "extendedMeasurementOrFact.csv")
  emof_fields <- create_field_elements(
    emof_file,
    dwc_terms$extendedmeasurementorfact,
    coreid_field = dwc_terms$extendedmeasurementorfact$coreIdField
  )
  emof_coreid_idx <- get_coreid_index(
    emof_file,
    dwc_terms$extendedmeasurementorfact$coreIdField
  )

  glue(
    '<?xml version="1.0" encoding="UTF-8"?>
<archive xmlns="http://rs.tdwg.org/dwc/text/"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://rs.tdwg.org/dwc/text/ http://rs.tdwg.org/dwc/text/tdwg_dwc_text.xsd">
  <core encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\\n"
        fieldsEnclosedBy=\'"\' ignoreHeaderLines="1" rowType="{dwc_terms$event$rowType}">
    <files>
      <location>event.csv</location>
    </files>
    <id index="{event_id_idx}" />
{event_fields}
  </core>
  <extension encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\\n"
             fieldsEnclosedBy=\'"\' ignoreHeaderLines="1" rowType="{dwc_terms$occurrence$rowType}">
    <files>
      <location>occurrence.csv</location>
    </files>
    <coreid index="{occ_coreid_idx}" />
{occ_fields}
  </extension>
  <extension encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\\n"
             fieldsEnclosedBy=\'"\' ignoreHeaderLines="1" rowType="{dwc_terms$extendedmeasurementorfact$rowType}">
    <files>
      <location>extendedMeasurementOrFact.csv</location>
    </files>
    <coreid index="{emof_coreid_idx}" />
{emof_fields}
  </extension>
</archive>'
  )
}

meta_xml <- generate_meta_xml(dir_out)
writeLines(meta_xml, file.path(dir_out, "meta.xml"))
cat("Generated meta.xml\n")
Generated meta.xml
Code
cat(meta_xml)
<?xml version="1.0" encoding="UTF-8"?>
<archive xmlns="http://rs.tdwg.org/dwc/text/"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://rs.tdwg.org/dwc/text/ http://rs.tdwg.org/dwc/text/tdwg_dwc_text.xsd">
  <core encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\n"
        fieldsEnclosedBy='"' ignoreHeaderLines="1" rowType="http://rs.tdwg.org/dwc/terms/Event">
    <files>
      <location>event.csv</location>
    </files>
    <id index="0" />
    <field index="0" term="http://rs.tdwg.org/dwc/terms/eventID"/>
    <field index="1" term="http://rs.tdwg.org/dwc/terms/parentEventID"/>
    <field index="2" term="http://rs.tdwg.org/dwc/terms/eventType"/>
    <field index="3" term="http://rs.tdwg.org/dwc/terms/eventDate"/>
    <field index="4" term="http://rs.tdwg.org/dwc/terms/footprintWKT"/>
    <field index="5" term="http://rs.tdwg.org/dwc/terms/samplingProtocol"/>
    <field index="6" term="http://rs.tdwg.org/dwc/terms/habitat"/>
    <field index="7" term="http://rs.tdwg.org/dwc/terms/eventRemarks"/>
    <field index="8" term="http://rs.tdwg.org/dwc/terms/decimalLatitude"/>
    <field index="9" term="http://rs.tdwg.org/dwc/terms/decimalLongitude"/>
    <field index="10" term="http://rs.tdwg.org/dwc/terms/locationID"/>
    <field index="11" term="http://rs.tdwg.org/dwc/terms/sampleSizeValue"/>
    <field index="12" term="http://rs.tdwg.org/dwc/terms/sampleSizeUnit"/>
    <field index="13" term="http://rs.tdwg.org/dwc/terms/geodeticDatum"/>
    <field index="14" term="http://rs.tdwg.org/dwc/terms/coordinateUncertaintyInMeters"/>
    <field index="15" term="http://rs.tdwg.org/dwc/terms/countryCode"/>
    <field index="16" term="http://rs.tdwg.org/dwc/terms/waterBody"/>
    <field index="17" term="http://rs.tdwg.org/dwc/terms/datasetID"/>
  </core>
  <extension encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\n"
             fieldsEnclosedBy='"' ignoreHeaderLines="1" rowType="http://rs.tdwg.org/dwc/terms/Occurrence">
    <files>
      <location>occurrence.csv</location>
    </files>
    <coreid index="1" />
    <field index="0" term="http://rs.tdwg.org/dwc/terms/occurrenceID"/>
    <field index="2" term="http://rs.tdwg.org/dwc/terms/scientificName"/>
    <field index="3" term="http://rs.tdwg.org/dwc/terms/scientificNameID"/>
    <field index="4" term="http://rs.tdwg.org/dwc/terms/kingdom"/>
    <field index="5" term="http://rs.tdwg.org/dwc/terms/occurrenceStatus"/>
    <field index="6" term="http://rs.tdwg.org/dwc/terms/organismQuantity"/>
    <field index="7" term="http://rs.tdwg.org/dwc/terms/organismQuantityType"/>
    <field index="8" term="http://rs.tdwg.org/dwc/terms/lifeStage"/>
    <field index="9" term="http://rs.tdwg.org/dwc/terms/preparations"/>
    <field index="10" term="http://rs.tdwg.org/dwc/terms/basisOfRecord"/>
    <field index="11" term="http://purl.org/dc/terms/modified"/>
  </extension>
  <extension encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\n"
             fieldsEnclosedBy='"' ignoreHeaderLines="1" rowType="http://rs.iobis.org/obis/terms/ExtendedMeasurementOrFact">
    <files>
      <location>extendedMeasurementOrFact.csv</location>
    </files>
    <coreid index="0" />
    <field index="1" term="http://rs.tdwg.org/dwc/terms/occurrenceID"/>
    <field index="2" term="http://rs.tdwg.org/dwc/terms/measurementID"/>
    <field index="3" term="http://rs.tdwg.org/dwc/terms/measurementType"/>
    <field index="4" term="http://rs.iobis.org/obis/terms/measurementTypeID"/>
    <field index="5" term="http://rs.tdwg.org/dwc/terms/measurementValue"/>
    <field index="6" term="http://rs.tdwg.org/dwc/terms/measurementUnit"/>
    <field index="7" term="http://rs.iobis.org/obis/terms/measurementUnitID"/>
    <field index="8" term="http://rs.tdwg.org/dwc/terms/measurementMethod"/>
    <field index="9" term="http://rs.tdwg.org/dwc/terms/measurementRemarks"/>
  </extension>
</archive>

11 Create EML Metadata

Build EML document with geographic, temporal, and taxonomic coverage:

Code
# geographic coverage: union of net sites and cast positions ----
geo_net <- tbl_site |>
  summarise(
    lon_min = min(longitude, na.rm = TRUE),
    lon_max = max(longitude, na.rm = TRUE),
    lat_min = min(latitude, na.rm = TRUE),
    lat_max = max(latitude, na.rm = TRUE)
  ) |>
  collect()

geo_cast <- d_casts |>
  summarise(
    lon_min = min(lon_dec, na.rm = TRUE),
    lon_max = max(lon_dec, na.rm = TRUE),
    lat_min = min(lat_dec, na.rm = TRUE),
    lat_max = max(lat_dec, na.rm = TRUE)
  )

geo_coverage <- tibble(
  westBoundingCoordinate = min(geo_net$lon_min, geo_cast$lon_min, na.rm = TRUE),
  eastBoundingCoordinate = max(geo_net$lon_max, geo_cast$lon_max, na.rm = TRUE),
  southBoundingCoordinate = min(
    geo_net$lat_min,
    geo_cast$lat_min,
    na.rm = TRUE
  ),
  northBoundingCoordinate = max(geo_net$lat_max, geo_cast$lat_max, na.rm = TRUE)
)

# temporal coverage: union of tow and cast dates ----
temp_net <- tbl_tow |>
  summarise(
    tmin = min(time_start, na.rm = TRUE),
    tmax = max(time_start, na.rm = TRUE)
  ) |>
  collect()

temp_cast <- d_casts |>
  summarise(
    tmin = min(datetime_utc, na.rm = TRUE),
    tmax = max(datetime_utc, na.rm = TRUE)
  )

temp_coverage <- list(
  beginDate = as.character(as.Date(min(
    temp_net$tmin,
    temp_cast$tmin,
    na.rm = TRUE
  ))),
  endDate = as.character(as.Date(max(
    temp_net$tmax,
    temp_cast$tmax,
    na.rm = TRUE
  )))
)

# taxonomic coverage ----
d_taxa <- tbl_taxon |> collect()

taxa_coverage <- tbl_species |>
  mutate(
    worms_id = case_when(
      is.na(worms_id) | worms_id == 0 ~ NA_integer_,
      TRUE ~ worms_id
    )
  ) |>
  collect() |>
  filter(!is.na(worms_id)) |>
  distinct(species_id, worms_id, common_name) |>
  left_join(
    d_taxa |> distinct(taxonID, taxonRank, scientificName),
    by = join_by(worms_id == taxonID)
  ) |>
  group_by(worms_id, taxonRank, scientificName) |>
  summarize(
    species_id = first(species_id),
    common_name = paste(unique(na.omit(common_name)), collapse = ", "),
    .groups = "drop"
  ) |>
  arrange(species_id) |>
  filter(!is.na(scientificName)) |>
  pmap(function(
    species_id,
    taxonRank,
    scientificName,
    common_name,
    worms_id,
    ...
  ) {
    item <- list(
      id = glue("calcofi_species_{species_id}"),
      taxonRankName = taxonRank,
      taxonRankValue = scientificName,
      taxonId = list(
        provider = "https://www.marinespecies.org",
        as.character(worms_id)
      )
    )
    if (!is.na(common_name) && common_name != "") {
      item$commonName <- common_name
    }
    item
  })

# build EML document ----
my_eml <- list(
  packageId = dataset_id,
  system = "calcofi.io",
  dataset = list(
    title = dataset_title,
    intellectualRights = intellectual_rights,
    creator = list(
      individualName = list(givenName = "Ed", surName = "Weber"),
      organizationName = "NOAA SWFSC",
      electronicMailAddress = "ed.weber@noaa.gov",
      userId = list(
        directory = "https://orcid.org/",
        userId = "0000-0002-0942-434X"
      )
    ),
    abstract = dataset_abstract,
    keywordSet = list(
      keyword = dataset_keywords,
      keywordThesaurus = "GCMD Science Keywords"
    ),
    coverage = list(
      geographicCoverage = list(
        geographicDescription = "California Current Large Marine Ecoregion",
        boundingCoordinates = geo_coverage
      ),
      temporalCoverage = list(
        rangeOfDates = list(
          beginDate = list(calendarDate = temp_coverage$beginDate),
          endDate = list(calendarDate = temp_coverage$endDate)
        )
      ),
      taxonomicCoverage = list(
        generalTaxonomicCoverage = "Marine ichthyoplankton (fish eggs and larvae)",
        taxonomicClassification = taxa_coverage
      )
    ),
    contact = list(
      individualName = list(givenName = "Ed", surName = "Weber"),
      electronicMailAddress = "ed.weber@noaa.gov"
    ),
    methods = list(
      methodStep = list(
        description = list(para = sampling_methods_description)
      ),
      sampling = list(
        studyExtent = list(
          description = list(para = study_extent_description)
        ),
        samplingDescription = list(
          para = sampling_description
        )
      )
    ),
    project = list(
      title = dataset_title,
      personnel = list(
        individualName = list(givenName = "Ed", surName = "Weber"),
        role = "Data Manager"
      ),
      funding = list(para = funding_information)
    )
  )
)

eml_xml <- file.path(dir_out, "eml.xml")
write_eml(my_eml, eml_xml)

# replace license placeholder with actual license xml
readLines(eml_xml) |>
  str_replace(fixed(names(license_xml)), license_xml) |>
  writeLines(eml_xml)

validation_result <- eml_validate(eml_xml)
if (!validation_result) {
  cat("EML validation errors:\n")
  print(attr(validation_result, "errors"))
  stop("EML validation failed")
} else {
  cat("EML is valid!\n")
}
EML is valid!

12 Data Quality Checks

Validate event hierarchy, orphan records, and output summary statistics:

Code
# check for missing WoRMS IDs
d_missing_worms <- tbl_species |>
  filter(is.na(worms_id) | worms_id == 0) |>
  select(species_id, scientific_name) |>
  collect()

if (nrow(d_missing_worms) > 0) {
  cat("WARNING:", nrow(d_missing_worms), "species missing WoRMS IDs:\n")
  print(d_missing_worms, n = 20)
}

# summary statistics ----
n_cruises <- d_event |> filter(eventType == "cruise") |> nrow()
n_nets <- d_event |> filter(eventType == "net_sample") |> nrow()
n_casts_out <- d_event |> filter(eventType == "ctd_cast") |> nrow()
n_occs <- nrow(d_occurrence_out)
n_species <- n_distinct(d_occurrence_out$scientificName)
n_emof <- nrow(d_emof)

glue(
  "
=== Dataset Summary ===
Total events:         {prettyNum(nrow(d_event), big.mark = ',')}
  - Cruises:          {prettyNum(n_cruises, big.mark = ',')}
  - Net samples:      {prettyNum(n_nets, big.mark = ',')}
  - CTD casts:        {prettyNum(n_casts_out, big.mark = ',')}
Total occurrences:    {prettyNum(n_occs, big.mark = ',')}
Unique species:       {prettyNum(n_species, big.mark = ',')}
Total eMoF records:   {prettyNum(n_emof, big.mark = ',')}"
)
=== Dataset Summary ===
Total events:         112,843
  - Cruises:          687
  - Net samples:      76,512
  - CTD casts:        35,644
Total occurrences:    463,655
Unique species:       899
Total eMoF records:   11,981,929

13 Package DarwinCore Archive

Create the final DwC-A zip file:

Code
zip_file <- file.path(
  dirname(dir_out),
  paste0("ichthyo_bottle_", Sys.Date(), ".zip")
)

zip(
  zip_file,
  files = file.path(
    dir_out,
    c(
      "event.csv",
      "occurrence.csv",
      "extendedMeasurementOrFact.csv",
      "meta.xml",
      "eml.xml"
    )
  ),
  flags = "-j"
)

cat("\nDarwin Core Archive created:", zip_file, "\n")

Darwin Core Archive created: /Users/bbest/Github/CalCOFI/workflows/data/darwincore/ichthyo_bottle_2026-02-24.zip 

14 Sync to Google Cloud Storage

Code
gcs_bucket <- "calcofi-db"
gcs_publish_prefix <- "publish/ichthyo_bottle"

sync_results <- sync_to_gcs(
  local_dir = dir_out,
  gcs_prefix = gcs_publish_prefix,
  bucket = gcs_bucket
)

15 Validate with obistools

Code
# read back the CSVs
d_event_check <- read_csv(file.path(dir_out, "event.csv"))
d_occ_check <- read_csv(file.path(dir_out, "occurrence.csv"))
d_emof_check <- read_csv(file.path(dir_out, "extendedMeasurementOrFact.csv"))

# event hierarchy check
cat("=== Event hierarchy check ===\n")
=== Event hierarchy check ===
Code
event_check <- check_eventids(d_event_check)
if (nrow(event_check) == 0) {
  cat("PASS: all event parent-child relationships valid\n")
} else {
  cat("ISSUES found:\n")
  print(event_check)
}
ISSUES found:
# A tibble: 8,393 × 4
   field         level   row message                                          
   <chr>         <chr> <int> <chr>                                            
 1 parentEventID error 77200 parentEventID 4903CR has no corresponding eventID
 2 parentEventID error 77201 parentEventID 4903CR has no corresponding eventID
 3 parentEventID error 77202 parentEventID 4903CR has no corresponding eventID
 4 parentEventID error 77203 parentEventID 4903CR has no corresponding eventID
 5 parentEventID error 77204 parentEventID 4903CR has no corresponding eventID
 6 parentEventID error 77205 parentEventID 4903CR has no corresponding eventID
 7 parentEventID error 77206 parentEventID 4903CR has no corresponding eventID
 8 parentEventID error 77207 parentEventID 4903CR has no corresponding eventID
 9 parentEventID error 77208 parentEventID 4903CR has no corresponding eventID
10 parentEventID error 77209 parentEventID 4903CR has no corresponding eventID
# ℹ 8,383 more rows
Code
# occurrence → event linkage
cat("\n=== Occurrence-event linkage ===\n")

=== Occurrence-event linkage ===
Code
occ_check <- check_extension_eventids(d_event_check, d_occ_check)
if (nrow(occ_check) == 0) {
  cat("PASS: all occurrences link to valid events\n")
} else {
  cat("ISSUES found:\n")
  print(occ_check)
}
PASS: all occurrences link to valid events
Code
# eMoF → event linkage
cat("\n=== eMoF-event linkage ===\n")

=== eMoF-event linkage ===
Code
emof_event_check <- d_emof_check |>
  filter(!is.na(eventID)) |>
  filter(!eventID %in% d_event_check$eventID)
if (nrow(emof_event_check) == 0) {
  cat("PASS: all eMoF eventIDs link to valid events\n")
} else {
  cat(nrow(emof_event_check), "eMoF records with invalid eventID\n")
}
PASS: all eMoF eventIDs link to valid events
Code
# eMoF → occurrence linkage
emof_occ_check <- d_emof_check |>
  filter(!is.na(occurrenceID)) |>
  filter(!occurrenceID %in% d_occ_check$occurrenceID)
if (nrow(emof_occ_check) == 0) {
  cat("PASS: all eMoF occurrenceIDs link to valid occurrences\n")
} else {
  cat(nrow(emof_occ_check), "eMoF records with invalid occurrenceID\n")
}
PASS: all eMoF occurrenceIDs link to valid occurrences
Code
# summary by event type
cat("\n=== Events by type ===\n")

=== Events by type ===
Code
d_event_check |> count(eventType) |> print()
# A tibble: 3 × 2
  eventType      n
  <chr>      <int>
1 cruise       687
2 ctd_cast   35644
3 net_sample 76512
Code
cat("\n=== eMoF by measurementType ===\n")

=== eMoF by measurementType ===
Code
d_emof_check |> count(measurementType, sort = TRUE) |> print(n = 50)
# A tibble: 45 × 2
   measurementType                                                             n
   <chr>                                                                   <int>
 1 Dissolved oxygen (QC'd)                                                1.42e6
 2 Reported depth from pressure (pre-QC)                                  8.95e5
 3 Water temperature (QC'd)                                               8.84e5
 4 Reported potential temperature (pre-QC)                                8.49e5
 5 Reported dynamic height (no QC'd counterpart)                          8.49e5
 6 Salinity - Practical Salinity Scale 1978 (QC'd)                        8.48e5
 7 Potential density (Sigma Theta)                                        8.43e5
 8 Reported Specific Volume Anomaly (pre-QC); WARNING: different paramet… 8.43e5
 9 Oxygen percent saturation                                              6.91e5
10 Reported oxygen (pre-QC)                                               6.91e5
11 Phosphate concentration                                                4.41e5
12 Silicate concentration                                                 3.82e5
13 Nitrate concentration                                                  3.65e5
14 Nitrite concentration                                                  3.64e5
15 Chlorophyll-a measured fluorometrically                                2.46e5
16 Phaeopigment measured fluorometrically                                 2.46e5
17 body length                                                            2.42e5
18 abundance by life stage                                                1.25e5
19 Reported ammonium concentration (pre-QC)                               9.05e4
20 Ammonia concentration (QC'd)                                           9.05e4
21 proportion of sample sorted                                            7.65e4
22 standardized haul factor                                               7.65e4
23 small plankton biomass                                                 4.53e4
24 total plankton biomass                                                 4.53e4
25 Wind direction (abbreviated 360 degree azimuth)                        3.42e4
26 Wind speed                                                             3.40e4
27 WMO weather code (WMO 4501)                                            2.75e4
28 Mean 14C assimilation of replicates 1 and 2                            2.38e4
29 14C assimilation dark/control bottle                                   2.38e4
30 Dry air temperature from sling psychrometer                            2.03e4
31 Barometric pressure                                                    2.03e4
32 Wet air temperature from sling psychrometer                            2.02e4
33 Light intensity of incubation tubes                                    1.98e4
34 14C assimilation replicate 1                                           1.55e4
35 14C assimilation replicate 2                                           1.55e4
36 Wave direction (abbreviated 360 degree azimuth)                        1.32e4
37 Wave height                                                            1.27e4
38 Cloud amount in oktas (WMO 2700)                                       1.25e4
39 Wave period                                                            1.23e4
40 WMO cloud type code (WMO 0500)                                         1.05e4
41 WMO visibility code (WMO 4300)                                         1.01e4
42 Secchi disk depth                                                      5.69e3
43 Total alkalinity replicate 1                                           2.08e3
44 Water color on Forel-Ule scale                                         2.08e3
45 Dissolved inorganic carbon replicate 1                                 2.00e3

16 Cleanup

Code
dbDisconnect(con, shutdown = TRUE)
cat("Database connection closed.\n")
Database connection closed.