---
title: "Ingest Underway CUFES Fish Eggs"
calcofi:
target_name: ingest_swfsc_cufes
workflow_type: ingest
dependency:
- ingest_swfsc_ichthyo
output: data/parquet/swfsc_cufes/manifest.json
provider: swfsc
dataset: cufes
workflow_url: https://calcofi.io/workflows/ingest_swfsc_cufes.html
questions_file: metadata/swfsc/cufes/questions.csv
dataset_meta:
dataset_name: CalCOFI Underway CUFES Fish Eggs
description: >
Continuous Underway Fish Egg Sampler (CUFES) egg counts (sardine, anchovy,
jack mackerel, hake, squid, other) with underway environmental conditions,
from CalCOFI cruises (1996-present). Source: NOAA CoastWatch ERDDAP
erdCalCOFIcufes.
citation_main: ""
link_calcofi_org: ""
link_data_source: "https://coastwatch.pfeg.noaa.gov/erddap/tabledap/erdCalCOFIcufes.html"
link_others: []
coverage_temporal: 1996 to present
coverage_spatial: "CalCOFI region (underway)"
license: ""
pi_names: Noelle Bowlin
tables_owned:
- {table: cufes_sample}
- {table: cufes_measurement}
- {table: measurement_type, shared: true}
erd:
color: "#f0e8bb"
editor_options:
chunk_output_type: console
---
## Overview
**Source**: NOAA CoastWatch ERDDAP [`erdCalCOFIcufes`](https://coastwatch.pfeg.noaa.gov/erddap/tabledap/erdCalCOFIcufes.html)
— Continuous Underway Fish Egg Sampler (CUFES) egg counts (sardine, anchovy, jack mackerel,
hake, squid, other) plus underway environmental conditions. Provider: `swfsc` (PI: Noelle Bowlin); coverage: 1996-present.
## Setup
```{r}
#| label: setup
#| message: false

# Setup: load the CalCOFI helper packages, read this workflow's metadata,
# derive the output paths and open the wrangling DuckDB database.

# load the local checkouts of the sibling packages, then attach everything
devtools::load_all(here::here("../calcofi4db"))
devtools::load_all(here::here("../calcofi4r"))
librarian::shelf(
  CalCOFI/calcofi4db, CalCOFI/calcofi4r,
  DBI, dplyr, DT, fs, glue, here, jsonlite, knitr, lubridate, purrr,
  readr, sf, stringr, tibble, tidyr, units,
  quiet = TRUE)
options(readr.show_col_types = FALSE)
options(DT.options = list(scrollX = TRUE))
source(here("libs/ingest.R"))

# `overwrite` is read below (and by the download chunk) but is not assigned
# anywhere in this document; presumably libs/ingest.R or the calling
# environment supplies it -- TODO confirm. Fall back to the non-destructive
# value only when no logical `overwrite` is already in scope.
if (!exists("overwrite", mode = "logical")) {
  overwrite <- FALSE
}

# workflow metadata read from this document
cc           <- read_calcofi_meta(here("ingest_swfsc_cufes.qmd"))
provider     <- cc$provider
dataset      <- cc$dataset
tables_owned <- cc$tables_owned

# paths derived from "<provider>_<dataset>"
dir_label   <- glue("{provider}_{dataset}")
dir_parquet <- here(glue("data/parquet/{dir_label}"))
db_path     <- here(glue("data/wrangling/{dir_label}.duckdb"))

# start from a fresh database file when overwriting
if (overwrite && file_exists(db_path)) {
  file_delete(db_path)
}
dir_create(dirname(db_path))
con <- get_duckdb_con(db_path)
load_duckdb_extension(con, "spatial")

# measurement type registry CSV; the meas-types chunk appends to it
meas_type_csv <- here("metadata/measurement_type.csv")
d_meas_type   <- read_csv(meas_type_csv)
```
## Download from ERDDAP
```{r}
#| label: download

# Fetch the erdCalCOFIcufes table as CSV into the local cache (skipped when a
# cached copy exists and `overwrite` is FALSE), read it, and archive the cache
# directory to GCS.
dir_dl <- here(glue("data/cache/{dir_label}"))
dir_create(dir_dl)
cufes_csv  <- file.path(dir_dl, "erdCalCOFIcufes.csv")
erddap_url <- "https://coastwatch.pfeg.noaa.gov/erddap/tabledap/erdCalCOFIcufes.csv"

if (overwrite || !file_exists(cufes_csv)) {
  # Download to a temp file outside the cache directory and copy it into place
  # only after a successful transfer. Writing straight to `cufes_csv` would
  # leave a truncated file behind on an interrupted download, and later runs
  # would accept it as a valid cache hit.
  cufes_tmp <- tempfile(fileext = ".csv")
  dl_status <- tryCatch(
    download.file(erddap_url, cufes_tmp, mode = "wb", quiet = TRUE),
    error = function(e) {
      unlink(cufes_tmp)
      stop("ERDDAP download failed: ", conditionMessage(e), call. = FALSE)
    })
  # download.file() returns 0 on success; some methods signal failure only
  # through a non-zero status rather than an error
  if (!identical(as.integer(dl_status), 0L) || !file_exists(cufes_tmp)) {
    unlink(cufes_tmp)
    stop("ERDDAP download did not complete for ", erddap_url, call. = FALSE)
  }
  file_copy(cufes_tmp, cufes_csv, overwrite = TRUE)
  unlink(cufes_tmp)
}

# ERDDAP CSV: row 1 = column names, row 2 = units, rows 3+ = data
hdr   <- read_csv(cufes_csv, n_max = 0) |> names()
d_raw <- read_csv(cufes_csv, skip = 2, col_names = hdr)
cat(glue("Downloaded {format(nrow(d_raw), big.mark=',')} CUFES samples"), "\n")

sync_to_gcs(
  local_dir  = dir_dl,
  gcs_prefix = glue("archive/{provider}/{dataset}"),
  bucket     = "calcofi-files-public",
  exclude    = c(".DS_Store"))
```
## Build Sample + Measurement Tables
```{r}
#| label: build

# Build the two CUFES tables from the raw ERDDAP rows and write them to DuckDB:
#   cufes_sample      - one row per raw row (position, time, underway readings)
#   cufes_measurement - long table with one row per sample x egg type

# egg count columns that become measurement types (also used by the
# meas-types chunk)
egg_cols <- c("sardine_eggs","anchovy_eggs","jack_mackerel_eggs","hake_eggs","squid_eggs","other_fish_eggs")

# position and underway columns forced to numeric
num_cols <- c(
  "latitude", "longitude", "stop_latitude", "stop_longitude",
  "start_temperature", "start_salinity", "start_wind_speed",
  "start_wind_direction", "start_pump_speed",
  "stop_temperature", "stop_salinity", "stop_wind_speed",
  "stop_wind_direction", "stop_pump_speed")

# sample_id is the row position in the downloaded CSV.
# NOTE(review): ids are therefore only stable while ERDDAP returns rows in
# the same order -- confirm this is acceptable for downstream joins.
d <- d_raw |>
  mutate(sample_id = row_number(), .before = 1) |>
  mutate(
    datetime_start_utc = as_datetime(time),
    # unparseable stop times become NA without a warning
    datetime_end_utc   = suppressWarnings(as_datetime(stop_time)),
    across(all_of(num_cols), \(x) suppressWarnings(as.numeric(x))))

d_sample <- d |>
  transmute(
    sample_id,
    cruise_orig   = cruise,
    ship_name     = ship,
    ship_code,
    sample_number = suppressWarnings(as.integer(sample_number)),
    datetime_start_utc, datetime_end_utc,
    latitude, longitude,
    latitude_stop  = stop_latitude,
    longitude_stop = stop_longitude,
    start_temperature, start_salinity, start_wind_speed,
    start_wind_direction, start_pump_speed,
    stop_temperature, stop_salinity, stop_wind_speed,
    stop_wind_direction, stop_pump_speed)
dbWriteTable(con, "cufes_sample", d_sample, overwrite = TRUE)

# pivot egg counts -> long measurement
# Coerce to double BEFORE pivoting and filtering: pivot_longer() needs one
# common type across the egg columns, and a value that fails coercion must be
# dropped by the NA filter rather than survive as an NA measurement.
d_meas <- d |>
  select(sample_id, all_of(egg_cols)) |>
  mutate(across(all_of(egg_cols), \(x) suppressWarnings(as.double(x)))) |>
  pivot_longer(
    all_of(egg_cols),
    names_to  = "measurement_type",
    values_to = "measurement_value") |>
  filter(!is.na(measurement_value)) |>
  mutate(measurement_qual = NA_character_) |>
  mutate(cufes_measurement_id = row_number(), .before = 1)
dbWriteTable(con, "cufes_measurement", d_meas, overwrite = TRUE)

cat(glue("cufes_sample {nrow(d_sample)}, cufes_measurement {nrow(d_meas)}"), "\n")
```
## Resolve Keys + Spatial
```{r}
#| label: keys-spatial

# Attach ship_key / cruise_key to cufes_sample by matching ship names against
# the ship table from the swfsc_ichthyo ingest, then add point geometry and
# assign a grid key.
load_prior_tables(
  con,
  parquet_dir = here("data/parquet/swfsc_ichthyo"),
  tables      = c("ship","cruise","grid"),
  geom_tables = c("grid"),
  as_view     = TRUE)

# One normalizer for BOTH sides of the join: collapse whitespace, upper-case,
# then drop a leading "R/V" / "RV" / "RV." prefix. Squishing and upper-casing
# first lets the prefix pattern also catch lower-case or padded variants.
norm_ship_name <- function(x) {
  x |>
    str_squish() |>
    str_to_upper() |>
    str_replace("^R/?V\\.?\\s+", "")
}

# lookup of known ships; NA names are dropped so they cannot match samples
# whose ship name is missing
d_ship <- dbGetQuery(con, "SELECT ship_key, ship_name, ship_nodc FROM ship") |>
  mutate(ship_name_norm = norm_ship_name(ship_name)) |>
  filter(!is.na(ship_name_norm))

# The join below must be many-to-one: a repeated normalized name would
# duplicate sample rows, inflate the match counts and make the UPDATE
# ambiguous. Keep the first entry per name and report what was dropped.
dup_names <- unique(d_ship$ship_name_norm[duplicated(d_ship$ship_name_norm)])
if (length(dup_names) > 0) {
  warning(
    "ship table has duplicate normalized names; keeping first match for: ",
    paste(dup_names, collapse = ", "), call. = FALSE)
  d_ship <- d_ship |> distinct(ship_name_norm, .keep_all = TRUE)
}

# candidate cruise_key = "<YYYY-MM of sample start>-<ship_nodc>"
# NOTE(review): the month comes from each sample's own start time, so samples
# taken after a month rollover within one cruise would get a later-month key
# and be set to NA below -- confirm whether `cruise_orig` should be used.
zt <- dbGetQuery(con, "SELECT sample_id, ship_name, datetime_start_utc FROM cufes_sample") |>
  mutate(ship_name_norm = norm_ship_name(ship_name)) |>
  left_join(
    d_ship |> select(ship_name_norm, ship_key, ship_nodc),
    by = "ship_name_norm") |>
  mutate(
    cruise_key = if_else(
      is.na(ship_nodc) | is.na(datetime_start_utc),
      NA_character_,
      as.character(glue("{format(datetime_start_utc,'%Y-%m')}-{ship_nodc}"))))

# keep only cruise keys that exist in the cruise table
valid_ck <- dbGetQuery(con, "SELECT DISTINCT cruise_key FROM cruise")$cruise_key
zt <- zt |>
  mutate(cruise_key = if_else(cruise_key %in% valid_ck, cruise_key, NA_character_))

# push the resolved keys into cufes_sample through a temporary table
dbWriteTable(con, "zk", zt |> select(sample_id, ship_key, cruise_key), overwrite = TRUE)
dbExecute(con, "ALTER TABLE cufes_sample ADD COLUMN IF NOT EXISTS ship_key VARCHAR")
dbExecute(con, "ALTER TABLE cufes_sample ADD COLUMN IF NOT EXISTS cruise_key VARCHAR")
dbExecute(con, "UPDATE cufes_sample s SET ship_key=z.ship_key, cruise_key=z.cruise_key FROM zk z WHERE s.sample_id=z.sample_id")
dbExecute(con, "DROP TABLE zk")
cat(glue("ship match {sum(!is.na(zt$ship_key))}/{nrow(zt)}; cruise_key {sum(!is.na(zt$cruise_key))}/{nrow(zt)}"), "\n")

# spatial: point geometry from the start position, then grid assignment
add_point_geom(con, "cufes_sample", lon_col = "longitude", lat_col = "latitude")
assign_grid_key(con, "cufes_sample") |> datatable(caption = "Grid assignment")
```
## Add Measurement Types
```{r}
#| label: meas-types

# Register the six egg-count measurement types: append any that are missing
# from the measurement type CSV (rewriting the CSV only when something was
# added), then load the full registry into DuckDB.
egg_descriptions <- c(
  "Sardine egg count",
  "Northern anchovy egg count",
  "Jack mackerel egg count",
  "Pacific hake egg count",
  "Squid egg count",
  "Other fish egg count")

cufes_types <- tibble(
  measurement_type   = egg_cols,
  description        = egg_descriptions,
  units              = "count",
  `_source_column`   = egg_cols,
  `_source_table`    = "cufes_measurement",
  `_source_datasets` = "swfsc_cufes",
  `_qual_column`     = NA_character_,
  `_prec_column`     = NA_character_)

# rows whose measurement_type is not yet in the registry
already_known <- cufes_types$measurement_type %in% d_meas_type$measurement_type
new_types     <- cufes_types[!already_known, ]

if (nrow(new_types) > 0) {
  d_meas_type <- bind_rows(d_meas_type, new_types)
  write_csv(d_meas_type, meas_type_csv)
}

dbWriteTable(con, "measurement_type", d_meas_type, overwrite = TRUE)
cat(glue("added {nrow(new_types)} measurement types"), "\n")
```
## Metadata, Schema, Validate, Outputs
```{r}
#| label: finalize

# Finalize: write the dataset table, declare keys, draw the ERD, validate,
# export parquet + metadata JSON into `dir_parquet`, and sync it to GCS.

# Create the output directory before anything writes into it;
# build_relationships_json() below already targets `dir_parquet`.
dir_create(dir_parquet)

out_tables <- c("cufes_sample", "cufes_measurement", "measurement_type", "dataset")

d_dataset <- ingest_yaml_to_dataset_df(read_ingest_yaml(here()))
dbWriteTable(con, "dataset", d_dataset, overwrite = TRUE)

# primary / foreign keys of the tables in this dataset
cufes_rels <- list(
  primary_keys = list(
    cufes_sample      = "sample_id",
    cufes_measurement = "cufes_measurement_id",
    measurement_type  = "measurement_type"),
  foreign_keys = list(
    list(table = "cufes_measurement", column = "sample_id",
         ref_table = "cufes_sample", ref_column = "sample_id"),
    list(table = "cufes_measurement", column = "measurement_type",
         ref_table = "measurement_type", ref_column = "measurement_type")))

cc_erd(
  con,
  tables = out_tables,
  rels   = cufes_rels,
  colors = list(
    lightblue   = c("cufes_sample", "cufes_measurement"),
    lightyellow = "measurement_type",
    white       = "dataset"))
build_relationships_json(
  rels       = cufes_rels,
  output_dir = dir_parquet,
  provider   = provider,
  dataset    = dataset)

# non-strict validation: the outcome is printed and the export still runs
results <- validate_for_release(con, checks = "all", strict = FALSE)
cat("Validation:", if (isTRUE(all(results$passed))) "PASSED" else "FAILED", "\n")

write_parquet_outputs(
  con              = con,
  output_dir       = dir_parquet,
  tables           = out_tables,
  sort_by          = list(cufes_measurement = "measurement_type"),
  strip_provenance = FALSE)

# table / field redefinitions that feed the metadata JSON
d_tbls_rd <- read_csv(here("metadata/swfsc/cufes/tbls_redefine.csv"))
d_flds_rd <- read_csv(here("metadata/swfsc/cufes/flds_redefine.csv"))
build_metadata_json(
  con                  = con,
  d_tbls_rd            = d_tbls_rd,
  d_flds_rd            = d_flds_rd,
  metadata_derived_csv = here("metadata/swfsc/cufes/metadata_derived.csv"),
  output_dir           = dir_parquet,
  tables               = c("cufes_sample", "cufes_measurement"),
  set_comments         = TRUE,
  provider             = provider,
  dataset              = dataset,
  workflow_url         = cc$workflow_url,
  tables_owned         = tables_owned)

sync_to_gcs(
  local_dir  = dir_parquet,
  gcs_prefix = glue("ingest/{dir_label}"),
  bucket     = "calcofi-db")
```
## Questions for Data Providers
```{r}
#| label: provider-questions

# Show the provider question log ordered by priority (blocker, high, normal),
# then by id, as a plain table.
questions_csv   <- here(glue("metadata/{provider}/{dataset}/questions.csv"))
priority_levels <- c("blocker","high","normal")

d_questions <- read_csv(questions_csv)
d_questions <- arrange(d_questions, factor(priority, priority_levels), id)
d_questions <- select(d_questions, priority, question, context, status)

datatable(
  d_questions,
  caption  = "Questions (ranked)",
  options  = list(dom = "t"),
  rownames = FALSE)
```
```{r}
#| label: cleanup

# close the DuckDB connection, then report where the outputs were written
close_duckdb(con)
msg_done <- glue("Parquet outputs written to: {dir_parquet}")
cat(msg_done, "\n")
```