---
title: "Publish Ichthyoplankton & Bottle Data to OBIS"
editor_options:
chunk_output_type: console
---
## Overview
This workflow converts CalCOFI ichthyoplankton (fish eggs and larvae) and bottle hydrography data to an OBIS-compliant DarwinCore Archive for publication. The archive follows the Event Core structure with Occurrence and ExtendedMeasurementOrFact (eMoF) extensions.
**Data source**: merged DuckDB from `merge_ichthyo_bottle.qmd`, combining:
- SWFSC ichthyoplankton tables (cruise, ship, site, tow, net, ichthyo, species, lookup)
- CalCOFI.org bottle tables (casts, bottle, bottle_measurement, cast_condition, measurement_type)
**References**: [CalCOFI Fish Eggs & Larvae](https://calcofi.org/data/marine-ecosystem-data/fish-eggs-larvae/) | [CalCOFI Bottle Database](https://calcofi.org/data/oceanographic-data/bottle-database/) | [OBIS Manual](https://manual.obis.org/) | [DarwinCore Terms](https://dwc.tdwg.org/terms/)
```{r}
#| label: setup
devtools::load_all(here::here("../calcofi4db"))
librarian::shelf(
CalCOFI / calcofi4db,
DBI,
dplyr,
DT,
duckdb,
EML,
glue,
here,
lubridate,
iobis / obistools,
purrr,
stringr,
readr,
tibble,
tidyr,
uuid,
xml2,
quiet = T
)
options(readr.show_col_types = F)
# calcofi namespace for deterministic UUID v5 generation
CALCOFI_NS <- "c0f1ca00-ca1c-5000-b000-1c4790000000"
# dataset metadata ----
dataset_title <- "CalCOFI Ichthyoplankton Tows & Bottle Hydrography"
dataset_abstract <- "Fish larvae and egg counts from CalCOFI ichthyoplankton nets (primarily vertical [Calvet or Pairovet], oblique [bongo or ring nets], and surface tows [Manta nets]) combined with CTD rosette bottle hydrography (temperature, salinity, dissolved oxygen, nutrients, chlorophyll, and carbon system parameters). Oblique tows are normally standardized to count per 10 m^2 of surface sampled. Surface tows are normally standardized to count per 1,000 m^3 strained. Bottle data include depth-resolved measurements from Niskin bottles on CTD casts at CalCOFI grid stations."
dataset_keywords <- c(
"atmosphere",
"biology",
"biosphere",
"calcofi",
"earth science",
"environment",
"hydrography",
"ichthyoplankton",
"latitude",
"longitude",
"nutrients",
"ocean",
"time"
)
country_code <- "US"
water_body <- "California Current"
dataset_id <- glue(
"calcofi.io_workflow_ichthyo-bottle_to_obis_{format(Sys.Date(), '%Y-%m-%d')}"
)
sampling_methods_description <- "Standard CalCOFI ichthyoplankton tows: The standard oblique tow uses a bongo net (71 cm diameter, 0.505 mm mesh) or 1-m ring net (prior to 1978) retrieved at a constant wire angle (45 degrees) from 210 m depth to surface. Surface tows use a Manta net; vertical tows use CalVET/Pairovet nets. Flowmeters measure volume filtered. Samples are preserved in 5% formalin. Fish eggs and larvae are sorted, identified to lowest taxon possible, enumerated, and measured. CTD bottle data: Seawater samples collected using Niskin bottles mounted on a CTD rosette cast to near-bottom depths. Temperature, salinity, and dissolved oxygen are measured by CTD sensors and calibrated against bottle samples. Nutrient concentrations (phosphate, silicate, nitrite, nitrate, ammonia) determined by autoanalyzer. Chlorophyll-a and phaeopigment measured fluorometrically."
study_extent_description <- "The study covers the California Current ecosystem, primarily off Southern California (standard lines 77-93) but historically extending from the border of Canada to the tip of Baja California. The time series for this dataset generally begins in 1949 for bottle data and 1951 for ichthyoplankton, continuing to the present with quarterly cruises."
sampling_description <- "Samples are collected at fixed stations along the CalCOFI grid. Oblique tows are standardized to counts per 10 m^2 of sea surface. Surface tows are standardized to counts per 1,000 m^3. Bottle hydrography data provide depth-resolved oceanographic context at the same grid stations."
funding_information <- "CalCOFI is a partnership between the NOAA National Marine Fisheries Service (NMFS), Scripps Institution of Oceanography (SIO), and California Department of Fish and Wildlife (CDFW)."
rights <- "The data may be used and redistributed for free but is not intended for legal use, since it may contain inaccuracies. Neither the data Contributor, ERD, NOAA, nor the United States Government, nor any of their employees or contractors, makes any warranty, express or implied, including warranties of merchantability and fitness for a particular purpose, or assumes any legal liability for the accuracy, completeness, or usefulness, of this information."
license <- "To the extent possible under law, the publisher has waived all rights to these data and has dedicated them to the Public Domain (CC0 1.0). Users may copy, modify, distribute and use the work, including for commercial purposes, without restriction."
license_xml <- c(
"Public Domain (CC0 1.0)" = '<ulink url="http://creativecommons.org/publicdomain/zero/1.0/legalcode"><citetitle>Public Domain (CC0 1.0)</citetitle></ulink>'
)
intellectual_rights <- list(
para = paste(rights, license, collapse = " ")
)
dir_out <- here("data/darwincore/ichthyo_bottle")
# get merged duckdb connection ----
db_path <- here("data/wrangling/merge_ichthyo_bottle.duckdb")
stopifnot(file.exists(db_path))
con <- dbConnect(duckdb(), dbdir = db_path, read_only = TRUE)
```
## Event Hierarchy Design
The archive uses a **sibling-event architecture**: ichthyoplankton net tows and CTD bottle casts are both children of cruise events, linked spatially via `locationID` (CalCOFI `line_station` format) rather than nested within each other. This avoids falsely implying that bottle measurements were co-located with specific net tows — they happen at the same station but at different times and depths.
```{mermaid}
%%| label: fig-ichthyo_bottle_event_hierarchy
%%| fig-cap: "Event hierarchy for CalCOFI ichthyoplankton & bottle data in DarwinCore Archive format."
%%| file: diagrams/ichthyo_bottle_event_hierarchy.mmd
```
**Key design decisions**:
1. **Collapsed hierarchy**: cruise → net_sample / ctd_cast (no intermediate site/tow levels) to avoid event inheritance issues with NA values in the IPT
2. **`locationID`**: both net_sample and ctd_cast events share `locationID = "{line}_{station}"` for spatial joining by OBIS consumers
3. **`cast_uuid`**: deterministic UUID v5 minted from `cast_id` using the CalCOFI namespace — same approach as `ichthyo_uuid`
4. **Occurrence consolidation**: one occurrence per life stage (egg or larva) per species per net sample
5. **eMoF structure**: five sub-tables covering net-level measurements, stage abundance, body length, bottle measurements, and cast conditions
## Data Extraction
Load all necessary tables from the merged DuckDB:
```{r}
#| label: extract-tables
# ichthyo chain ----
tbl_cruise <- tbl(con, "cruise")
tbl_ship <- tbl(con, "ship")
tbl_site <- tbl(con, "site")
tbl_tow <- tbl(con, "tow")
tbl_net <- tbl(con, "net")
tbl_ichthyo <- tbl(con, "ichthyo")
tbl_species <- tbl(con, "species")
tbl_lookup <- tbl(con, "lookup")
# bottle chain ----
tbl_casts <- tbl(con, "casts")
tbl_bottle <- tbl(con, "bottle")
tbl_bottle_meas <- tbl(con, "bottle_measurement")
tbl_cast_condition <- tbl(con, "cast_condition")
tbl_measurement_type <- tbl(con, "measurement_type")
# taxonomy ----
tbl_taxon <- tbl(con, "taxon")
tbl_taxa_rank <- tbl(con, "taxa_rank")
# quick counts
cat("Table row counts:\n")
for (tname in c(
"cruise",
"ship",
"site",
"tow",
"net",
"ichthyo",
"species",
"lookup",
"casts",
"bottle",
"bottle_measurement",
"cast_condition",
"measurement_type",
"taxon",
"taxa_rank"
)) {
n <- tbl(con, tname) |> summarize(n = n()) |> pull(n)
cat(glue(" {tname}: {prettyNum(n, big.mark = ',')}"), "\n")
}
```
## Mint Globally Unique Identifiers
The merged DuckDB uses integer `*_id` columns for joins and SWFSC `_source_uuid` for provenance. For OBIS, we need globally unique, stable identifiers:
- **Net eventID**: use net's `_source_uuid` (original SWFSC UUID, unique per net)
- **Ichthyo occurrenceID / measurementID**: mint deterministic UUID v5 from composite key
- **Cast eventID**: mint deterministic UUID v5 from `cast_id`
- **Bottle/cast eMoF measurementID**: mint deterministic UUID v5 from integer IDs
```{r}
#| label: mint-uuids
# collect net table with source UUIDs for eventID ----
d_net <- tbl_net |> collect() |> rename(net_uuid = `_source_uuid`)
stopifnot(all(!duplicated(d_net$net_uuid)))
cat(glue("Net UUIDs: {nrow(d_net)} (all unique)"), "\n")
# collect and mint ichthyo UUIDs ----
d_ichthyo <- tbl_ichthyo |>
collect() |>
# join net_uuid for composite key
left_join(d_net |> select(net_id, net_uuid), by = "net_id") |>
mutate(
ichthyo_uuid = UUIDfromName(
CALCOFI_NS,
paste0(
net_uuid,
"|",
species_id,
"|",
life_stage,
"|",
ifelse(is.na(measurement_type), "", measurement_type),
"|",
ifelse(is.na(measurement_value), "", measurement_value)
)
)
)
stopifnot(all(!duplicated(d_ichthyo$ichthyo_uuid)))
cat(glue("Ichthyo UUIDs: {nrow(d_ichthyo)} (all unique)"), "\n")
# collect casts and mint cast_uuid ----
d_casts <- tbl_casts |>
collect() |>
mutate(
cast_uuid = UUIDfromName(
CALCOFI_NS,
paste0("cast:", cast_id)
)
)
stopifnot(all(!duplicated(d_casts$cast_uuid)))
cat(glue("Cast UUIDs: {nrow(d_casts)} (all unique)"), "\n")
```
## Life Stage Vocabulary
Define mappings for egg and larva developmental stages to descriptive terms:
```{r}
#| label: vocabulary-lookups
# P06 measurement unit vocabulary (NERC) ----
measurement_unit_vocab <- tibble(
measurementUnit = c(
"individuals",
"millimeters",
"dimensionless",
"cubic meters",
"grams",
"proportion",
"m^3",
"degC",
"PSS-78",
"ml/L",
"umol/kg",
"%",
"kg/m3",
"ug/L",
"umol/L",
"mgC/m3/hld",
"degrees",
"feet",
"seconds",
"knots",
"millibars",
"oktas",
"meters",
"Forel-Ule",
"pH",
"10^-8 m3/kg",
"dyn m"
),
measurementUnitID = c(
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # individuals
"http://vocab.nerc.ac.uk/collection/P06/current/UXMM/", # millimeters
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # dimensionless
"http://vocab.nerc.ac.uk/collection/P06/current/MCUB/", # cubic meters
"http://vocab.nerc.ac.uk/collection/P06/current/UGRM/", # grams
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # proportion
"http://vocab.nerc.ac.uk/collection/P06/current/MCUB/", # m^3
"http://vocab.nerc.ac.uk/collection/P06/current/UPAA/", # degC
"http://vocab.nerc.ac.uk/collection/P06/current/UGKG/", # PSS-78
"http://vocab.nerc.ac.uk/collection/P06/current/UMLL/", # ml/L
"http://vocab.nerc.ac.uk/collection/P06/current/UMKG/", # umol/kg
"http://vocab.nerc.ac.uk/collection/P06/current/UPCT/", # %
"http://vocab.nerc.ac.uk/collection/P06/current/UKGM/", # kg/m3
"http://vocab.nerc.ac.uk/collection/P06/current/UGPL/", # ug/L
"http://vocab.nerc.ac.uk/collection/P06/current/UPOX/", # umol/L
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # mgC/m3/hld (no std P06)
"http://vocab.nerc.ac.uk/collection/P06/current/UAAA/", # degrees
"http://vocab.nerc.ac.uk/collection/P06/current/UXFT/", # feet
"http://vocab.nerc.ac.uk/collection/P06/current/UTBB/", # seconds
"http://vocab.nerc.ac.uk/collection/P06/current/UKNT/", # knots
"http://vocab.nerc.ac.uk/collection/P06/current/UPML/", # millibars
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # oktas (no std P06)
"http://vocab.nerc.ac.uk/collection/P06/current/ULAA/", # meters
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # Forel-Ule (no std P06)
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # pH
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/", # 10^-8 m3/kg
"http://vocab.nerc.ac.uk/collection/P06/current/UUUU/"
)
) # dyn m
# NERC P01 vocabulary for bottle measurement types ----
bottle_p01_vocab <- tribble(
~measurement_type , ~measurementTypeID ,
"temperature" , "http://vocab.nerc.ac.uk/collection/P01/current/TEMPPR01/" ,
"salinity" , "http://vocab.nerc.ac.uk/collection/P01/current/PSALST01/" ,
"oxygen_ml_l" , "http://vocab.nerc.ac.uk/collection/P01/current/DOXYZZXX/" ,
"oxygen_umol_kg" , "http://vocab.nerc.ac.uk/collection/P01/current/DLOXYUMK/" ,
"oxygen_saturation" , "http://vocab.nerc.ac.uk/collection/P01/current/OXYSSC01/" ,
"sigma_theta" , "http://vocab.nerc.ac.uk/collection/P01/current/SIGTEQ01/" ,
"chlorophyll_a" , "http://vocab.nerc.ac.uk/collection/P01/current/CPHLPR01/" ,
"phaeopigment" , "http://vocab.nerc.ac.uk/collection/P01/current/PHAEFLUO/" ,
"phosphate" , "http://vocab.nerc.ac.uk/collection/P01/current/PHOSZZXX/" ,
"silicate" , "http://vocab.nerc.ac.uk/collection/P01/current/SLCAZZXX/" ,
"nitrite" , "http://vocab.nerc.ac.uk/collection/P01/current/NTRIZZXX/" ,
"nitrate" , "http://vocab.nerc.ac.uk/collection/P01/current/NTRAZZXX/" ,
"ammonia" , "http://vocab.nerc.ac.uk/collection/P01/current/AMOAZZXX/" ,
"c14_rep1" , "http://vocab.nerc.ac.uk/collection/P01/current/OLOCSMP1/" ,
"c14_rep2" , "http://vocab.nerc.ac.uk/collection/P01/current/OLOCSMP1/" ,
"c14_dark" , "http://vocab.nerc.ac.uk/collection/P01/current/OLOCSMP1/" ,
"c14_mean" , "http://vocab.nerc.ac.uk/collection/P01/current/OLOCSMP1/" ,
"light_pct" , "http://vocab.nerc.ac.uk/collection/P01/current/IRRDSV01/" ,
"dic_rep1" , "http://vocab.nerc.ac.uk/collection/P01/current/DICATSWT/" ,
"dic_rep2" , "http://vocab.nerc.ac.uk/collection/P01/current/DICATSWT/" ,
"alkalinity_rep1" , "http://vocab.nerc.ac.uk/collection/P01/current/MDMAP014/" ,
"alkalinity_rep2" , "http://vocab.nerc.ac.uk/collection/P01/current/MDMAP014/" ,
"ph_rep1" , "http://vocab.nerc.ac.uk/collection/P01/current/PHXXZZXX/" ,
"ph_rep2" , "http://vocab.nerc.ac.uk/collection/P01/current/PHXXZZXX/" ,
"r_depth" , "http://vocab.nerc.ac.uk/collection/P01/current/ADEPZZ01/" ,
"r_temperature" , "http://vocab.nerc.ac.uk/collection/P01/current/TEMPPR01/" ,
"r_salinity_sva" , "http://vocab.nerc.ac.uk/collection/P01/current/SVELXXXX/" ,
"r_dynamic_height" , "http://vocab.nerc.ac.uk/collection/P01/current/DYNHTTAU/" ,
"r_ammonium" , "http://vocab.nerc.ac.uk/collection/P01/current/AMOAZZXX/" ,
"r_oxygen_umol_kg" , "http://vocab.nerc.ac.uk/collection/P01/current/DLOXYUMK/"
)
# NERC P01 vocabulary for cast condition types ----
cast_p01_vocab <- tribble(
~condition_type , ~measurementTypeID ,
"wave_direction" , "http://vocab.nerc.ac.uk/collection/P01/current/GWDRZZ01/" ,
"wave_height" , "http://vocab.nerc.ac.uk/collection/P01/current/GAVHZZ01/" ,
"wave_period" , "http://vocab.nerc.ac.uk/collection/P01/current/GAVTZZ01/" ,
"wind_direction" , "http://vocab.nerc.ac.uk/collection/P01/current/EWDAZZ01/" ,
"wind_speed" , "http://vocab.nerc.ac.uk/collection/P01/current/EWSBZZ01/" ,
"barometric_pressure" , "http://vocab.nerc.ac.uk/collection/P01/current/CAPAZZ01/" ,
"dry_air_temp" , "http://vocab.nerc.ac.uk/collection/P01/current/CTMPZZ01/" ,
"wet_air_temp" , "http://vocab.nerc.ac.uk/collection/P01/current/CWETZZ01/" ,
"weather_code" , "http://vocab.nerc.ac.uk/collection/P01/current/WMOCWWXX/" ,
"cloud_type" , "http://vocab.nerc.ac.uk/collection/P01/current/WMOCLTXX/" ,
"cloud_amount" , "http://vocab.nerc.ac.uk/collection/P01/current/CLAMAMTS/" ,
"visibility" , "http://vocab.nerc.ac.uk/collection/P01/current/WMOVISXX/" ,
"secchi_depth" , "http://vocab.nerc.ac.uk/collection/P01/current/SECCSDNX/" ,
"water_color" , "http://vocab.nerc.ac.uk/collection/P01/current/CCOLXXXX/"
)
```
## Build Event Hierarchy
### Cruise Events
Aggregate over both net tows and casts to build cruise-level events:
```{r}
#| label: build-cruise-events
# collect joined net→tow→site for cruise summaries ----
d_net_site <- d_net |>
left_join(
tbl_tow |> collect() |> select(tow_id, site_id, time_start),
by = "tow_id"
) |>
left_join(
tbl_site |>
collect() |>
select(site_id, cruise_key, longitude, latitude, line, station),
by = "site_id"
)
# net cruise summaries ----
d_net_cruise_summary <- d_net_site |>
group_by(cruise_key) |>
summarize(
net_time_min = min(time_start, na.rm = TRUE),
net_time_max = max(time_start, na.rm = TRUE),
net_lon_min = min(longitude, na.rm = TRUE),
net_lon_max = max(longitude, na.rm = TRUE),
net_lat_min = min(latitude, na.rm = TRUE),
net_lat_max = max(latitude, na.rm = TRUE),
.groups = "drop"
)
# cast cruise summaries ----
d_cast_cruise_summary <- d_casts |>
group_by(cruise_key) |>
summarize(
cast_time_min = min(datetime_utc, na.rm = TRUE),
cast_time_max = max(datetime_utc, na.rm = TRUE),
cast_lon_min = min(lon_dec, na.rm = TRUE),
cast_lon_max = max(lon_dec, na.rm = TRUE),
cast_lat_min = min(lat_dec, na.rm = TRUE),
cast_lat_max = max(lat_dec, na.rm = TRUE),
.groups = "drop"
)
# combine cruise metadata ----
d_cruise_events <- tbl_cruise |>
left_join(tbl_ship, by = "ship_key") |>
collect() |>
left_join(d_net_cruise_summary, by = "cruise_key") |>
left_join(d_cast_cruise_summary, by = "cruise_key") |>
mutate(
# date range: union of net tows and casts
time_min = pmin(net_time_min, cast_time_min, na.rm = TRUE),
time_max = pmax(net_time_max, cast_time_max, na.rm = TRUE),
# bounding box: union of net sites and cast positions
lon_min = pmin(net_lon_min, cast_lon_min, na.rm = TRUE),
lon_max = pmax(net_lon_max, cast_lon_max, na.rm = TRUE),
lat_min = pmin(net_lat_min, cast_lat_min, na.rm = TRUE),
lat_max = pmax(net_lat_max, cast_lat_max, na.rm = TRUE)
) |>
filter(!is.na(time_min) | !is.na(cast_time_min)) |>
mutate(
eventID = cruise_key,
parentEventID = NA_character_,
eventType = "cruise",
eventDate = case_when(
!is.na(time_min) & !is.na(time_max) ~
paste0(
format(time_min, "%Y-%m-%dT%H:%M:%SZ"),
"/",
format(time_max, "%Y-%m-%dT%H:%M:%SZ")
),
TRUE ~ NA_character_
),
footprintWKT = case_when(
!is.na(lon_min) ~
glue(
"POLYGON(({lon_min} {lat_min}, {lon_max} {lat_min}, {lon_max} {lat_max}, {lon_min} {lat_max}, {lon_min} {lat_min}))"
),
TRUE ~ NA_character_
),
samplingProtocol = "Marine plankton survey cruise",
habitat = "marine",
eventRemarks = glue(
"Cruise on {ship_name} from {as.Date(time_min)} to {as.Date(time_max)}"
)
) |>
select(
eventID,
parentEventID,
eventType,
eventDate,
footprintWKT,
samplingProtocol,
habitat,
eventRemarks
)
cat(glue("Cruise events: {nrow(d_cruise_events)}"), "\n")
```
### Net Sample Events
Collapsed net_sample events (cruise → net_sample) with coordinates, date, and sampling info:
```{r}
#| label: build-net-events
d_net_events <- d_net_site |>
mutate(
eventID = net_uuid,
parentEventID = cruise_key,
eventType = "net_sample",
eventDate = format(time_start, "%Y-%m-%dT%H:%M:%SZ"),
decimalLatitude = latitude,
decimalLongitude = longitude,
locationID = paste0(line, "_", station),
samplingProtocol = "CalCOFI ichthyoplankton net tow",
sampleSizeValue = vol_sampled_m3,
sampleSizeUnit = "m^3",
eventRemarks = glue(
"Net: {side} side; ",
"{prop_sorted} proportion sorted of {vol_sampled_m3} m3 sampled"
)
)
# keep full data for eMoF
d_net_events_full <- d_net_events
d_net_events <- d_net_events |>
select(
eventID,
parentEventID,
eventType,
eventDate,
decimalLatitude,
decimalLongitude,
locationID,
samplingProtocol,
sampleSizeValue,
sampleSizeUnit,
eventRemarks
)
stopifnot(all(!duplicated(d_net_events$eventID)))
cat(glue("Net sample events: {nrow(d_net_events)}"), "\n")
```
### CTD Cast Events
One event per cast:
```{r}
#| label: build-cast-events
d_cast_events <- d_casts |>
mutate(
eventID = cast_uuid,
parentEventID = cruise_key,
eventType = "ctd_cast",
eventDate = format(datetime_utc, "%Y-%m-%dT%H:%M:%SZ"),
decimalLatitude = lat_dec,
decimalLongitude = lon_dec,
locationID = paste0(rpt_line, "_", rpt_sta),
samplingProtocol = "CTD rosette cast",
sampleSizeValue = bottom_depth_m,
sampleSizeUnit = "m",
eventRemarks = glue(
"CTD cast at station {rpt_line}_{rpt_sta}, bottom depth {bottom_depth_m} m"
)
) |>
select(
eventID,
parentEventID,
eventType,
eventDate,
decimalLatitude,
decimalLongitude,
locationID,
samplingProtocol,
sampleSizeValue,
sampleSizeUnit,
eventRemarks
)
stopifnot(all(!duplicated(d_cast_events$eventID)))
cat(glue("CTD cast events: {nrow(d_cast_events)}"), "\n")
```
### Combine All Events
```{r}
#| label: combine-events
d_event <- bind_rows(
d_cruise_events,
d_net_events,
d_cast_events
) |>
mutate(
geodeticDatum = "WGS84",
coordinateUncertaintyInMeters = 1000,
countryCode = country_code,
waterBody = water_body,
datasetID = dataset_id
)
cat("Event counts by type:\n")
d_event |> count(eventType) |> print()
# verify all child events have valid parentEventID
cruise_ids <- d_event |> filter(eventType == "cruise") |> pull(eventID)
child_events <- d_event |> filter(eventType != "cruise")
orphan_children <- child_events |> filter(!parentEventID %in% cruise_ids)
if (nrow(orphan_children) > 0) {
warning(glue(
"{nrow(orphan_children)} child events have no matching cruise parent"
))
} else {
cat("All child events have valid cruise parents.\n")
}
```
## Build Occurrence Records
From the consolidated `ichthyo` table — one occurrence per life stage per species per net sample (tally rows where `measurement_type` is NULL):
```{r}
#| label: build-occurrences
d_occurrence <- d_ichthyo |>
filter(is.na(measurement_type)) |>
left_join(
tbl_species |> collect(),
by = "species_id"
) |>
mutate(
occurrenceID = ichthyo_uuid,
eventID = net_uuid,
scientificName = scientific_name,
scientificNameID = ifelse(
!is.na(worms_id) & worms_id > 0,
paste0("urn:lsid:marinespecies.org:taxname:", worms_id),
NA_character_
),
kingdom = "Animalia",
occurrenceStatus = ifelse(tally > 0, "present", "absent"),
organismQuantity = tally,
organismQuantityType = "individuals",
lifeStage = life_stage,
preparations = paste0(life_stage, " sample"),
basisOfRecord = "HumanObservation",
modified = Sys.Date()
)
# verify all occurrences link to valid net_sample events
net_ids <- d_net_events |> pull(eventID)
orphan_occs <- d_occurrence |> filter(!eventID %in% net_ids)
if (nrow(orphan_occs) > 0) {
warning(glue(
"{nrow(orphan_occs)} occurrences have no matching net_sample event"
))
} else {
cat("All occurrences link to valid net_sample events.\n")
}
d_occurrence_out <- d_occurrence |>
select(
occurrenceID,
eventID,
scientificName,
scientificNameID,
kingdom,
occurrenceStatus,
organismQuantity,
organismQuantityType,
lifeStage,
preparations,
basisOfRecord,
modified
)
cat(glue("Occurrences: {nrow(d_occurrence_out)}"), "\n")
cat(glue("Unique species: {n_distinct(d_occurrence_out$scientificName)}"), "\n")
```
## Build ExtendedMeasurementOrFact Extension
### 10a. Net-level sample measurements
```{r}
#| label: emof-net-level
d_emof_net <- d_net_events_full |>
select(
eventID,
net_uuid,
std_haul_factor,
prop_sorted,
smallplankton,
totalplankton
) |>
pivot_longer(
cols = c(std_haul_factor, prop_sorted, smallplankton, totalplankton),
names_to = "meas_type",
values_to = "measurementValue"
) |>
filter(!is.na(measurementValue)) |>
mutate(
measurementID = UUIDfromName(
CALCOFI_NS,
paste0("net:", net_uuid, ":", meas_type)
),
occurrenceID = NA_character_,
measurementType = case_when(
meas_type == "std_haul_factor" ~ "standardized haul factor",
meas_type == "prop_sorted" ~ "proportion of sample sorted",
meas_type == "smallplankton" ~ "small plankton biomass",
meas_type == "totalplankton" ~ "total plankton biomass"
),
measurementTypeID = NA_character_,
measurementUnit = case_when(
meas_type == "std_haul_factor" ~ "dimensionless",
meas_type == "prop_sorted" ~ "dimensionless",
meas_type %in% c("smallplankton", "totalplankton") ~ "grams"
),
measurementMethod = "https://oceanview.pfeg.noaa.gov/CalCOFI/calcofi_info.html",
measurementRemarks = case_when(
meas_type == "std_haul_factor" ~
"Standardization factor for water filtered; abundances per 10 m^2 (Smith 1977)",
meas_type == "prop_sorted" ~ "Fraction of total sample examined",
TRUE ~ NA_character_
)
) |>
left_join(measurement_unit_vocab, by = "measurementUnit") |>
select(
eventID,
occurrenceID,
measurementID,
measurementType,
measurementTypeID,
measurementValue,
measurementUnit,
measurementUnitID,
measurementMethod,
measurementRemarks
)
cat(glue("eMoF net-level: {nrow(d_emof_net)}"), "\n")
```
### 10b. Stage-specific abundance
From `ichthyo` rows where `measurement_type = 'stage'`, linked to the matching tally row's `ichthyo_uuid` as `occurrenceID`:
```{r}
#| label: emof-stage-abundance
# build lookup: tally occurrenceIDs with their eventID (net_uuid)
d_occ_lookup <- d_occurrence |>
select(
net_id,
species_id,
life_stage,
occurrenceID = ichthyo_uuid,
occ_eventID = eventID
)
# collect lookup descriptions for stages
d_lookup <- tbl_lookup |>
filter(lookup_type %in% c("egg_stage", "larva_stage")) |>
collect()
# stage records — join to lookup via derived lookup_type
d_emof_stage <- d_ichthyo |>
filter(measurement_type == "stage") |>
mutate(
lookup_type = paste0(life_stage, "_stage")
) |>
left_join(
d_lookup |> select(lookup_type, lookup_num, description),
by = c("lookup_type", "measurement_value" = "lookup_num")
) |>
left_join(d_occ_lookup, by = c("net_id", "species_id", "life_stage")) |>
mutate(
eventID = occ_eventID,
measurementID = ichthyo_uuid,
measurementType = "abundance by life stage",
measurementTypeID = NA_character_,
meas_val = tally,
measurementValueID = NA_character_,
measurementUnit = "individuals",
measurementMethod = "https://oceanview.pfeg.noaa.gov/CalCOFI/calcofi_info.html",
measurementRemarks = description
) |>
left_join(measurement_unit_vocab, by = "measurementUnit") |>
select(
eventID,
occurrenceID,
measurementID,
measurementType,
measurementTypeID,
measurementValue = meas_val,
measurementValueID,
measurementUnit,
measurementUnitID,
measurementMethod,
measurementRemarks
)
cat(glue("eMoF stage abundance: {nrow(d_emof_stage)}"), "\n")
```
### 10c. Body length measurements
From `ichthyo` rows where `measurement_type = 'size'`:
```{r}
#| label: emof-body-length
d_emof_length <- d_ichthyo |>
filter(measurement_type == "size") |>
left_join(d_occ_lookup, by = c("net_id", "species_id", "life_stage")) |>
filter(!is.na(measurement_value)) |>
mutate(
eventID = occ_eventID,
measurementID = ichthyo_uuid,
measurementType = "body length",
measurementTypeID = NA_character_,
meas_val = measurement_value,
measurementValueID = NA_character_,
measurementUnit = "millimeters",
measurementMethod = "https://oceanview.pfeg.noaa.gov/CalCOFI/calcofi_info.html",
measurementRemarks = paste0("Count: ", tally, "; Total length measurement")
) |>
left_join(measurement_unit_vocab, by = "measurementUnit") |>
select(
eventID,
occurrenceID,
measurementID,
measurementType,
measurementTypeID,
measurementValue = meas_val,
measurementValueID,
measurementUnit,
measurementUnitID,
measurementMethod,
measurementRemarks
)
cat(glue("eMoF body length: {nrow(d_emof_length)}"), "\n")
```
### 10d. Bottle measurements
From `bottle_measurement` joined through `bottle` → `casts`:
```{r}
#| label: emof-bottle
# build cast_id → cast_uuid lookup
d_cast_uuid_lookup <- d_casts |> select(cast_id, cast_uuid)
# get measurement type descriptions and units
d_mtype <- tbl_measurement_type |>
filter(dataset == "bottle") |>
collect()
d_emof_bottle <- tbl_bottle_meas |>
left_join(
tbl_bottle |> select(bottle_id, cast_id, depth_m),
by = "bottle_id"
) |>
collect() |>
inner_join(d_cast_uuid_lookup, by = "cast_id") |>
left_join(
d_mtype |> select(measurement_type, description, units),
by = "measurement_type"
) |>
left_join(bottle_p01_vocab, by = "measurement_type") |>
mutate(
eventID = cast_uuid,
occurrenceID = NA_character_,
measurementID = UUIDfromName(
CALCOFI_NS,
paste0("bm:", bottle_measurement_id)
),
measurementType = description,
meas_val = measurement_value,
measurementUnit = units,
measurementMethod = "https://calcofi.org/data/oceanographic-data/bottle-database/",
measurementRemarks = paste0("depth_m: ", depth_m)
) |>
left_join(measurement_unit_vocab, by = "measurementUnit") |>
select(
eventID,
occurrenceID,
measurementID,
measurementType,
measurementTypeID,
measurementValue = meas_val,
measurementUnit,
measurementUnitID,
measurementMethod,
measurementRemarks
)
cat(glue("eMoF bottle measurements: {nrow(d_emof_bottle)}"), "\n")
```
### 10e. Cast conditions
From `cast_condition` joined to `casts`:
```{r}
#| label: emof-cast-conditions
d_emof_cast_cond <- tbl_cast_condition |>
collect() |>
inner_join(d_cast_uuid_lookup, by = "cast_id") |>
left_join(
tbl_measurement_type |>
filter(dataset == "cast") |>
collect() |>
select(measurement_type, description, units),
by = c("condition_type" = "measurement_type")
) |>
left_join(cast_p01_vocab, by = "condition_type") |>
mutate(
eventID = cast_uuid,
occurrenceID = NA_character_,
measurementID = UUIDfromName(
CALCOFI_NS,
paste0("cc:", cast_condition_id)
),
measurementType = description,
measurementValue = condition_value,
measurementUnit = ifelse(is.na(units) | units == "", NA_character_, units),
measurementMethod = "https://calcofi.org/data/oceanographic-data/bottle-database/",
measurementRemarks = NA_character_
) |>
left_join(measurement_unit_vocab, by = "measurementUnit") |>
select(
eventID,
occurrenceID,
measurementID,
measurementType,
measurementTypeID,
measurementValue,
measurementUnit,
measurementUnitID,
measurementMethod,
measurementRemarks
)
cat(glue("eMoF cast conditions: {nrow(d_emof_cast_cond)}"), "\n")
```
### Combine All eMoF Records
```{r}
#| label: combine-emof
d_emof <- bind_rows(
d_emof_net,
d_emof_stage |> select(-measurementValueID),
d_emof_length |> select(-measurementValueID),
d_emof_bottle,
d_emof_cast_cond
)
cat("eMoF counts by source:\n")
cat(glue(" Net-level: {nrow(d_emof_net)}"), "\n")
cat(glue(" Stage abundance: {nrow(d_emof_stage)}"), "\n")
cat(glue(" Body length: {nrow(d_emof_length)}"), "\n")
cat(glue(" Bottle meas: {nrow(d_emof_bottle)}"), "\n")
cat(glue(" Cast conditions: {nrow(d_emof_cast_cond)}"), "\n")
cat(glue(" Total: {nrow(d_emof)}"), "\n")
# verify all eMoF records link to a valid eventID or occurrenceID
event_ids <- d_event$eventID
occ_ids <- d_occurrence_out$occurrenceID
emof_with_event <- d_emof |> filter(!is.na(eventID))
emof_with_occ <- d_emof |> filter(!is.na(occurrenceID))
emof_orphan_event <- emof_with_event |> filter(!eventID %in% event_ids)
emof_orphan_occ <- emof_with_occ |> filter(!occurrenceID %in% occ_ids)
if (nrow(emof_orphan_event) > 0) {
warning(glue("{nrow(emof_orphan_event)} eMoF records have invalid eventID"))
}
if (nrow(emof_orphan_occ) > 0) {
warning(glue(
"{nrow(emof_orphan_occ)} eMoF records have invalid occurrenceID"
))
}
```
## Write DarwinCore Archive Files
Export the event core and extension files as CSV:
```{r}
#| label: export-dwc
dir.create(dir_out, recursive = TRUE, showWarnings = FALSE)
write_csv(d_event, file.path(dir_out, "event.csv"), na = "")
write_csv(d_occurrence_out, file.path(dir_out, "occurrence.csv"), na = "")
write_csv(d_emof, file.path(dir_out, "extendedMeasurementOrFact.csv"), na = "")
cat("Wrote CSV files to:", dir_out, "\n")
```
## Create meta.xml
Generate `meta.xml` by mapping CSV column names to DarwinCore term URIs:
```{r}
#| label: create-meta-xml
# DwC term mappings ----
dwc_terms <- list(
event = list(
rowType = "http://rs.tdwg.org/dwc/terms/Event",
idField = "eventID",
terms = c(
eventID = "http://rs.tdwg.org/dwc/terms/eventID",
parentEventID = "http://rs.tdwg.org/dwc/terms/parentEventID",
eventType = "http://rs.tdwg.org/dwc/terms/eventType",
eventDate = "http://rs.tdwg.org/dwc/terms/eventDate",
samplingProtocol = "http://rs.tdwg.org/dwc/terms/samplingProtocol",
sampleSizeValue = "http://rs.tdwg.org/dwc/terms/sampleSizeValue",
sampleSizeUnit = "http://rs.tdwg.org/dwc/terms/sampleSizeUnit",
eventRemarks = "http://rs.tdwg.org/dwc/terms/eventRemarks",
habitat = "http://rs.tdwg.org/dwc/terms/habitat",
footprintWKT = "http://rs.tdwg.org/dwc/terms/footprintWKT",
decimalLatitude = "http://rs.tdwg.org/dwc/terms/decimalLatitude",
decimalLongitude = "http://rs.tdwg.org/dwc/terms/decimalLongitude",
geodeticDatum = "http://rs.tdwg.org/dwc/terms/geodeticDatum",
coordinateUncertaintyInMeters = "http://rs.tdwg.org/dwc/terms/coordinateUncertaintyInMeters",
locationID = "http://rs.tdwg.org/dwc/terms/locationID",
countryCode = "http://rs.tdwg.org/dwc/terms/countryCode",
waterBody = "http://rs.tdwg.org/dwc/terms/waterBody",
datasetID = "http://rs.tdwg.org/dwc/terms/datasetID"
)
),
occurrence = list(
rowType = "http://rs.tdwg.org/dwc/terms/Occurrence",
idField = "occurrenceID",
coreIdField = "eventID",
terms = c(
occurrenceID = "http://rs.tdwg.org/dwc/terms/occurrenceID",
eventID = "http://rs.tdwg.org/dwc/terms/eventID",
scientificName = "http://rs.tdwg.org/dwc/terms/scientificName",
scientificNameID = "http://rs.tdwg.org/dwc/terms/scientificNameID",
kingdom = "http://rs.tdwg.org/dwc/terms/kingdom",
occurrenceStatus = "http://rs.tdwg.org/dwc/terms/occurrenceStatus",
organismQuantity = "http://rs.tdwg.org/dwc/terms/organismQuantity",
organismQuantityType = "http://rs.tdwg.org/dwc/terms/organismQuantityType",
lifeStage = "http://rs.tdwg.org/dwc/terms/lifeStage",
preparations = "http://rs.tdwg.org/dwc/terms/preparations",
basisOfRecord = "http://rs.tdwg.org/dwc/terms/basisOfRecord",
modified = "http://purl.org/dc/terms/modified"
)
),
extendedmeasurementorfact = list(
rowType = "http://rs.iobis.org/obis/terms/ExtendedMeasurementOrFact",
idField = "measurementID",
coreIdField = "eventID",
terms = c(
eventID = "http://rs.tdwg.org/dwc/terms/eventID",
occurrenceID = "http://rs.tdwg.org/dwc/terms/occurrenceID",
measurementID = "http://rs.tdwg.org/dwc/terms/measurementID",
measurementType = "http://rs.tdwg.org/dwc/terms/measurementType",
measurementTypeID = "http://rs.iobis.org/obis/terms/measurementTypeID",
measurementValue = "http://rs.tdwg.org/dwc/terms/measurementValue",
measurementUnit = "http://rs.tdwg.org/dwc/terms/measurementUnit",
measurementUnitID = "http://rs.iobis.org/obis/terms/measurementUnitID",
measurementMethod = "http://rs.tdwg.org/dwc/terms/measurementMethod",
measurementRemarks = "http://rs.tdwg.org/dwc/terms/measurementRemarks"
)
)
)
# helper functions ----
create_field_elements <- function(csv_file, term_map, coreid_field = NULL) {
col_names <- names(read_csv(csv_file, n_max = 0, show_col_types = FALSE))
field_elements <- map(seq_along(col_names), function(i) {
col <- col_names[i]
if (!is.null(coreid_field) && col == coreid_field) {
return(NULL)
}
term <- term_map$terms[[col]]
if (!is.null(term)) {
glue(' <field index="{i-1}" term="{term}"/>')
} else {
message(
"Warning: Column '",
col,
"' in ",
csv_file,
" has no DwC mapping."
)
NULL
}
})
paste(compact(field_elements), collapse = "\n")
}
get_coreid_index <- function(csv_file, coreid_field) {
col_names <- names(read_csv(csv_file, n_max = 0, show_col_types = FALSE))
if (length(coreid_field) > 1) {
coreid_field <- intersect(coreid_field, col_names)[1]
}
which(col_names == coreid_field) - 1
}
# generate meta.xml ----
generate_meta_xml <- function(dir_out) {
event_file <- file.path(dir_out, "event.csv")
event_fields <- create_field_elements(event_file, dwc_terms$event)
event_id_idx <- which(
names(read_csv(event_file, n_max = 0, show_col_types = FALSE)) ==
dwc_terms$event$idField
) -
1
occ_file <- file.path(dir_out, "occurrence.csv")
occ_fields <- create_field_elements(
occ_file,
dwc_terms$occurrence,
coreid_field = dwc_terms$occurrence$coreIdField
)
occ_coreid_idx <- get_coreid_index(occ_file, dwc_terms$occurrence$coreIdField)
emof_file <- file.path(dir_out, "extendedMeasurementOrFact.csv")
emof_fields <- create_field_elements(
emof_file,
dwc_terms$extendedmeasurementorfact,
coreid_field = dwc_terms$extendedmeasurementorfact$coreIdField
)
emof_coreid_idx <- get_coreid_index(
emof_file,
dwc_terms$extendedmeasurementorfact$coreIdField
)
glue(
'<?xml version="1.0" encoding="UTF-8"?>
<archive xmlns="http://rs.tdwg.org/dwc/text/"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://rs.tdwg.org/dwc/text/ http://rs.tdwg.org/dwc/text/tdwg_dwc_text.xsd">
<core encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\\n"
fieldsEnclosedBy=\'"\' ignoreHeaderLines="1" rowType="{dwc_terms$event$rowType}">
<files>
<location>event.csv</location>
</files>
<id index="{event_id_idx}" />
{event_fields}
</core>
<extension encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\\n"
fieldsEnclosedBy=\'"\' ignoreHeaderLines="1" rowType="{dwc_terms$occurrence$rowType}">
<files>
<location>occurrence.csv</location>
</files>
<coreid index="{occ_coreid_idx}" />
{occ_fields}
</extension>
<extension encoding="UTF-8" fieldsTerminatedBy="," linesTerminatedBy="\\n"
fieldsEnclosedBy=\'"\' ignoreHeaderLines="1" rowType="{dwc_terms$extendedmeasurementorfact$rowType}">
<files>
<location>extendedMeasurementOrFact.csv</location>
</files>
<coreid index="{emof_coreid_idx}" />
{emof_fields}
</extension>
</archive>'
)
}
meta_xml <- generate_meta_xml(dir_out)
writeLines(meta_xml, file.path(dir_out, "meta.xml"))
cat("Generated meta.xml\n")
cat(meta_xml)
```
## Create EML Metadata
Build EML document with geographic, temporal, and taxonomic coverage:
```{r}
#| label: build-eml
# geographic coverage: union of net sites and cast positions ----
geo_net <- tbl_site |>
summarise(
lon_min = min(longitude, na.rm = TRUE),
lon_max = max(longitude, na.rm = TRUE),
lat_min = min(latitude, na.rm = TRUE),
lat_max = max(latitude, na.rm = TRUE)
) |>
collect()
geo_cast <- d_casts |>
summarise(
lon_min = min(lon_dec, na.rm = TRUE),
lon_max = max(lon_dec, na.rm = TRUE),
lat_min = min(lat_dec, na.rm = TRUE),
lat_max = max(lat_dec, na.rm = TRUE)
)
geo_coverage <- tibble(
westBoundingCoordinate = min(geo_net$lon_min, geo_cast$lon_min, na.rm = TRUE),
eastBoundingCoordinate = max(geo_net$lon_max, geo_cast$lon_max, na.rm = TRUE),
southBoundingCoordinate = min(
geo_net$lat_min,
geo_cast$lat_min,
na.rm = TRUE
),
northBoundingCoordinate = max(geo_net$lat_max, geo_cast$lat_max, na.rm = TRUE)
)
# temporal coverage: union of tow and cast dates ----
temp_net <- tbl_tow |>
summarise(
tmin = min(time_start, na.rm = TRUE),
tmax = max(time_start, na.rm = TRUE)
) |>
collect()
temp_cast <- d_casts |>
summarise(
tmin = min(datetime_utc, na.rm = TRUE),
tmax = max(datetime_utc, na.rm = TRUE)
)
temp_coverage <- list(
beginDate = as.character(as.Date(min(
temp_net$tmin,
temp_cast$tmin,
na.rm = TRUE
))),
endDate = as.character(as.Date(max(
temp_net$tmax,
temp_cast$tmax,
na.rm = TRUE
)))
)
# taxonomic coverage ----
d_taxa <- tbl_taxon |> collect()
taxa_coverage <- tbl_species |>
mutate(
worms_id = case_when(
is.na(worms_id) | worms_id == 0 ~ NA_integer_,
TRUE ~ worms_id
)
) |>
collect() |>
filter(!is.na(worms_id)) |>
distinct(species_id, worms_id, common_name) |>
left_join(
d_taxa |> distinct(taxonID, taxonRank, scientificName),
by = join_by(worms_id == taxonID)
) |>
group_by(worms_id, taxonRank, scientificName) |>
summarize(
species_id = first(species_id),
common_name = paste(unique(na.omit(common_name)), collapse = ", "),
.groups = "drop"
) |>
arrange(species_id) |>
filter(!is.na(scientificName)) |>
pmap(function(
species_id,
taxonRank,
scientificName,
common_name,
worms_id,
...
) {
item <- list(
id = glue("calcofi_species_{species_id}"),
taxonRankName = taxonRank,
taxonRankValue = scientificName,
taxonId = list(
provider = "https://www.marinespecies.org",
as.character(worms_id)
)
)
if (!is.na(common_name) && common_name != "") {
item$commonName <- common_name
}
item
})
# build EML document ----
my_eml <- list(
packageId = dataset_id,
system = "calcofi.io",
dataset = list(
title = dataset_title,
intellectualRights = intellectual_rights,
creator = list(
individualName = list(givenName = "Ed", surName = "Weber"),
organizationName = "NOAA SWFSC",
electronicMailAddress = "ed.weber@noaa.gov",
userId = list(
directory = "https://orcid.org/",
userId = "0000-0002-0942-434X"
)
),
abstract = dataset_abstract,
keywordSet = list(
keyword = dataset_keywords,
keywordThesaurus = "GCMD Science Keywords"
),
coverage = list(
geographicCoverage = list(
geographicDescription = "California Current Large Marine Ecoregion",
boundingCoordinates = geo_coverage
),
temporalCoverage = list(
rangeOfDates = list(
beginDate = list(calendarDate = temp_coverage$beginDate),
endDate = list(calendarDate = temp_coverage$endDate)
)
),
taxonomicCoverage = list(
generalTaxonomicCoverage = "Marine ichthyoplankton (fish eggs and larvae)",
taxonomicClassification = taxa_coverage
)
),
contact = list(
individualName = list(givenName = "Ed", surName = "Weber"),
electronicMailAddress = "ed.weber@noaa.gov"
),
methods = list(
methodStep = list(
description = list(para = sampling_methods_description)
),
sampling = list(
studyExtent = list(
description = list(para = study_extent_description)
),
samplingDescription = list(
para = sampling_description
)
)
),
project = list(
title = dataset_title,
personnel = list(
individualName = list(givenName = "Ed", surName = "Weber"),
role = "Data Manager"
),
funding = list(para = funding_information)
)
)
)
eml_xml <- file.path(dir_out, "eml.xml")
write_eml(my_eml, eml_xml)
# replace license placeholder with actual license xml
readLines(eml_xml) |>
str_replace(fixed(names(license_xml)), license_xml) |>
writeLines(eml_xml)
validation_result <- eml_validate(eml_xml)
if (!validation_result) {
cat("EML validation errors:\n")
print(attr(validation_result, "errors"))
stop("EML validation failed")
} else {
cat("EML is valid!\n")
}
```
## Data Quality Checks
Validate event hierarchy, orphan records, and output summary statistics:
```{r}
#| label: check-data-quality
# check for missing WoRMS IDs
d_missing_worms <- tbl_species |>
filter(is.na(worms_id) | worms_id == 0) |>
select(species_id, scientific_name) |>
collect()
if (nrow(d_missing_worms) > 0) {
cat("WARNING:", nrow(d_missing_worms), "species missing WoRMS IDs:\n")
print(d_missing_worms, n = 20)
}
# summary statistics ----
n_cruises <- d_event |> filter(eventType == "cruise") |> nrow()
n_nets <- d_event |> filter(eventType == "net_sample") |> nrow()
n_casts_out <- d_event |> filter(eventType == "ctd_cast") |> nrow()
n_occs <- nrow(d_occurrence_out)
n_species <- n_distinct(d_occurrence_out$scientificName)
n_emof <- nrow(d_emof)
glue(
"
=== Dataset Summary ===
Total events: {prettyNum(nrow(d_event), big.mark = ',')}
- Cruises: {prettyNum(n_cruises, big.mark = ',')}
- Net samples: {prettyNum(n_nets, big.mark = ',')}
- CTD casts: {prettyNum(n_casts_out, big.mark = ',')}
Total occurrences: {prettyNum(n_occs, big.mark = ',')}
Unique species: {prettyNum(n_species, big.mark = ',')}
Total eMoF records: {prettyNum(n_emof, big.mark = ',')}"
)
```
## Package DarwinCore Archive
Create the final DwC-A zip file:
```{r}
#| label: create-dwc-archive
zip_file <- file.path(
dirname(dir_out),
paste0("ichthyo_bottle_", Sys.Date(), ".zip")
)
zip(
zip_file,
files = file.path(
dir_out,
c(
"event.csv",
"occurrence.csv",
"extendedMeasurementOrFact.csv",
"meta.xml",
"eml.xml"
)
),
flags = "-j"
)
cat("\nDarwin Core Archive created:", zip_file, "\n")
```
## Sync to Google Cloud Storage
```{r}
#| label: sync-to-gcs
gcs_bucket <- "calcofi-db"
gcs_publish_prefix <- "publish/ichthyo_bottle"
sync_results <- sync_to_gcs(
local_dir = dir_out,
gcs_prefix = gcs_publish_prefix,
bucket = gcs_bucket
)
```
## Validate with obistools
```{r}
#| label: obistools-validate
# read back the CSVs
d_event_check <- read_csv(file.path(dir_out, "event.csv"))
d_occ_check <- read_csv(file.path(dir_out, "occurrence.csv"))
d_emof_check <- read_csv(file.path(dir_out, "extendedMeasurementOrFact.csv"))
# event hierarchy check
cat("=== Event hierarchy check ===\n")
event_check <- check_eventids(d_event_check)
if (nrow(event_check) == 0) {
cat("PASS: all event parent-child relationships valid\n")
} else {
cat("ISSUES found:\n")
print(event_check)
}
# occurrence → event linkage
cat("\n=== Occurrence-event linkage ===\n")
occ_check <- check_extension_eventids(d_event_check, d_occ_check)
if (nrow(occ_check) == 0) {
cat("PASS: all occurrences link to valid events\n")
} else {
cat("ISSUES found:\n")
print(occ_check)
}
# eMoF → event linkage
cat("\n=== eMoF-event linkage ===\n")
emof_event_check <- d_emof_check |>
filter(!is.na(eventID)) |>
filter(!eventID %in% d_event_check$eventID)
if (nrow(emof_event_check) == 0) {
cat("PASS: all eMoF eventIDs link to valid events\n")
} else {
cat(nrow(emof_event_check), "eMoF records with invalid eventID\n")
}
# eMoF → occurrence linkage
emof_occ_check <- d_emof_check |>
filter(!is.na(occurrenceID)) |>
filter(!occurrenceID %in% d_occ_check$occurrenceID)
if (nrow(emof_occ_check) == 0) {
cat("PASS: all eMoF occurrenceIDs link to valid occurrences\n")
} else {
cat(nrow(emof_occ_check), "eMoF records with invalid occurrenceID\n")
}
# summary by event type
cat("\n=== Events by type ===\n")
d_event_check |> count(eventType) |> print()
cat("\n=== eMoF by measurementType ===\n")
d_emof_check |> count(measurementType, sort = TRUE) |> print(n = 50)
```
## Cleanup
```{r}
#| label: cleanup
dbDisconnect(con, shutdown = TRUE)
cat("Database connection closed.\n")
```