---
title: "Release CalCOFI Database"
calcofi:
target_name: release_database
workflow_type: release
dependency:
- auto
output: data/releases
execute:
echo: true
message: true
warning: true
editor_options:
chunk_output_type: console
format:
html:
code-fold: true
editor:
markdown:
wrap: 72
---
## Overview {.unnumbered}
**Goal**: Create a frozen (immutable) release of the CalCOFI integrated
database by assembling all ingest parquet outputs. This is the "caboose"
notebook that always runs last, after all ingest notebooks complete.
**Upstream notebooks** are auto-discovered from `calcofi:` YAML
frontmatter in each `.qmd`. All workflows with `workflow_type: ingest`
or `spatial` feed into this release notebook via `dependency: [auto]`
in `_targets.R`.
```{mermaid}
%%| label: fig-workflow
%%| fig-cap: "Pipeline: ingest notebooks produce parquet → release caboose assembles frozen release"
flowchart LR
subgraph ingest["Ingest Notebooks"]
i1["ichthyo"]
i2["bottle"]
i3["ctd-cast"]
i4["dic"]
i5["inverts"]
i6["spatial"]
end
subgraph assembly["Assembly (this notebook)"]
a["Load all parquet<br/>into in-memory DuckDB"]
end
subgraph release["Frozen Release"]
f["Clean parquet<br/>+ manifest.json"]
end
i1 --> a
i2 --> a
i3 --> a
i4 --> a
i5 --> a
i6 --> a
a --> f
style ingest fill:#e3f2fd,stroke:#1565c0
style assembly fill:#fff3e0,stroke:#ef6c00
style release fill:#e8f4e8,stroke:#2e7d32
```
## Setup
```{r}
#| label: setup

# load local dev checkouts of the CalCOFI helper packages (sibling repos)
devtools::load_all(here::here("../calcofi4db"))
devtools::load_all(here::here("../calcofi4r"))

# one-off GCS maintenance helper; run manually when needed
# cleanup_gcs_obsolete(dry_run = FALSE)

# attach dependencies (librarian installs GitHub "CalCOFI/*" pkgs if missing)
librarian::shelf(
  CalCOFI / calcofi4db,
  CalCOFI / calcofi4r,
  DBI,
  dplyr,
  DT,
  fs,
  glue,
  here,
  jsonlite,
  purrr,
  tibble,
  quiet = TRUE  # TRUE, not T: T/F are reassignable and unsafe
)

options(DT.options = list(scrollX = TRUE))

# date-stamped release version, e.g. "v2025.01.31"
release_version <- format(Sys.Date(), "v%Y.%m.%d")
message(glue("Release version: {release_version}"))
```
## Assemble from Ingest Outputs
Load all per-dataset parquet outputs into an in-memory DuckDB. Each
ingest notebook writes parquet to `data/parquet/{dataset}/` and uploads
to `gs://calcofi-db/ingest/{dataset}/`.
```{r}
#| label: assemble_working

# in-memory DuckDB serves as the working assembly database
con_wdl <- get_duckdb_con(":memory:")
load_duckdb_extension(con_wdl, "spatial")

# discover upstream parquet dirs from each notebook's `calcofi:` frontmatter
wf <- parse_qmd_frontmatter(here())
parquet_dirs <- wf |>
  filter(workflow_type %in% c("ingest", "spatial")) |>
  pull(output) |>
  dirname() |>
  purrr::map_chr(\(p) here(p))  # lambda preferred over legacy ~ .x formula

message(glue("Loading {length(parquet_dirs)} ingest parquet dirs"))

# load every table from each parquet dir; one stats row per table loaded
load_stats <- purrr::map_dfr(parquet_dirs, \(pd) {
  load_prior_tables(
    con = con_wdl,
    parquet_dir = pd
  )
})

load_stats |>
  datatable(caption = "Assembled tables from ingest parquet outputs")
```
## Scan Manifests for Mismatches
```{r}
#| label: scan_mismatches

# scan all ingest manifests for unresolved mismatches; resolve the path with
# here() (consistent with the rest of this notebook) so the scan works
# regardless of the current working directory
all_manifests <- list.files(
  here("data/parquet"), "manifest.json",
  recursive = TRUE, full.names = TRUE)

all_mismatches <- purrr::compact(lapply(all_manifests, function(mf) {
  m <- jsonlite::read_json(mf)
  if (is.null(m$mismatches)) return(NULL)
  # dataset name is the manifest's parent directory, e.g. "calcofi_bottle"
  dataset <- basename(dirname(mf))
  purrr::imap_dfr(m$mismatches, function(items, category) {
    if (length(items) == 0) return(NULL)
    purrr::map_dfr(items, function(x) {
      # replace NULL values with NA so as_tibble() works
      x[vapply(x, is.null, logical(1))] <- NA
      as_tibble(x)
    }) |>
      mutate(dataset = dataset, category = category, .before = 1)
  })
}))

if (length(all_mismatches) > 0) {
  d_mismatches <- bind_rows(all_mismatches)
  message(glue("{nrow(d_mismatches)} unresolved mismatch(es) across manifests"))
  d_mismatches |>
    datatable(caption = "Unresolved mismatches (from manifest.json)")
} else {
  message("No unresolved mismatches found across manifests")
}
```
## Validate
Cross-dataset validation to ensure data integrity before freezing.
```{r}
#| label: validate

# --- grid_key referential integrity: casts.grid_key must exist in grid ---
tbls <- DBI::dbListTables(con_wdl)
if (all(c("casts", "grid") %in% tbls)) {
  # read column lists from information_schema (avoids GEOMETRY type issues)
  cols_casts <- dbGetQuery(
    con_wdl,
    "SELECT column_name FROM information_schema.columns
    WHERE table_name = 'casts'")$column_name
  cols_grid <- dbGetQuery(
    con_wdl,
    "SELECT column_name FROM information_schema.columns
    WHERE table_name = 'grid'")$column_name
  # only check when both sides actually carry the key column
  if ("grid_key" %in% cols_casts && "grid_key" %in% cols_grid) {
    grid_orphans <- dbGetQuery(
      con_wdl,
      "SELECT COUNT(*) AS n FROM casts c
      WHERE c.grid_key IS NOT NULL
      AND c.grid_key NOT IN (SELECT grid_key FROM grid)")$n
    message(glue("Grid key orphans in casts: {grid_orphans}"))
    # Grid key orphans in casts: 0
  }
}
# --- ship primary-key uniqueness ---
if ("ship" %in% tbls) {
  ship_dups <- dbGetQuery(
    con_wdl,
    "SELECT ship_key, COUNT(*) AS n FROM ship
    GROUP BY ship_key HAVING COUNT(*) > 1")
  # happy path first; any duplicate key is a data-integrity warning
  if (nrow(ship_dups) == 0) {
    message("ship_key: all unique")
  } else {
    warning(glue("Duplicate ship_key values: {nrow(ship_dups)}"))
  }
}
# ship_key: all unique

# --- cruise primary-key uniqueness ---
if ("cruise" %in% tbls) {
  cruise_dups <- dbGetQuery(
    con_wdl,
    "SELECT cruise_key, COUNT(*) AS n FROM cruise
    GROUP BY cruise_key HAVING COUNT(*) > 1")
  # happy path first; any duplicate key is a data-integrity warning
  if (nrow(cruise_dups) == 0) {
    message("cruise_key: all unique")
  } else {
    warning(glue("Duplicate cruise_key values: {nrow(cruise_dups)}"))
  }
}
# cruise_key: all unique
# cruise bridge coverage: counts how many casts rows carry the derived
# ship_key / cruise_key bridge columns (NULL = cast not yet linked)
if ("casts" %in% tbls) {
  bridge_stats <- dbGetQuery(
    con_wdl,
    "SELECT
    COUNT(*) AS total_casts,
    SUM(CASE WHEN ship_key IS NOT NULL THEN 1 ELSE 0 END) AS with_ship_key,
    SUM(CASE WHEN cruise_key IS NOT NULL THEN 1 ELSE 0 END) AS with_cruise_key
    FROM casts"
  )
  bridge_stats |> datatable(caption = "Cruise bridge coverage")
}
# --- cruise_key format validation: expected pattern YYYY-MM-NODC ---
if ("cruise" %in% tbls) {
  bad_ck <- dbGetQuery(
    con_wdl,
    "SELECT cruise_key FROM cruise
    WHERE cruise_key IS NOT NULL
    AND NOT regexp_matches(cruise_key, '^\\d{4}-\\d{2}-.+$')")
  # happy path first; report violations as a warning, not an error
  if (nrow(bad_ck) == 0) {
    message("cruise_key: all match YYYY-MM-NODC format")
  } else {
    warning(glue("cruise_key format violations: {nrow(bad_ck)} rows"))
  }
}
# Warning message: cruise_key format violations: 1 rows
# cruise_key: 2019-07-
# site_key format validation: expect "NNN.N NNN.N"
# (presumably CalCOFI line & station — confirm against grid docs)
for (tbl_name in intersect(c("site", "casts", "ctd_cast"), tbls)) {
  # read column list via information_schema (avoids GEOMETRY type issues)
  tbl_cols <- dbGetQuery(
    con_wdl,
    glue(
      "SELECT column_name FROM information_schema.columns
      WHERE table_name = '{tbl_name}'"
    )
  )$column_name
  if ("site_key" %in% tbl_cols) {
    # NOTE: {{3}} is glue's escape for a literal {3} regex quantifier
    bad_sk <- dbGetQuery(
      con_wdl,
      glue(
        "SELECT COUNT(*) AS n FROM {tbl_name}
        WHERE site_key IS NOT NULL
        AND NOT regexp_matches(site_key, '^\\d{{3}}\\.\\d \\d{{3}}\\.\\d$')"
      )
    )$n
    if (bad_sk > 0) {
      warning(glue("site_key format violations in {tbl_name}: {bad_sk} rows"))
    } else {
      message(glue("site_key in {tbl_name}: all match NNN.N NNN.N format"))
    }
  }
}
# site_key in casts: all match NNN.N NNN.N format
# Warning message: site_key format violations in site: 982 rows
# --- cruise_summary table for the Shiny app ---
if (all(c("cruise", "ship") %in% tbls)) {
  # detect site_key vs sta_key for backward compat with old parquets.
  # scalar condition, so use plain if/else rather than vectorized ifelse();
  # pulling the column query out of the conditional also reads much cleaner
  ctd_cast_cols <- dbGetQuery(
    con_wdl,
    "SELECT column_name FROM information_schema.columns
    WHERE table_name = 'ctd_cast'"
  )$column_name
  ctd_site_col <- if ("site_key" %in% ctd_cast_cols) "site_key" else "sta_key"

  # one row per cruise: year/month, ship, and per-dataset distinct-site counts
  dbExecute(
    con_wdl,
    glue(
      "
      CREATE OR REPLACE TABLE cruise_summary AS
      SELECT
        cr.cruise_key,
        EXTRACT(YEAR FROM cr.date_ym)::INTEGER AS year,
        EXTRACT(MONTH FROM cr.date_ym)::INTEGER AS month,
        s.ship_name,
        s.ship_nodc,
        COALESCE((SELECT COUNT(DISTINCT site_key) FROM site
          WHERE cruise_key = cr.cruise_key), 0) AS ichthyo,
        COALESCE((SELECT COUNT(DISTINCT site_key) FROM casts
          WHERE cruise_key = cr.cruise_key), 0) AS bottle,
        COALESCE((SELECT COUNT(DISTINCT {ctd_site_col}) FROM ctd_cast
          WHERE cruise_key = cr.cruise_key), 0) AS ctd_cast,
        COALESCE((SELECT COUNT(DISTINCT ds.site_key)
          FROM dic_sample ds
          JOIN casts c ON ds.cast_id = c.cast_id
          WHERE c.cruise_key = cr.cruise_key), 0) AS dic
      FROM cruise cr
      JOIN ship s ON cr.ship_key = s.ship_key
      ORDER BY year DESC, month DESC"
    )
  )
  n_cs <- dbGetQuery(con_wdl, "SELECT COUNT(*) AS n FROM cruise_summary")$n
  message(glue("Created cruise_summary table: {n_cs} rows"))
}
# Created cruise_summary table: 691 rows

tbl(con_wdl, "cruise_summary") |>
  collect() |>
  datatable(caption = "cruise_summary preview")
# run the standard release validation; wrapped in tryCatch because
# validate_for_release may fail on GEOMETRY columns (compat issue)
tryCatch(
  {
    validation <- validate_for_release(con_wdl)
    if (!validation$passed) {
      cat("Validation FAILED:\n")
      cat(paste("-", validation$errors, collapse = "\n"))
    } else {
      message("Release validation passed!")
    }
  },
  error = function(e) {
    # non-fatal: report and continue with the release
    message(glue("validate_for_release skipped: {e$message}"))
  }
)
```
## Show Combined Schema
```{r}
# dir_frozen used later; define early so ERD can reference it
dir_frozen <- here(glue("data/releases/{release_version}"))
dir.create(dir_frozen, recursive = TRUE, showWarnings = FALSE)

# first pass: plain ERD with the elk layout engine
plot(cc_erd(con_wdl, layout = "elk"))

# second pass: color-code tables by source dataset for readability
erd_colors <- list(
  lightblue   = c("cruise", "ship", "site", "tow", "net"),
  lightyellow = c("ichthyo", "species", "lookup", "taxon", "taxa_rank"),
  lightgreen  = c("grid", "segment"),
  pink        = c(
    "casts",
    "bottle",
    "bottle_measurement",
    "cast_condition",
    "measurement_type"),
  lavender    = c("ctd_cast", "ctd_measurement", "ctd_summary"),
  lightsalmon = c("dic_sample", "dic_measurement", "dic_measurement_summary"),
  white       = c("dataset"))
plot(cc_erd(con_wdl, colors = erd_colors))
```
```{r}
#| label: combined_schema
#| fig-width: 12
#| fig-height: 10
# exclude internal tables and views from the published schema
schema_tbls <- setdiff(
  tbls,
  c("_meta", "_sp_update", "casts_derived", "ctd_cast_derived"))
# merge per-dataset relationships.json files into one release-level file
rels_paths <- c(
  here("data/parquet/swfsc_ichthyo/relationships.json"),
  here("data/parquet/calcofi_bottle/relationships.json"),
  here("data/parquet/calcofi_ctd-cast/relationships.json"),
  here("data/parquet/calcofi_dic/relationships.json")
)
# tolerate partially-run pipelines: keep only files that exist
rels_paths <- rels_paths[file.exists(rels_paths)]
dir_frozen <- here(glue("data/releases/{release_version}"))
dir.create(dir_frozen, recursive = TRUE, showWarnings = FALSE)
rels_merged_path <- file.path(dir_frozen, "relationships.json")
if (length(rels_paths) > 0) {
  merge_relationships_json(rels_paths, rels_merged_path)
  # add cross-dataset FKs (bottle casts → ichthyo cruise/ship/grid) that no
  # single per-dataset relationships.json can know about
  rels_merged <- jsonlite::fromJSON(
    rels_merged_path, simplifyVector = FALSE)
  cross_fks <- list(
    list(table = "casts", column = "cruise_key",
      ref_table = "cruise", ref_column = "cruise_key"),
    list(table = "casts", column = "ship_key",
      ref_table = "ship", ref_column = "ship_key"),
    list(table = "casts", column = "grid_key",
      ref_table = "grid", ref_column = "grid_key"))
  rels_merged$foreign_keys <- c(
    rels_merged$foreign_keys, cross_fks)
  # null = "null" keeps explicit JSON nulls rather than dropping keys
  jsonlite::write_json(
    rels_merged, rels_merged_path,
    auto_unbox = TRUE, pretty = TRUE, null = "null")
}
# render color-coded ERD (cc_erd handles GEOMETRY columns natively);
# colors group tables by source dataset (ichthyo, bottle, ctd, dic, spatial)
cc_erd(
  con_wdl,
  tables = schema_tbls,
  rels_path = rels_merged_path,
  colors = list(
    lightblue = c("cruise", "ship", "site", "tow", "net"),
    lightyellow = c("ichthyo", "species", "lookup",
      "taxon", "taxa_rank"),
    lightgreen = c("grid", "segment"),
    pink = c("casts", "bottle", "bottle_measurement",
      "cast_condition", "measurement_type"),
    lavender = c("ctd_cast", "ctd_measurement", "ctd_summary"),
    lightsalmon = c("dic_sample", "dic_measurement",
      "dic_measurement_summary"),
    white = "dataset"))
```
## Create Frozen Release
Strip provenance columns and export clean parquet files for public
access. See
[Frozen DuckLake](https://ducklake.select/2025/10/24/frozen-ducklake/)
pattern.
```{r}
#| label: eval_off_rest
# freeze + upload currently enabled so the GCS release includes all tables
# (including geometry); uncomment the line below to disable evaluation of the
# remaining chunks when iterating on earlier sections
# knitr::opts_chunk$set(eval = FALSE)
```
```{r}
#| label: freeze_release

# frozen releases are written under data/releases/vYYYY.MM.DD
dir_frozen <- here(glue("data/releases/{release_version}"))
message(glue("Creating frozen release: {release_version}"))

# freeze the same tables shown in the combined schema
# (internal and derived-view tables were already excluded there)
tables_to_freeze <- schema_tbls

# write clean parquet: provenance columns stripped, zstd compression,
# large CTD tables partitioned by cruise_key for efficient partial reads
freeze_stats <- write_parquet_outputs(
  con              = con_wdl,
  output_dir       = file.path(dir_frozen, "parquet"),
  tables           = tables_to_freeze,
  partition_by     = list(
    ctd_measurement = "cruise_key",
    ctd_summary     = "cruise_key"),
  strip_provenance = TRUE,
  compression      = "zstd")

freeze_stats |>
  datatable(caption = glue("Frozen release {release_version} statistics"))
```
## Release Notes
```{r}
#| label: release_notes
#| results: asis
# build release notes markdown from the freeze statistics;
# one bullet per frozen table with a comma-formatted row count
tables_list <- paste0(
  "- ",
  freeze_stats$table,
  " (",
  format(freeze_stats$rows, big.mark = ","),
  " rows)"
)
# assemble the full RELEASE_NOTES.md body as a single string
release_notes <- paste0(
  "# CalCOFI Database Release ",
  release_version,
  "\n\n",
  "**Release Date**: ",
  Sys.Date(),
  "\n\n",
  "## Tables Included\n\n",
  paste(tables_list, collapse = "\n"),
  "\n\n",
  "## Total\n\n",
  "- **Tables**: ",
  nrow(freeze_stats),
  "\n",
  "- **Total Rows**: ",
  format(sum(freeze_stats$rows), big.mark = ","),
  "\n",
  "- **Total Size**: ",
  # file_size is in bytes; convert to MB for display
  round(sum(freeze_stats$file_size) / 1024 / 1024, 1),
  " MB\n\n",
  "## Data Sources\n\n",
  "- `ingest_swfsc_ichthyo.qmd` - Ichthyo tables (cruise, ship, site, tow, net, species, ichthyo, grid, segment, lookup, taxon, taxa_rank)\n",
  "- `ingest_calcofi_bottle.qmd` - Bottle/cast tables (casts, bottle, bottle_measurement, cast_condition, measurement_type)\n",
  "- `ingest_calcofi_ctd-cast.qmd` - CTD tables (ctd_cast, ctd_measurement, ctd_summary, measurement_type)\n",
  "- `ingest_calcofi_dic.qmd` - DIC/alkalinity tables (dic_sample, dic_measurement, dic_measurement_summary, dataset)\n\n",
  "## Cross-Dataset Integration\n\n",
  "- **Ship matching**: Reconciled ship codes between bottle casts and swfsc ship reference\n",
  "- **Cruise bridge**: Derived cruise_key (YYYY-MM-NODC) for bottle casts via ship matching + datetime\n",
  "- **Taxonomy**: Standardized species with WoRMS AphiaID, ITIS TSN, GBIF backbone key\n",
  "- **Taxon hierarchy**: Built taxon + taxa_rank tables from WoRMS/ITIS classification\n\n",
  "## Access\n\n",
  "Parquet files can be queried directly from GCS:\n\n",
  "```r\n",
  "library(duckdb)\n",
  "con <- dbConnect(duckdb())\n",
  "dbExecute(con, 'INSTALL httpfs; LOAD httpfs;')\n",
  "dbGetQuery(con, \"\n",
  " SELECT * FROM read_parquet(\n",
  " 'https://storage.googleapis.com/calcofi-db/ducklake/releases/",
  release_version,
  "/parquet/ichthyo.parquet')\n",
  " LIMIT 10\")\n",
  "```\n\n",
  "Or use calcofi4r:\n\n",
  "```r\n",
  "library(calcofi4r)\n",
  "con <- cc_get_db(version = '",
  release_version,
  "')\n",
  "```\n"
)
# write to the frozen release dir, then echo into the rendered document
# (chunk option results: asis renders the markdown inline)
writeLines(release_notes, file.path(dir_frozen, "RELEASE_NOTES.md"))
message(glue(
  "Release notes written to {file.path(dir_frozen, 'RELEASE_NOTES.md')}"
))
cat(release_notes)
```
## Upload Frozen Release to GCS
```{r}
#| label: upload_frozen
# upload frozen release directory to GCS; set_latest = TRUE presumably also
# repoints the bucket's "latest" alias to this version — confirm in calcofi4db
upload_frozen_release(
  release_dir = dir_frozen,
  version = release_version,
  set_latest = TRUE
)
```
## Cleanup
```{r}
#| label: cleanup
# close the in-memory DuckDB connection; all assembled tables are discarded
# (the frozen release on disk/GCS is the durable artifact)
close_duckdb(con_wdl)
message("Assembly DuckDB connection closed")
# final run summary from the freeze statistics
message(glue("\n=== Summary ==="))
message(glue("Frozen release: {release_version} created at {dir_frozen}"))
message(glue("Tables: {nrow(freeze_stats)}"))
message(glue("Total rows: {format(sum(freeze_stats$rows), big.mark = ',')}"))
```
::: {.callout-caution collapse="true"}
## Session Info
```{r session_info}
# record R version, platform, and attached package versions for reproducibility
devtools::session_info()
```
:::