---
title: "Merge Ichthyo & Bottle to Working DuckLake"
execute:
  echo: true
  warning: false
editor_options:
  chunk_output_type: console
format:
  html:
    code-fold: true
editor:
  markdown:
    wrap: 72
---
## Overview {.unnumbered}

**Goal**: Merge parquet outputs from both upstream ingest workflows,
perform cross-dataset reconciliation (ship matching, cruise bridge,
taxonomy standardization), then load into Working DuckLake and create a
frozen release.

**Steps**:

1. Load parquet files from both upstream workflows into a local temp
   DuckDB
2. Fuzzy match ships between datasets
3. Build cruise bridge (casts → cruise_key)
4. Standardize taxonomy (species → WoRMS/ITIS/GBIF)
5. Validate cross-dataset integrity
6. Write modified parquet outputs + manifest with GCS references
7. Ingest all tables into Working DuckLake with provenance
8. Freeze release (strip provenance)
9. Upload to GCS

**Upstream workflows** (must complete first):

- `ingest_swfsc.noaa.gov_calcofi-db.qmd` → ichthyo tables (10)
- `ingest_calcofi.org_bottle-database.qmd` → bottle/cast tables (5)
```{mermaid}
%%| label: fig-workflow
%%| fig-cap: "Integration workflow: upstream parquet → merge → Working DuckLake → Frozen Release"
flowchart LR
  subgraph upstream["Upstream Parquet"]
    s["swfsc<br/>10 tables"]
    b["bottle<br/>5 tables"]
  end
  subgraph merge["Merge DuckDB"]
    m1["Ship matching"]
    m2["Cruise bridge"]
    m3["Taxonomy"]
  end
  subgraph output["Output"]
    p["Modified parquet<br/>+ manifest"]
    w["Working DuckLake"]
    f["Frozen Release"]
  end
  s --> merge
  b --> merge
  merge --> p --> w --> f
  style upstream fill:#e3f2fd,stroke:#1565c0
  style merge fill:#fff3e0,stroke:#ef6c00
  style output fill:#e8f4e8,stroke:#2e7d32
```
## Setup
```{r}
#| label: setup
devtools::load_all(here::here("../calcofi4db"))
devtools::load_all(here::here("../calcofi4R"))
librarian::shelf(
  arrow, CalCOFI/calcofi4db, DBI, dm, dplyr, DT, fs, glue, here,
  jsonlite, purrr, readr, stringr, tibble, tidyr,
  worrms, taxize,
  quiet = TRUE)
options(readr.show_col_types = FALSE)
options(DT.options = list(scrollX = TRUE))

# parquet input directories (from upstream workflows)
dir_pq_ichthyo <- here("data/parquet/swfsc.noaa.gov_calcofi-db")
dir_pq_bottle  <- here("data/parquet/calcofi.org_bottle-database")

# release version
release_version <- format(Sys.Date(), "v%Y.%m")

# local merge database
db_path   <- here("data/wrangling/merge_ichthyo_bottle.duckdb")
overwrite <- FALSE  # set TRUE to rebuild from scratch (~1 hr for taxonomy APIs)
if (overwrite && file_exists(db_path)) file_delete(db_path)
dir_create(dirname(db_path))

con <- get_duckdb_con(db_path)
load_duckdb_extension(con, "spatial")

# check if database already has tables (from a previous run)
existing_tables <- DBI::dbListTables(con)
skip_load <- length(existing_tables) > 0
if (skip_load) {
  message(glue("Using existing database with {length(existing_tables)} tables"))
}
```
## Check Upstream Workflows
Verify that parquet outputs and manifests exist from both upstream
workflows.
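
For reference, the fields read below from each manifest look roughly
like this (a minimal sketch with placeholder values; the real manifests
carry more detail):

```{r}
#| label: manifest_shape
#| eval: false
# illustrative manifest shape (placeholder values, not real data)
str(list(
  created_at = "2025-01-15 10:00:00",
  total_rows = 1234567,
  tables     = list("cruise", "ship", "site")))
```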
```{r}
#| label: check_upstream
# check ichthyo manifest
ichthyo_manifest_path <- file.path(dir_pq_ichthyo, "manifest.json")
stopifnot(
  "Missing ichthyo parquet outputs. Run ingest_swfsc.noaa.gov_calcofi-db.qmd first." =
    file.exists(ichthyo_manifest_path))
ichthyo_manifest <- read_json(ichthyo_manifest_path)
message(glue(
  "Ichthyo parquet: {ichthyo_manifest$total_rows} total rows, ",
  "created {ichthyo_manifest$created_at}"))

# check bottle manifest
bottle_manifest_path <- file.path(dir_pq_bottle, "manifest.json")
stopifnot(
  "Missing bottle parquet outputs. Run ingest_calcofi.org_bottle-database.qmd first." =
    file.exists(bottle_manifest_path))
bottle_manifest <- read_json(bottle_manifest_path)
message(glue(
  "Bottle parquet: {bottle_manifest$total_rows} total rows, ",
  "created {bottle_manifest$created_at}"))

# show manifest stats
tibble(
  source = c("ichthyo", "bottle"),
  tables = c(
    length(ichthyo_manifest$tables),
    length(bottle_manifest$tables)),
  rows = c(
    ichthyo_manifest$total_rows,
    bottle_manifest$total_rows),
  created = c(
    ichthyo_manifest$created_at,
    bottle_manifest$created_at)) |>
  datatable(caption = "Upstream workflow manifest stats")
```
## Load SWFSC Tables
Load all ichthyo parquet files into the local merge DuckDB. Tables with
geometry columns (grid, site, segment) need WKB→GEOMETRY conversion.
```{r}
#| label: load_ichthyo
ichthyo_files <- list.files(
  dir_pq_ichthyo, pattern = "\\.parquet$", full.names = TRUE)

# tables with geometry columns that need WKB conversion
geom_tables <- c("grid", "site", "segment")

if (skip_load) {
  message("Using existing ichthyo tables from database")
  ichthyo_stats <- map_dfr(ichthyo_files, function(pqt_path) {
    tbl_name <- tools::file_path_sans_ext(basename(pqt_path))
    n <- DBI::dbGetQuery(con, glue("SELECT COUNT(*) AS n FROM {tbl_name}"))$n
    tibble(table = tbl_name, rows = n, source = "ichthyo")
  })
} else {
  ichthyo_stats <- map_dfr(ichthyo_files, function(pqt_path) {
    tbl_name <- tools::file_path_sans_ext(basename(pqt_path))
    if (tbl_name %in% geom_tables) {
      # load raw, then convert WKB BLOB geometry columns to GEOMETRY in place
      DBI::dbExecute(con, glue(
        "CREATE OR REPLACE TABLE {tbl_name} AS
         SELECT * FROM read_parquet('{pqt_path}')"))
      # detect WKB BLOB columns by type and name
      blob_cols <- DBI::dbGetQuery(con, glue(
        "SELECT column_name FROM information_schema.columns
         WHERE table_name = '{tbl_name}'
           AND data_type = 'BLOB'
           AND column_name LIKE '%geom%'"))$column_name
      for (gc in blob_cols) {
        tmp_col <- paste0(gc, "_tmp")
        DBI::dbExecute(con, glue(
          "ALTER TABLE {tbl_name} ADD COLUMN {tmp_col} GEOMETRY"))
        DBI::dbExecute(con, glue(
          "UPDATE {tbl_name} SET {tmp_col} = ST_GeomFromWKB({gc})"))
        DBI::dbExecute(con, glue(
          "ALTER TABLE {tbl_name} DROP COLUMN {gc}"))
        DBI::dbExecute(con, glue(
          "ALTER TABLE {tbl_name} RENAME COLUMN {tmp_col} TO {gc}"))
      }
    } else {
      DBI::dbExecute(con, glue(
        "CREATE OR REPLACE TABLE {tbl_name} AS
         SELECT * FROM read_parquet('{pqt_path}')"))
    }
    n <- DBI::dbGetQuery(con, glue("SELECT COUNT(*) AS n FROM {tbl_name}"))$n
    tibble(table = tbl_name, rows = n, source = "ichthyo")
  })
}

ichthyo_stats |> datatable(caption = "SWFSC ichthyo tables loaded")
```
## Load Bottle Tables

Load bottle parquet files. The grid table is excluded from the bottle
export, since the canonical version comes from swfsc.
```{r}
#| label: load_bottle
bottle_files <- list.files(
  dir_pq_bottle, pattern = "\\.parquet$", full.names = TRUE)
# belt-and-suspenders: skip grid.parquet if it still exists
bottle_files <- bottle_files[!grepl("grid\\.parquet$", bottle_files)]

if (skip_load) {
  message("Using existing bottle tables from database")
  bottle_stats <- map_dfr(bottle_files, function(pqt_path) {
    tbl_name <- tools::file_path_sans_ext(basename(pqt_path))
    n <- DBI::dbGetQuery(con, glue("SELECT COUNT(*) AS n FROM {tbl_name}"))$n
    tibble(table = tbl_name, rows = n, source = "bottle")
  })
} else {
  bottle_stats <- map_dfr(bottle_files, function(pqt_path) {
    tbl_name <- tools::file_path_sans_ext(basename(pqt_path))
    DBI::dbExecute(con, glue(
      "CREATE OR REPLACE TABLE {tbl_name} AS
       SELECT * FROM read_parquet('{pqt_path}')"))
    n <- DBI::dbGetQuery(con, glue("SELECT COUNT(*) AS n FROM {tbl_name}"))$n
    tibble(table = tbl_name, rows = n, source = "bottle")
  })
}

bottle_stats |> datatable(caption = "Bottle tables loaded")
```
## Show Loaded Tables
```{r}
#| label: show_tables
all_stats <- bind_rows(ichthyo_stats, bottle_stats)
message(glue(
  "Loaded {nrow(all_stats)} tables: ",
  "{sum(all_stats$source == 'ichthyo')} ichthyo + ",
  "{sum(all_stats$source == 'bottle')} bottle"))
all_stats |>
  arrange(source, table) |>
  datatable(caption = "All loaded tables")
```
## Fuzzy Ship Matching

Reconcile ship codes between bottle casts and the swfsc ship reference
table, using `calcofi4db::match_ships()`.
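
For intuition, the heart of fuzzy name matching is an edit-distance
comparison against the reference names. A minimal sketch with base R's
`adist()` (illustrative only; `match_ships()` additionally applies the
manual renames CSV and can fetch ICES ship codes):

```{r}
#| label: fuzzy_sketch
#| eval: false
# illustrative edit-distance match (hypothetical helper, not calcofi4db code)
best_match <- function(name, ref_names) {
  d <- utils::adist(toupper(name), toupper(ref_names))[1, ]
  tibble(
    ship_name = name,
    matched   = ref_names[which.min(d)],
    distance  = min(d))
}
best_match("NEW HORIZN", c("NEW HORIZON", "DAVID STARR JORDAN"))
```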
```{r}
#| label: ship_matching
# get unmatched ships from casts
unmatched <- dbGetQuery(con, "
  SELECT DISTINCT c.ship_code, c.ship_name
  FROM casts c
  LEFT JOIN ship s ON c.ship_code = s.ship_nodc
  WHERE s.ship_key IS NULL")
message(glue("{nrow(unmatched)} unmatched ship codes in casts"))

# ship renames CSV (manual overrides from previous reconciliation)
ship_renames_csv <- here(
  "metadata/calcofi.org/bottle-database/ship_renames.csv")

# run fuzzy matching
ship_matches <- match_ships(
  unmatched_ships  = unmatched,
  reference_ships  = dbReadTable(con, "ship"),
  ship_renames_csv = ship_renames_csv,
  fetch_ices       = TRUE)

ship_matches |>
  datatable(caption = "Ship matching results")
```
### Match Statistics
```{r}
#| label: ship_match_stats
ship_matches |>
  count(match_type) |>
  datatable(caption = "Ship match type counts")
```
## Build Cruise Bridge

Link bottle casts to the swfsc cruise table via ship_key → cruise_key.
### Step 1: Add ship_key to casts
```{r}
#| label: cruise_bridge_ship_key
# add ship_key column
dbExecute(con, "ALTER TABLE casts ADD COLUMN IF NOT EXISTS ship_key TEXT")

# exact match: casts.ship_code = ship.ship_nodc
dbExecute(con, "
  UPDATE casts SET ship_key = (
    SELECT s.ship_key FROM ship s
    WHERE s.ship_nodc = casts.ship_code
    LIMIT 1)")
n_exact <- dbGetQuery(con,
  "SELECT COUNT(*) AS n FROM casts WHERE ship_key IS NOT NULL")$n
message(glue("Exact ship_nodc match: {n_exact} casts"))

# apply fuzzy match results for previously unmatched ships
matched_ships <- ship_matches |>
  filter(match_type != "unmatched", !is.na(matched_ship_key))
if (nrow(matched_ships) > 0) {
  for (i in seq_len(nrow(matched_ships))) {
    m <- matched_ships[i, ]
    dbExecute(con, glue("
      UPDATE casts SET ship_key = '{m$matched_ship_key}'
      WHERE ship_code = '{m$ship_code}'
        AND ship_key IS NULL"))
  }
  n_fuzzy <- dbGetQuery(con,
    "SELECT COUNT(*) AS n FROM casts WHERE ship_key IS NOT NULL")$n - n_exact
  message(glue("Fuzzy/manual match: {n_fuzzy} additional casts"))
}

# summary
ship_key_stats <- dbGetQuery(con, "
  SELECT
    CASE WHEN ship_key IS NULL THEN 'no_ship_match'
         ELSE 'matched' END AS status,
    COUNT(*) AS n_casts
  FROM casts
  GROUP BY status")
ship_key_stats |> datatable(caption = "ship_key assignment stats")
```
### Step 2: Derive cruise_key
Cruise key format: YYMMKK (2-digit year, 2-digit month, 2-letter ship
key).
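
For a single cast this amounts to (a minimal R sketch of the same
construction; the ship key value is hypothetical):

```{r}
#| label: cruise_key_sketch
#| eval: false
# illustrative YYMMKK derivation for one cast (hypothetical ship_key)
datetime_utc <- as.POSIXct("1984-03-12 08:30:00", tz = "UTC")
ship_key     <- "NH"
paste0(format(datetime_utc, "%y%m"), ship_key)  # "8403NH"
```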
```{r}
#| label: cruise_bridge_cruise_key
# add cruise_key column
dbExecute(con, "ALTER TABLE casts ADD COLUMN IF NOT EXISTS cruise_key TEXT")

# derive YYMMKK from datetime_utc + ship_key
dbExecute(con, "
  UPDATE casts SET cruise_key = CONCAT(
    LPAD(CAST(EXTRACT(YEAR FROM datetime_utc) % 100 AS VARCHAR), 2, '0'),
    LPAD(CAST(EXTRACT(MONTH FROM datetime_utc) AS VARCHAR), 2, '0'),
    ship_key)
  WHERE ship_key IS NOT NULL")
n_cruise <- dbGetQuery(con,
  "SELECT COUNT(*) AS n FROM casts WHERE cruise_key IS NOT NULL")$n
message(glue("Derived cruise_key for {n_cruise} casts"))
```
### Step 3: Validate against swfsc cruise table
```{r}
#| label: cruise_bridge_validate
cruise_stats <- dbGetQuery(con, "
  SELECT
    CASE
      WHEN c.ship_key IS NULL THEN 'no_ship_match'
      WHEN c.cruise_key IS NULL THEN 'no_cruise_key'
      WHEN cr.cruise_key IS NULL THEN 'no_cruise_match'
      ELSE 'matched'
    END AS status,
    COUNT(*) AS n_casts
  FROM casts c
  LEFT JOIN cruise cr ON c.cruise_key = cr.cruise_key
  GROUP BY status
  ORDER BY status")
cruise_stats |> datatable(caption = "Cruise bridge match statistics")
```
### Step 4: Report unmatched ship codes
```{r}
#| label: unmatched_ships
unmatched_report <- dbGetQuery(con, "
  SELECT
    c.ship_code, c.ship_name,
    COUNT(*) AS n_casts,
    MIN(c.datetime_utc) AS first_cast,
    MAX(c.datetime_utc) AS last_cast
  FROM casts c
  WHERE c.ship_key IS NULL
  GROUP BY c.ship_code, c.ship_name
  ORDER BY n_casts DESC")
if (nrow(unmatched_report) > 0) {
  unmatched_report |>
    datatable(caption = "Unmatched ship codes (no ship_key)")
} else {
  message("All ship codes matched!")
}
```
## Standardize Taxonomy
Update the species table with WoRMS/ITIS/GBIF identifiers using
`standardize_species()` and build the taxonomy hierarchy using
`build_taxon_table()` from `calcofi4db/R/taxonomy.R`.
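
These helpers resolve each scientific name against the authority web
services. A minimal per-name sketch using `worrms` and `taxize` directly
(illustrative only; the actual calcofi4db implementation may batch
requests, disambiguate matches, and cache results):

```{r}
#| label: taxonomy_sketch
#| eval: false
# illustrative per-name ID lookups (hypothetical helper, not calcofi4db code)
lookup_ids <- function(sci_name) {
  tibble(
    scientific_name = sci_name,
    worms_id = tryCatch(worrms::wm_name2id(sci_name),
                        error = function(e) NA_integer_),
    itis_id  = tryCatch(as.integer(taxize::get_tsn(sci_name, messages = FALSE)[1]),
                        error = function(e) NA_integer_),
    gbif_id  = tryCatch(as.integer(taxize::get_gbifid(sci_name, messages = FALSE)[1]),
                        error = function(e) NA_integer_))
}
lookup_ids("Engraulis mordax")  # northern anchovy
```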
```{r}
#| label: standardize_species
# check if species already standardized (all taxonomy columns exist with data)
sp_cols <- DBI::dbGetQuery(con,
  "SELECT column_name FROM information_schema.columns
   WHERE table_name = 'species'")$column_name
sp_standardized <- all(c("worms_id", "itis_id", "gbif_id") %in% sp_cols) &&
  DBI::dbGetQuery(con,
    "SELECT COUNT(*) AS n FROM species WHERE worms_id IS NOT NULL")$n > 0

if (sp_standardized && !overwrite) {
  message("Species already standardized, skipping API calls")
  sp_results <- DBI::dbGetQuery(con,
    "SELECT species_id, scientific_name, worms_id, itis_id, gbif_id
     FROM species")
} else {
  # standardize species identifiers (slow: makes API calls)
  sp_results <- standardize_species(
    con          = con,
    species_tbl  = "species",
    include_gbif = TRUE)
}

sp_results |>
  datatable(caption = "Species standardization results")
```
```{r}
#| label: build_taxon
# check if taxon table already exists with data
taxon_exists <- "taxon" %in% existing_tables &&
  DBI::dbGetQuery(con, "SELECT COUNT(*) AS n FROM taxon")$n > 0

if (taxon_exists && !overwrite) {
  message("Taxon table already exists, skipping API calls")
  taxon_rows <- DBI::dbReadTable(con, "taxon")
} else {
  # build taxon hierarchy table (slow: makes API calls)
  taxon_rows <- build_taxon_table(
    con          = con,
    species_tbl  = "species",
    taxon_tbl    = "taxon",
    include_itis = TRUE)
}

# show taxon stats
if (nrow(taxon_rows) > 0) {
  taxon_rows |>
    count(authority, taxonRank) |>
    arrange(authority, taxonRank) |>
    datatable(caption = "Taxon hierarchy by authority and rank")
}

# show taxa_rank lookup
dbReadTable(con, "taxa_rank") |>
  datatable(caption = "Taxa rank ordering")
```
### Taxonomy Statistics
```{r}
#| label: taxonomy_stats
n_species <- dbGetQuery(con, "SELECT COUNT(*) AS n FROM species")$n
n_worms <- dbGetQuery(con,
  "SELECT COUNT(*) AS n FROM species WHERE worms_id IS NOT NULL")$n
n_itis <- dbGetQuery(con,
  "SELECT COUNT(*) AS n FROM species WHERE itis_id IS NOT NULL")$n
n_gbif <- dbGetQuery(con,
  "SELECT COUNT(*) AS n FROM species WHERE gbif_id IS NOT NULL")$n
n_taxon <- dbGetQuery(con, "SELECT COUNT(*) AS n FROM taxon")$n

tibble(
  metric = c("total species", "with worms_id", "with itis_id",
             "with gbif_id", "taxon hierarchy rows"),
  count = c(n_species, n_worms, n_itis, n_gbif, n_taxon)) |>
  datatable(caption = "Taxonomy standardization summary")
```
## Validate Cross-Dataset Integrity
```{r}
#| label: validate
# grid_key integrity: casts.grid_key should all be in grid.grid_key
if ("grid_key" %in% dbListFields(con, "casts") &&
    "grid_key" %in% dbListFields(con, "grid")) {
  grid_orphans <- dbGetQuery(con, "
    SELECT COUNT(*) AS n FROM casts c
    WHERE c.grid_key IS NOT NULL
      AND c.grid_key NOT IN (SELECT grid_key FROM grid)")$n
  message(glue("Grid key orphans in casts: {grid_orphans}"))
}

# ship PK uniqueness
ship_dups <- dbGetQuery(con, "
  SELECT ship_key, COUNT(*) AS n FROM ship
  GROUP BY ship_key HAVING COUNT(*) > 1")
if (nrow(ship_dups) > 0)
  warning(glue("Duplicate ship_key values: {nrow(ship_dups)}"))

# cruise PK uniqueness
cruise_dups <- dbGetQuery(con, "
  SELECT cruise_key, COUNT(*) AS n FROM cruise
  GROUP BY cruise_key HAVING COUNT(*) > 1")
if (nrow(cruise_dups) > 0)
  warning(glue("Duplicate cruise_key values: {nrow(cruise_dups)}"))

# cruise bridge match statistics
bridge_stats <- dbGetQuery(con, "
  SELECT
    COUNT(*) AS total_casts,
    SUM(CASE WHEN ship_key IS NOT NULL THEN 1 ELSE 0 END) AS with_ship_key,
    SUM(CASE WHEN cruise_key IS NOT NULL THEN 1 ELSE 0 END) AS with_cruise_key
  FROM casts")
bridge_stats |> datatable(caption = "Cruise bridge coverage")

message("Cross-dataset validation complete")
```
## Show Combined Schema
```{r}
#| label: combined_schema
#| fig-width: 12
#| fig-height: 10
# build dm from connection (exclude internal tables)
tbls <- setdiff(dbListTables(con), c("_sp_update"))
d <- dm::dm_from_con(con, table_names = tbls, learn_keys = FALSE)

# set table colors by group
d <- d |>
  dm::dm_set_colors(
    lightblue   = c(cruise, site, tow, net),   # ichthyo chain
    lightyellow = c(ichthyo, species, lookup), # species/lookup
    lightgreen  = c(grid, segment),            # spatial
    pink        = c(casts, bottle, bottle_measurement,
                    cast_condition, measurement_type)) # bottle chain

# add colors for new taxonomy tables if they exist
if ("taxon" %in% tbls)
  d <- d |> dm::dm_set_colors(lightyellow = taxon)
if ("taxa_rank" %in% tbls)
  d <- d |> dm::dm_set_colors(lightyellow = taxa_rank)
if ("ship" %in% tbls)
  d <- d |> dm::dm_set_colors(lightblue = ship)

dm::dm_draw(d, rankdir = "LR", view_type = "all")
```
## Write Modified Parquet Outputs
Only export tables that differ from upstream (modified or new). Unchanged
tables reference upstream GCS URIs in the manifest.
```{r}
#| label: write_parquet
# tables modified by this workflow
modified_tables <- c(
  "casts",     # added ship_key, cruise_key columns
  "species",   # updated worms_id, itis_id, gbif_id
  "taxon",     # new table (taxonomy hierarchy)
  "taxa_rank") # new table (rank ordering)

dir_pq_merged <- here("data/parquet/merge_ichthyo_bottle")
merge_stats <- write_parquet_outputs(
  con              = con,
  output_dir       = dir_pq_merged,
  tables           = modified_tables,
  strip_provenance = FALSE)

merge_stats |>
  mutate(file = basename(path)) |>
  select(-path) |>
  datatable(caption = "Modified parquet export statistics")
```
### Build Combined Manifest
Reference GCS URIs for unchanged upstream tables + local parquet for
modified tables.
```{r}
#| label: build_manifest
# read upstream manifests
ichthyo_manifest <- read_json(ichthyo_manifest_path)
bottle_manifest  <- read_json(bottle_manifest_path)

# build combined manifest
manifest <- list(
  created_at      = as.character(Sys.time()),
  release_version = release_version,
  modified_tables = merge_stats |>
    select(table, rows, file_size) |>
    as.list(),
  upstream_refs = list(
    ichthyo = list(
      manifest = ichthyo_manifest$created_at,
      tables   = setdiff(
        ichthyo_manifest$tables,
        modified_tables)),
    bottle = list(
      manifest = bottle_manifest$created_at,
      tables   = setdiff(
        bottle_manifest$tables,
        c(modified_tables, "grid"))))) # grid canonical from swfsc

write_json(
  manifest,
  file.path(dir_pq_merged, "manifest.json"),
  auto_unbox = TRUE, pretty = TRUE)
message(glue(
  "Manifest written: {length(modified_tables)} modified tables, ",
  "upstream refs for {length(manifest$upstream_refs$ichthyo$tables)} ",
  "ichthyo + {length(manifest$upstream_refs$bottle$tables)} bottle"))
```
## Ingest to Working DuckLake
Load all tables into the Working DuckLake with provenance: unchanged
tables are read directly from the upstream parquet directories, and
modified tables from the merge output directory.
```{r}
#| label: ingest_working
con_wdl <- get_working_ducklake()
load_duckdb_extension(con_wdl, "spatial")

# helper to ingest a parquet file with provenance
ingest_pqt <- function(pqt_path, tbl_name, source_label) {
  data <- arrow::read_parquet(pqt_path)
  ingest_to_working(
    con         = con_wdl,
    data        = data,
    table       = tbl_name,
    source_file = source_label,
    mode        = "replace")
}

# ingest ichthyo upstream tables (skip modified ones)
ichthyo_upstream <- setdiff(ichthyo_manifest$tables, modified_tables)
ichthyo_wdl <- map_dfr(ichthyo_upstream, function(tbl) {
  pqt <- file.path(dir_pq_ichthyo, paste0(tbl, ".parquet"))
  if (file.exists(pqt)) {
    ingest_pqt(pqt, tbl, glue("parquet/swfsc.noaa.gov_calcofi-db/{tbl}.parquet"))
  }
})

# ingest bottle upstream tables (skip modified + grid)
bottle_upstream <- setdiff(
  bottle_manifest$tables, c(modified_tables, "grid"))
bottle_wdl <- map_dfr(bottle_upstream, function(tbl) {
  pqt <- file.path(dir_pq_bottle, paste0(tbl, ".parquet"))
  if (file.exists(pqt)) {
    ingest_pqt(pqt, tbl, glue("parquet/calcofi.org_bottle-database/{tbl}.parquet"))
  }
})

# ingest modified tables from merge output
merge_wdl <- map_dfr(modified_tables, function(tbl) {
  pqt <- file.path(dir_pq_merged, paste0(tbl, ".parquet"))
  if (file.exists(pqt)) {
    ingest_pqt(pqt, tbl, glue("parquet/merge_ichthyo_bottle/{tbl}.parquet"))
  }
})

# combine stats
wdl_stats <- bind_rows(
  ichthyo_wdl |> mutate(source = "ichthyo_upstream"),
  bottle_wdl  |> mutate(source = "bottle_upstream"),
  merge_wdl   |> mutate(source = "merge_modified"))
wdl_stats |> datatable(caption = "Working DuckLake ingestion stats")
```
## List Working Tables
```{r}
#| label: list_working
working_tables <- list_working_tables(con_wdl)
working_tables |>
  datatable(caption = "Working DuckLake tables with provenance")
```
## Save Working DuckLake
```{r}
#| label: save_working
save_working_ducklake(con_wdl)
message("Working DuckLake saved to GCS")
```
## Create Frozen Release
Create a frozen (immutable) release for public access. This strips
provenance columns and exports clean parquet files. See the
[Frozen DuckLake](https://ducklake.select/2025/10/24/frozen-ducklake/)
pattern.
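
Stripping provenance amounts to excluding the provenance columns on
export; in DuckDB that is a `SELECT * EXCLUDE (...)`. A sketch (the
`prov_*` column names are assumptions; the real names come from
`ingest_to_working()`):

```{r}
#| label: strip_provenance_sketch
#| eval: false
# illustrative: what strip_provenance = TRUE amounts to per table
# (prov_* column names are assumed, not confirmed from calcofi4db)
DBI::dbGetQuery(con_wdl, "
  SELECT * EXCLUDE (prov_source_file, prov_ingested_at)
  FROM casts
  LIMIT 5")
```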
```{r}
#| label: freeze_release
dir_frozen <- here(glue("data/releases/{release_version}"))
message(glue("Creating frozen release: {release_version}"))

# get tables to freeze (exclude internal tables)
tables_to_freeze <- DBI::dbListTables(con_wdl) |>
  setdiff(c("_meta"))

# create frozen parquet (strips provenance)
freeze_stats <- write_parquet_outputs(
  con              = con_wdl,
  output_dir       = file.path(dir_frozen, "parquet"),
  tables           = tables_to_freeze,
  strip_provenance = TRUE,
  compression      = "zstd")

freeze_stats |>
  datatable(caption = glue("Frozen release {release_version} statistics"))
```
### Release Notes
```{r}
#| label: release_notes
#| results: asis
# build release notes using paste() to avoid glue() parsing issues with code blocks
tables_list <- paste0(
  "- ", freeze_stats$table, " (",
  format(freeze_stats$rows, big.mark = ","), " rows)")
release_notes <- paste0(
  "# CalCOFI Database Release ", release_version, "\n\n",
  "**Release Date**: ", Sys.Date(), "\n\n",
  "## Tables Included\n\n",
  paste(tables_list, collapse = "\n"), "\n\n",
  "## Total\n\n",
  "- **Tables**: ", nrow(freeze_stats), "\n",
  "- **Total Rows**: ", format(sum(freeze_stats$rows), big.mark = ","), "\n",
  "- **Total Size**: ", round(sum(freeze_stats$file_size) / 1024 / 1024, 1), " MB\n\n",
  "## Data Sources\n\n",
  "- `ingest_swfsc.noaa.gov_calcofi-db.qmd` - Ichthyo tables (cruise, ship, site, tow, net, species, ichthyo, grid, segment, lookup)\n",
  "- `ingest_calcofi.org_bottle-database.qmd` - Bottle/cast tables (casts, bottle, bottle_measurement, cast_condition, measurement_type)\n\n",
  "## Cross-Dataset Integration\n\n",
  "- **Ship matching**: Reconciled ship codes between bottle casts and swfsc ship reference\n",
  "- **Cruise bridge**: Derived cruise_key (YYMMKK) for bottle casts via ship_key + datetime\n",
  "- **Taxonomy**: Standardized species with WoRMS AphiaID, ITIS TSN, GBIF backbone key\n",
  "- **Taxon hierarchy**: Built taxon + taxa_rank tables from WoRMS/ITIS classification\n\n",
  "## Access\n\n",
  "Parquet files can be queried directly from GCS:\n\n",
  "```r\n",
  "library(duckdb)\n",
  "con <- dbConnect(duckdb())\n",
  "dbExecute(con, 'INSTALL httpfs; LOAD httpfs;')\n",
  "dbGetQuery(con, \"\n",
  "  SELECT * FROM read_parquet(\n",
  "    'https://storage.googleapis.com/calcofi-db/ducklake/releases/", release_version, "/parquet/ichthyo.parquet')\n",
  "  LIMIT 10\")\n",
  "```\n\n",
  "Or use calcofi4r:\n\n",
  "```r\n",
  "library(calcofi4r)\n",
  "con <- cc_get_db(version = '", release_version, "')\n",
  "```\n")
writeLines(release_notes, file.path(dir_frozen, "RELEASE_NOTES.md"))
message(glue("Release notes written to {file.path(dir_frozen, 'RELEASE_NOTES.md')}"))
cat(release_notes)
```
### Upload Frozen Release to GCS
```{r}
#| label: upload_frozen
#| eval: false
gcs_release_path <- glue("gs://calcofi-db/ducklake/releases/{release_version}")
message(glue("Uploading frozen release to {gcs_release_path}..."))

# upload all parquet files
parquet_files <- list.files(
  file.path(dir_frozen, "parquet"),
  pattern = "\\.parquet$",
  full.names = TRUE)
for (pqt_file in parquet_files) {
  gcs_path <- glue("{gcs_release_path}/parquet/{basename(pqt_file)}")
  put_gcs_file(pqt_file, gcs_path)
}

# upload manifest and release notes
put_gcs_file(
  file.path(dir_frozen, "parquet", "manifest.json"),
  glue("{gcs_release_path}/manifest.json"))
put_gcs_file(
  file.path(dir_frozen, "RELEASE_NOTES.md"),
  glue("{gcs_release_path}/RELEASE_NOTES.md"))

# update the 'latest' pointer (a copy of this release's manifest)
put_gcs_file(
  file.path(dir_frozen, "parquet", "manifest.json"),
  "gs://calcofi-db/ducklake/releases/latest/manifest.json")

message(glue("Frozen release {release_version} uploaded to GCS"))
```
## Cleanup
```{r}
#| label: cleanup
# close working ducklake connection
close_duckdb(con_wdl)
message("Working DuckLake connection closed")
# close local merge database
close_duckdb(con)
message("Local merge database connection closed")
# summary
message(glue("\n=== Summary ==="))
message(glue("Working DuckLake: saved to GCS"))
message(glue("Frozen release: {release_version} created at {dir_frozen}"))
message(glue("Tables: {nrow(freeze_stats)}"))
message(glue("Total rows: {format(sum(freeze_stats$rows), big.mark = ',')}"))
```
::: {.callout-caution collapse="true"}
## Session Info
```{r session_info}
devtools::session_info()
```
:::