---
title: "Release CalCOFI Database"
calcofi:
target_name: release_database
workflow_type: release
dependency:
- auto
output: data/releases
# cross-dataset foreign keys (relationships spanning ingests) are authored in
# metadata/relationships_cross.csv; intra-dataset FKs live in each ingest's
# relationships.json. both are merged below into the release relationships.json.
# ERD color overrides for common/cross-cutting tables (neutral). dataset
# colors themselves come from each ingest's calcofi.erd.color.
erd_overrides:
dataset: "#e8e8e8"
measurement_type: "#e8e8e8"
cruise_summary: "#e8e8e8"
_spatial: "#cfe8ea"
_spatial_attr: "#cfe8ea"
execute:
echo: true
message: true
warning: true
editor_options:
chunk_output_type: console
format:
html:
code-fold: true
editor:
markdown:
wrap: 72
---
## Overview {.unnumbered}
**Goal**: Create a frozen (immutable) release of the CalCOFI integrated
database by assembling all ingest parquet outputs. This is the "caboose"
notebook: it runs after all ingest notebooks complete, and only
`test_release.qmd` (which gates promotion of `latest.txt`) follows it.
**Upstream notebooks** are auto-discovered from `calcofi:` YAML
frontmatter in each `.qmd`. All workflows with `workflow_type: ingest`
or `spatial` feed into this release notebook via `dependency: [auto]`
in `_targets.R`.
```{r}
#| label: gen_fig_workflow
#| results: asis
#| echo: false
#| message: false
#| warning: false
# echo must stay false: this chunk emits a ```{mermaid} cell, and echoing R
# source that contains literal ``` fences would corrupt the document structure.
librarian::shelf(targets, quiet = TRUE)

# auto-discover the pipeline graph straight from _targets.R, so newly added
# workflows show up without editing this diagram. callr_function = NULL keeps it
# in-process (safe when this notebook is rendered via tar_make); outdated = FALSE
# skips the up-to-date check (no node status needed here).
invisible(suppressMessages(capture.output(
  net <- tar_network(
    targets_only = TRUE, outdated = FALSE, callr_function = NULL))))
nodes <- net$vertices$name
edges <- net$edges

# group each node by name so the diagram colors input / ingest / release / test;
# any node not named explicitly below falls into the "ingest" group
node_grp <- rep("ingest", length(nodes))
node_grp[nodes == "corrections_csv"] <- "input"
node_grp[nodes == "test_release"] <- "test"
node_grp[nodes == "release_database"] <- "rel"

# emit Mermaid source; rendered to a zoomable PNG via the project-level
# `mermaid-format: png` + `lightbox: true` (see _quarto.yml). release_database
# (this notebook) is highlighted.
# sprintf() is vectorized, so node and edge lines are built in one call each;
# unlike apply() over the edge table this also yields character(0) (rather
# than an error) when the graph has no edges.
mmd <- c(
  "flowchart LR",
  sprintf(' %s["%s"]', nodes, nodes),
  sprintf(" %s --> %s", edges$from, edges$to),
  " classDef input fill:#eeeeee,stroke:#999999,color:#333333;",
  " classDef ingest fill:#e3f2fd,stroke:#1565c0,color:#0d3c61;",
  " classDef rel fill:#ef6c00,stroke:#b35100,color:#ffffff,font-weight:bold;",
  " classDef test fill:#e8f4e8,stroke:#2e7d32,color:#1b5e20;")

# one `class` statement per non-empty group
for (grp in c("input", "ingest", "rel", "test")) {
  members <- nodes[node_grp == grp]
  if (length(members) > 0) {
    mmd <- c(mmd, sprintf(" class %s %s;", paste(members, collapse = ","), grp))
  }
}

cap <- paste(
  "Pipeline dependency graph, auto-discovered from `_targets.R`: every workflow",
  "in this folder is a node and edges are dependencies. `release_database` (this",
  "notebook, orange) is the caboose — it runs last, after all ingests, to assemble",
  "the frozen release. Click to zoom.")

# write the mermaid cell (with its cell options) into the rendered document
cat("```{mermaid}\n")
cat("%%| label: fig-workflow\n")
cat('%%| fig-cap: "', cap, '"\n', sep = "")
cat(mmd, sep = "\n")
cat("\n```\n")
```
## Setup
```{r}
#| label: setup
# load the CalCOFI packages from the sibling checkouts (../calcofi4db,
# ../calcofi4r)
devtools::load_all(here::here("../calcofi4db"))
devtools::load_all(here::here("../calcofi4r"))
# cleanup_gcs_obsolete(dry_run = FALSE)

# remaining packages used by this notebook
librarian::shelf(
  CalCOFI / calcofi4db,
  CalCOFI / calcofi4r,
  DBI,
  dplyr,
  DT,
  fs,
  glue,
  here,
  jsonlite,
  purrr,
  tibble,
  quiet = TRUE
)

# let wide tables scroll horizontally in the rendered HTML
options(DT.options = list(scrollX = TRUE))

# release version: date-stamped, e.g. v2025.01.31
release_version <- format(Sys.Date(), "v%Y.%m.%d")
message(glue("Release version: {release_version}"))
```
## Assemble from Ingest Outputs
Create VIEWs on local parquet files from each ingest (zero-copy).
For tables appearing in multiple ingests, use the canonical (first) source.
```{r}
#| label: assemble_working
# working DuckDB lives in memory; the spatial extension is loaded up front
con_wdl <- get_duckdb_con(":memory:")
load_duckdb_extension(con_wdl, "spatial")

# table registry, auto-discovered from all ingest manifests
registry <- build_release_table_registry(here())

# keep only canonical, non-supplemental tables
reg_canon <- filter(registry, canonical, !supplemental)
message(glue(
  "{nrow(reg_canon)} canonical tables from ",
  "{length(unique(reg_canon$ingest))} ingests"))

# --- authoritative dataset metadata + ERD coloring from ingest YAML ----
# table_dataset maps table -> provider_dataset(s) that own it, collected from
# each ingest's calcofi.tables_owned (and any additional_datasets entries)
ingest_yaml <- read_ingest_yaml(here())
table_dataset <- list()

# record `pd` as an owner of `tbl` in the notebook-level table_dataset list;
# entries without a table name are ignored
add_owner <- function(tbl, pd) {
  if (is.null(tbl)) return(invisible())
  table_dataset[[tbl]] <<- unique(c(table_dataset[[tbl]], pd))
}

for (provider_dataset in names(ingest_yaml)) {
  ingest_cfg <- ingest_yaml[[provider_dataset]]

  # tables owned directly by this ingest
  for (owned in ingest_cfg$tables_owned %||% list()) {
    add_owner(owned$table, provider_dataset)
  }

  # tables owned by extra datasets declared inside the same ingest
  for (extra in ingest_cfg$additional_datasets %||% list()) {
    extra_key <- paste0(extra$provider, "_", extra$dataset)
    for (owned in extra$tables_owned %||% list()) {
      add_owner(owned$table, extra_key)
    }
  }
}

# one color per dataset (from calcofi.erd.color)
dataset_colors <- lapply(ingest_yaml, function(cfg) cfg$erd$color)

# release-level config: neutral ERD overrides for common tables
rel_cfg <- read_calcofi_meta(here("release_database.qmd"))
release_overrides <- rel_cfg$erd_overrides

# cross-dataset foreign keys (relationships spanning ingests) are authored in a
# reviewable CSV; intra-dataset FKs live in each ingest's relationships.json.
cross_fks_df <- readr::read_csv(
  here("metadata/relationships_cross.csv"), show_col_types = FALSE)

# one list per CSV row, holding the four FK fields
fk_fields <- c("table", "column", "ref_table", "ref_column")
cross_fks <- lapply(
  seq_len(nrow(cross_fks_df)),
  function(row_i) as.list(cross_fks_df[row_i, fk_fields]))

# stroke-based color map consumed by every cc_erd() call below
color_map <- cc_erd_color_map(
  table_dataset = table_dataset,
  dataset_colors = dataset_colors,
  overrides = release_overrides,
  neutral = "#dcdcdc")
# create VIEWs on local parquet for each canonical table
# _new delta tables handled separately for merging
all_geom_tables <- c("grid", "site", "segment", "casts", "ctd_cast", "_spatial")
main_tables <- reg_canon |> filter(!grepl("_new$", table))
new_tables <- registry |> filter(grepl("_new$", table))

# one load_prior_tables() call per registry row; results are row-bound
load_stats <- purrr::map_dfr(
  split(main_tables, seq_len(nrow(main_tables))),
  function(row) {
    load_prior_tables(
      con = con_wdl,
      parquet_dir = row$parquet_dir,
      tables = row$table,
      geom_tables = all_geom_tables,
      as_view = TRUE
    )
  })

# merge {table}_new additions into their base tables
# driven by calcofi.modifies in YAML frontmatter
if (nrow(new_tables) > 0) {
  # group _new tables by their base table
  base_names <- unique(sub("_new$", "", new_tables$table))

  for (base_tbl in base_names) {
    delta_rows <- new_tables |> filter(table == paste0(base_tbl, "_new"))

    # replace VIEW with TABLE for this base table (so we can INSERT);
    # deltas whose base table is not among the canonical tables are skipped
    base_src <- main_tables |> filter(table == base_tbl)
    if (nrow(base_src) > 0) {
      dbExecute(con_wdl, glue("DROP VIEW IF EXISTS {base_tbl}"))
      load_prior_tables(
        con = con_wdl, parquet_dir = base_src$parquet_dir[1],
        tables = base_tbl, geom_tables = all_geom_tables)

      # dedup key: the table's first column by ordinal position
      # NOTE(review): assumes the first column is the primary key — confirm
      pk_col <- dbGetQuery(con_wdl, glue(
        "SELECT column_name FROM information_schema.columns
         WHERE table_name = '{base_tbl}'
         ORDER BY ordinal_position LIMIT 1"))$column_name

      for (j in seq_len(nrow(delta_rows))) {
        dr <- delta_rows[j, ]
        pq_path <- file.path(dr$parquet_dir, paste0(base_tbl, "_new.parquet"))
        if (file.exists(pq_path)) {
          # insert only keys not already present. NULL keys are excluded from
          # the subquery: a single NULL there would make NOT IN evaluate to
          # NULL for every row and silently insert nothing.
          # dbExecute() returns the number of rows affected, so the message
          # reports rows actually inserted rather than rows in the delta file.
          n_new <- dbExecute(con_wdl, glue(
            "INSERT INTO {base_tbl}
             SELECT * FROM read_parquet('{pq_path}')
             WHERE {pk_col} NOT IN (
               SELECT {pk_col} FROM {base_tbl}
               WHERE {pk_col} IS NOT NULL)"))
          message(glue("Merged {n_new} {base_tbl} addition(s) from {dr$ingest}"))
        }
      }
    }
  }
}

load_stats |>
  datatable(caption = "Assembled tables (VIEWs on local parquet)")
```
## Scan Manifests for Mismatches
```{r}
#| label: scan_mismatches
# scan all ingest manifests for unresolved mismatches.
# the search root is resolved with here() like every other path in this
# notebook (so it does not depend on the working directory), and the pattern
# is anchored so only files named exactly manifest.json are picked up.
all_manifests <- list.files(
  here("data/parquet"), pattern = "^manifest[.]json$",
  recursive = TRUE, full.names = TRUE)

# one tibble per manifest that has mismatches; manifests without any are
# dropped by compact()
all_mismatches <- purrr::compact(lapply(all_manifests, function(mf) {
  m <- jsonlite::read_json(mf)
  if (is.null(m$mismatches)) return(NULL)

  # the manifest's parent directory name identifies the dataset
  dataset <- basename(dirname(mf))

  purrr::imap_dfr(m$mismatches, function(items, category) {
    if (length(items) == 0) return(NULL)
    purrr::map_dfr(items, function(x) {
      # replace NULL values with NA so as_tibble works
      x[vapply(x, is.null, logical(1))] <- NA
      as_tibble(x)
    }) |>
      mutate(dataset = dataset, category = category, .before = 1)
  })
}))

if (length(all_mismatches) > 0) {
  d_mismatches <- bind_rows(all_mismatches)
  message(glue("{nrow(d_mismatches)} unresolved mismatch(es) across manifests"))
  d_mismatches |>
    datatable(caption = "Unresolved mismatches (from manifest.json)")
} else {
  message("No unresolved mismatches found across manifests")
}
```
## Validate
Cross-dataset validation to ensure data integrity before freezing.
```{r}
#| label: validate
# column names of a table in the working DuckDB, read from information_schema
# (avoids GEOMETRY type issues)
wdl_columns <- function(tbl_name) {
  dbGetQuery(
    con_wdl,
    glue(
      "SELECT column_name FROM information_schema.columns
       WHERE table_name = '{tbl_name}'"
    )
  )$column_name
}

tbls <- DBI::dbListTables(con_wdl)

# grid_key integrity: casts.grid_key should all be in grid.grid_key
if (all(c("casts", "grid") %in% tbls)) {
  casts_cols_wdl <- wdl_columns("casts")
  grid_cols_wdl <- wdl_columns("grid")
  if ("grid_key" %in% casts_cols_wdl && "grid_key" %in% grid_cols_wdl) {
    grid_orphans <- dbGetQuery(
      con_wdl,
      "SELECT COUNT(*) AS n FROM casts c
       WHERE c.grid_key IS NOT NULL
         AND c.grid_key NOT IN (SELECT grid_key FROM grid)"
    )$n
    # orphans are raised as a warning, like every other violation below
    if (grid_orphans > 0) {
      warning(glue("Grid key orphans in casts: {grid_orphans}"))
    } else {
      message(glue("Grid key orphans in casts: {grid_orphans}"))
    }
    # Grid key orphans in casts: 0
  }
}

# ship PK uniqueness
if ("ship" %in% tbls) {
  ship_dups <- dbGetQuery(
    con_wdl,
    "SELECT ship_key, COUNT(*) AS n FROM ship
     GROUP BY ship_key HAVING COUNT(*) > 1"
  )
  if (nrow(ship_dups) > 0) {
    warning(glue("Duplicate ship_key values: {nrow(ship_dups)}"))
  } else {
    message("ship_key: all unique")
  }
}
# ship_key: all unique

# cruise PK uniqueness
if ("cruise" %in% tbls) {
  cruise_dups <- dbGetQuery(
    con_wdl,
    "SELECT cruise_key, COUNT(*) AS n FROM cruise
     GROUP BY cruise_key HAVING COUNT(*) > 1"
  )
  if (nrow(cruise_dups) > 0) {
    warning(glue("Duplicate cruise_key values: {nrow(cruise_dups)}"))
  } else {
    message("cruise_key: all unique")
  }
}
# cruise_key: all unique

# cruise bridge coverage: how many casts carry a ship_key / cruise_key
if ("casts" %in% tbls) {
  bridge_stats <- dbGetQuery(
    con_wdl,
    "SELECT
       COUNT(*) AS total_casts,
       SUM(CASE WHEN ship_key IS NOT NULL THEN 1 ELSE 0 END) AS with_ship_key,
       SUM(CASE WHEN cruise_key IS NOT NULL THEN 1 ELSE 0 END) AS with_cruise_key
     FROM casts"
  )
  bridge_stats |> datatable(caption = "Cruise bridge coverage")
}

# cruise_key format validation (YYYY-MM-NODC)
if ("cruise" %in% tbls) {
  bad_ck <- dbGetQuery(
    con_wdl,
    "SELECT cruise_key FROM cruise
     WHERE cruise_key IS NOT NULL
       AND NOT regexp_matches(cruise_key, '^\\d{4}-\\d{2}-.+$')"
  )
  if (nrow(bad_ck) > 0) {
    warning(glue("cruise_key format violations: {nrow(bad_ck)} rows"))
  } else {
    message("cruise_key: all match YYYY-MM-NODC format")
  }
}
# Warning message: cruise_key format violations: 1 rows
# cruise_key: 2019-07-

# site_key format validation (NNN.N NNN.N), for each table that has a site_key
for (tbl_name in intersect(c("site", "casts", "ctd_cast"), tbls)) {
  tbl_cols <- wdl_columns(tbl_name)
  if ("site_key" %in% tbl_cols) {
    # braces are doubled in the regex quantifiers so glue() emits them literally
    bad_sk <- dbGetQuery(
      con_wdl,
      glue(
        "SELECT COUNT(*) AS n FROM {tbl_name}
         WHERE site_key IS NOT NULL
           AND NOT regexp_matches(site_key, '^\\d{{3}}\\.\\d \\d{{3}}\\.\\d$')"
      )
    )$n
    if (bad_sk > 0) {
      warning(glue("site_key format violations in {tbl_name}: {bad_sk} rows"))
    } else {
      message(glue("site_key in {tbl_name}: all match NNN.N NNN.N format"))
    }
  }
}
# site_key in casts: all match NNN.N NNN.N format
# Warning message: site_key format violations in site: 982 rows
# cruise_summary table for Shiny app
# the query below reads all of these tables, and the preview that follows
# needs cruise_summary to exist, so fail early with a clear message instead of
# a later "table does not exist" error
cs_required <- c("cruise", "ship", "site", "casts", "ctd_cast", "dic_sample")
cs_missing <- setdiff(cs_required, DBI::dbListTables(con_wdl))
if (length(cs_missing) > 0) {
  stop(
    "Cannot build cruise_summary; missing table(s): ",
    paste(cs_missing, collapse = ", "),
    call. = FALSE)
}

# detect site_key vs sta_key for backward compat with old parquets
ctd_cols <- dbGetQuery(
  con_wdl,
  "SELECT column_name FROM information_schema.columns
   WHERE table_name = 'ctd_cast'"
)$column_name
ctd_site_col <- if ("site_key" %in% ctd_cols) "site_key" else "sta_key"

# one row per cruise: year/month, ship, and the number of distinct sites per
# data type (ichthyo, bottle, ctd_cast, dic)
dbExecute(
  con_wdl,
  glue(
    "
    CREATE OR REPLACE TABLE cruise_summary AS
    SELECT
      cr.cruise_key,
      EXTRACT(YEAR FROM cr.date_ym)::INTEGER AS year,
      EXTRACT(MONTH FROM cr.date_ym)::INTEGER AS month,
      s.ship_name,
      s.ship_nodc,
      COALESCE((SELECT COUNT(DISTINCT site_key) FROM site
                WHERE cruise_key = cr.cruise_key), 0) AS ichthyo,
      COALESCE((SELECT COUNT(DISTINCT site_key) FROM casts
                WHERE cruise_key = cr.cruise_key), 0) AS bottle,
      COALESCE((SELECT COUNT(DISTINCT {ctd_site_col}) FROM ctd_cast
                WHERE cruise_key = cr.cruise_key), 0) AS ctd_cast,
      COALESCE((SELECT COUNT(DISTINCT ds.site_key)
                FROM dic_sample ds
                JOIN casts c ON ds.cast_id = c.cast_id
                WHERE c.cruise_key = cr.cruise_key), 0) AS dic
    FROM cruise cr
    JOIN ship s ON cr.ship_key = s.ship_key
    ORDER BY year DESC, month DESC"
  )
)
n_cs <- dbGetQuery(con_wdl, "SELECT COUNT(*) AS n FROM cruise_summary")$n
message(glue("Created cruise_summary table: {n_cs} rows"))
# Created cruise_summary table: 691 rows

tbl(con_wdl, "cruise_summary") |>
  collect() |>
  datatable(caption = "cruise_summary preview")

# run standard release validation (wrapped in tryCatch for GEOMETRY compat)
# NOTE(review): a failed validation is only printed here; it does not stop the
# release from being frozen and uploaded — confirm that is intended.
tryCatch(
  {
    validation <- validate_for_release(con_wdl)
    if (validation$passed) {
      message("Release validation passed!")
    } else {
      cat("Validation FAILED:\n")
      cat(paste("-", validation$errors, collapse = "\n"))
    }
  },
  error = function(e) {
    message(glue("validate_for_release skipped: {e$message}"))
  }
)
```
## Show Combined Schema
```{r}
# dir_frozen used later; define early so ERD can reference it
# NOTE(review): neither cc_erd() call in this chunk takes dir_frozen; the
# directory is created here and first used by the chunks that follow.
dir_frozen <- here(glue("data/releases/{release_version}"))
dir.create(dir_frozen, recursive = TRUE, showWarnings = FALSE)
# first rendering: every table in the working DuckDB, layout = "elk"
erd <- cc_erd(con_wdl, layout = "elk")
plot(erd)
# second rendering: same tables, colored with the dataset color map
erd <- cc_erd(con_wdl, colors = color_map)
plot(erd)
```
```{r}
#| label: combined_schema
#| fig-width: 12
#| fig-height: 10
# exclude internal tables and views
# NOTE(review): `tbls` was captured at the start of the validate chunk, before
# cruise_summary was created, so cruise_summary is not part of schema_tbls —
# confirm that is intended.
schema_tbls <- setdiff(
  tbls,
  c("_meta", "_sp_update", "casts_derived", "ctd_cast_derived"))

# merge per-dataset relationships.json files (auto-discovered — every ingest
# writes data/parquet/{provider}_{dataset}/relationships.json, so new datasets
# are picked up without editing this list)
rels_paths <- Sys.glob(here("data/parquet/*/relationships.json"))
dir_frozen <- here(glue("data/releases/{release_version}"))
dir.create(dir_frozen, recursive = TRUE, showWarnings = FALSE)
rels_merged_path <- file.path(dir_frozen, "relationships.json")

if (length(rels_paths) > 0) {
  merge_relationships_json(rels_paths, rels_merged_path)

  # append cross-dataset FKs authored in metadata/relationships_cross.csv
  rels_merged <- jsonlite::fromJSON(
    rels_merged_path, simplifyVector = FALSE)
  rels_merged$foreign_keys <- c(
    rels_merged$foreign_keys, cross_fks)
  jsonlite::write_json(
    rels_merged, rels_merged_path,
    auto_unbox = TRUE, pretty = TRUE, null = "null")

  # emit a flat, reviewable view of every relationship (intra + cross) alongside
  # relationships.json / erd.mmd, so the cross-dataset graph is legible as a table
  # fk_field(): one field of an FK entry as character, NA when absent
  fk_field <- function(x, k) {
    v <- x[[k]]
    if (is.null(v)) NA_character_ else as.character(v)
  }
  fk_rows <- lapply(rels_merged$foreign_keys, function(fk)
    data.frame(
      from_table = fk_field(fk, "table"), from_column = fk_field(fk, "column"),
      to_table = fk_field(fk, "ref_table"), to_column = fk_field(fk, "ref_column"),
      stringsAsFactors = FALSE))
  # do.call(rbind, list()) is NULL, so build an explicit zero-row frame when
  # there are no foreign keys at all
  fk_df <- if (length(fk_rows) > 0) {
    do.call(rbind, fk_rows)
  } else {
    data.frame(
      from_table = character(), from_column = character(),
      to_table = character(), to_column = character(),
      stringsAsFactors = FALSE)
  }

  # scope = "cross" when the relationship is one of the CSV-authored FKs
  cross_keys <- paste(cross_fks_df$table, cross_fks_df$column,
    cross_fks_df$ref_table, cross_fks_df$ref_column)
  fk_df$scope <- ifelse(
    paste(fk_df$from_table, fk_df$from_column,
      fk_df$to_table, fk_df$to_column) %in% cross_keys, "cross", "intra")
  readr::write_csv(fk_df, file.path(dir_frozen, "relationships_all.csv"))

  # validate: every cross-FK target column exists in the assembled release schema
  schema_cols <- unlist(lapply(schema_tbls, function(t)
    paste(t, DBI::dbListFields(con_wdl, t))))
  cross_targets <- paste(cross_fks_df$ref_table, cross_fks_df$ref_column)
  missing_targets <- cross_fks_df[!(cross_targets %in% schema_cols), , drop = FALSE]
  if (nrow(missing_targets) > 0) {
    warning(glue(
      "cross-FK target(s) missing from release schema: ",
      "{paste(missing_targets$ref_table, missing_targets$ref_column, collapse = ', ')}"))
  } else {
    message(glue(
      "cross-FK check: all {nrow(cross_fks_df)} cross-dataset targets present; ",
      "wrote {nrow(fk_df)} relationships to relationships_all.csv"))
  }
}

# render dataset-colored ERD (stroke outlines; cc_erd handles GEOMETRY natively)
# NOTE(review): when no relationships.json was found above, rels_merged_path
# does not exist on disk — confirm cc_erd() tolerates a missing rels_path.
cc_erd(
  con_wdl,
  tables = schema_tbls,
  rels_path = rels_merged_path,
  colors = color_map)
```
## Create Frozen Release
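Export the tables derived or merged in this notebook to local parquet
files; all other tables are copied server-side on GCS from `ingest/`,
with their provenance columns intact. See
[Frozen DuckLake](https://ducklake.select/2025/10/24/frozen-ducklake/)
pattern.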
```{r}
#| label: freeze_release
dir_frozen <- here(glue("data/releases/{release_version}"))
dir_frozen_pq <- file.path(dir_frozen, "parquet")
dir.create(dir_frozen_pq, recursive = TRUE, showWarnings = FALSE)
message(glue("Creating frozen release: {release_version}"))

# only cruise_summary is derived in this notebook — export locally
# all other tables are GCS-copied from ingest/ (including provenance columns)
# tables with _new additions need local merge + export
# (merged_base is empty when there are no _new tables)
merged_base <- unique(sub("_new$", "", new_tables$table))
derived_tables <- c("cruise_summary", merged_base)

# export only derived/merged tables to local parquet
export_parquet(con_wdl, "cruise_summary",
  file.path(dir_frozen_pq, "cruise_summary.parquet"), compression = "zstd")
message("Exported cruise_summary.parquet")

# export merged tables (e.g., ship with _new additions)
for (tbl in merged_base) {
  export_parquet(con_wdl, tbl,
    file.path(dir_frozen_pq, paste0(tbl, ".parquet")), compression = "zstd")
  message(glue("Exported {tbl}.parquet (merged)"))
}

# build freeze stats from registry (auto-discovered)
# exclude _new delta tables (intermediate) and supplemental
freeze_stats <- reg_canon |>
  filter(!supplemental, !grepl("_new$", table)) |>
  select(table, rows, partitioned, gcs_prefix)

# merged tables (from _new additions) → mark as derived (gcs_prefix = NA → upload from local)
if (length(merged_base) > 0) {
  freeze_stats <- freeze_stats |>
    mutate(gcs_prefix = if_else(table %in% merged_base, NA_character_, gcs_prefix))
}

# derived tables (cruise_summary and merged tables): take the row count from
# the working database. for merged tables this replaces the registry count,
# which predates the _new additions; tables not in the registry are appended.
for (dt in derived_tables) {
  n <- dbGetQuery(con_wdl, glue("SELECT COUNT(*) AS n FROM {dt}"))$n
  if (dt %in% freeze_stats$table) {
    freeze_stats$rows[freeze_stats$table == dt] <- n
  } else {
    freeze_stats <- freeze_stats |>
      bind_rows(tibble(
        table = dt, rows = n, partitioned = FALSE, gcs_prefix = NA_character_))
  }
}

freeze_stats |>
  datatable(caption = glue("Frozen release {release_version} — {nrow(freeze_stats)} tables"))
```
## Release Notes
```{r}
#| label: release_notes
#| results: asis
# build release notes
# trim = TRUE: format() otherwise pads every element of a vector to a common
# width, which would put runs of spaces inside the parentheses.
# scientific = FALSE keeps large counts out of exponent notation.
tables_list <- paste0(
  "- ",
  freeze_stats$table,
  " (",
  format(freeze_stats$rows, big.mark = ",", trim = TRUE, scientific = FALSE),
  " rows)"
)

release_notes <- paste0(
  "# CalCOFI Database Release ",
  release_version,
  "\n\n",
  "**Release Date**: ",
  Sys.Date(),
  "\n\n",
  "## Tables Included\n\n",
  paste(tables_list, collapse = "\n"),
  "\n\n",
  "## Total\n\n",
  "- **Tables**: ",
  nrow(freeze_stats),
  "\n",
  "- **Total Rows**: ",
  format(
    sum(freeze_stats$rows, na.rm = TRUE),
    big.mark = ",", trim = TRUE, scientific = FALSE),
  "\n\n",
  "## Data Sources\n\n",
  "- `ingest_swfsc_ichthyo.qmd` - Ichthyo tables (cruise, ship, site, tow, net, species, ichthyo, grid, segment, lookup, taxon, taxa_rank)\n",
  "- `ingest_calcofi_bottle.qmd` - Bottle/cast tables (casts, bottle, bottle_measurement, cast_condition, measurement_type)\n",
  "- `ingest_calcofi_ctd-cast.qmd` - CTD tables (ctd_cast, ctd_thin, ctd_summary, measurement_type; full ctd_measurement available as supplemental)\n",
  "- `ingest_calcofi_dic.qmd` - DIC/alkalinity tables (dic_sample, dic_measurement, dic_summary, dataset)\n\n",
  "## Cross-Dataset Integration\n\n",
  "- **Ship matching**: Reconciled ship codes between bottle casts and swfsc ship reference\n",
  "- **Cruise bridge**: Derived cruise_key (YYYY-MM-NODC) for bottle casts via ship matching + datetime\n",
  "- **Taxonomy**: Standardized species with WoRMS AphiaID, ITIS TSN, GBIF backbone key\n",
  "- **Taxon hierarchy**: Built taxon + taxa_rank tables from WoRMS/ITIS classification\n\n",
  "## Access\n\n",
  "Parquet files can be queried directly from GCS:\n\n",
  "```r\n",
  "library(duckdb)\n",
  "con <- dbConnect(duckdb())\n",
  "dbExecute(con, 'INSTALL httpfs; LOAD httpfs;')\n",
  "dbGetQuery(con, \"\n",
  " SELECT * FROM read_parquet(\n",
  " 'https://storage.googleapis.com/calcofi-db/ducklake/releases/",
  release_version,
  "/parquet/ichthyo.parquet')\n",
  " LIMIT 10\")\n",
  "```\n\n",
  "Or use calcofi4r:\n\n",
  "```r\n",
  "library(calcofi4r)\n",
  "con <- cc_get_db(version = '",
  release_version,
  "')\n",
  "```\n"
)

# write the notes next to the release and echo them into the rendered page
# (this chunk uses results: asis)
writeLines(release_notes, file.path(dir_frozen, "RELEASE_NOTES.md"))
message(glue(
  "Release notes written to {file.path(dir_frozen, 'RELEASE_NOTES.md')}"
))
cat(release_notes)
```
## Upload Frozen Release to GCS
```{r}
#| label: upload_frozen
# GCS destination for this release and the gcloud executable used for copies
gcs_bucket <- "calcofi-db"
gcs_release <- glue("ducklake/releases/{release_version}")
gcloud <- find_gcloud()

# 1. GCS server-side copy for ingest tables (auto-discovered from registry);
#    rows without a gcs_prefix are uploaded from local parquet in step 2
copy_rows <- filter(freeze_stats, !is.na(gcs_prefix))
message(glue("Copying {nrow(copy_rows)} tables from ingest/ to releases/ on GCS..."))

for (i in seq_len(nrow(copy_rows))) {
  tbl <- copy_rows$table[i]
  pfx <- copy_rows$gcs_prefix[i]
  part <- copy_rows$partitioned[i]

  # build the gcloud arguments first, then run a single copy call
  if (part) {
    # partitioned: copy directory
    src <- glue("gs://{gcs_bucket}/{pfx}/{tbl}")
    dst <- glue("gs://{gcs_bucket}/{gcs_release}/parquet/{tbl}")
    cp_args <- c("storage", "cp", "-r", paste0(src, "/*"), dst)
  } else {
    # unpartitioned: copy the single parquet file
    src <- glue("gs://{gcs_bucket}/{pfx}/{tbl}.parquet")
    dst <- glue("gs://{gcs_bucket}/{gcs_release}/parquet/{tbl}.parquet")
    cp_args <- c("storage", "cp", src, dst)
  }
  res <- system2(gcloud, cp_args, stdout = TRUE, stderr = TRUE)

  # system2() only attaches a "status" attribute on non-zero exit
  rc <- attr(res, "status")
  if (is.null(rc)) {
    rc <- 0L
  }
  if (rc != 0) {
    stop(glue("GCS copy failed for {tbl}: {src} -> {dst}\n",
      " exit code {rc}: {paste(res, collapse = '\n')}"))
  }
  message(glue(" {tbl}: copied from {pfx}"))
}

# 2. upload derived tables from local parquet (cruise_summary)
derived_local <- list.files(
  dir_frozen_pq,
  pattern = "[.]parquet$",
  full.names = TRUE)
for (pq in derived_local) {
  tbl <- tools::file_path_sans_ext(basename(pq))
  gcs_path <- glue("gs://{gcs_bucket}/{gcs_release}/parquet/{tbl}.parquet")
  put_gcs_file(pq, gcs_path)
  message(glue(" {tbl}: uploaded (derived)"))
}
# 3. build and upload catalog.json (needed by cc_get_db())
tables_df <- freeze_stats |>
  select(name = table, rows, partitioned)

# sum bytes of the uploaded parquet tree on GCS
du_out <- system2(
  gcloud,
  c("storage", "du", "--summarize",
    glue("gs://{gcs_bucket}/{gcs_release}/parquet/")),
  stdout = TRUE, stderr = TRUE)

# stderr is captured together with stdout, so take the first line that starts
# with a byte count rather than assuming it is line 1
du_line <- grep("^\\s*[0-9]+(\\s|$)", du_out, value = TRUE)[1]
total_bytes <- suppressWarnings(
  as.numeric(sub("\\s.*$", "", trimws(du_line))))
if (is.na(total_bytes)) {
  warning(glue("Could not parse gcloud storage du output: {paste(du_out, collapse='; ')}"))
  total_bytes <- 0
}

catalog <- list(
  version = release_version,
  release_date = as.character(Sys.Date()),
  # summed as double: sum() over an integer column returns NA (with a warning)
  # once the total exceeds .Machine$integer.max
  total_rows = sum(as.numeric(tables_df$rows), na.rm = TRUE),
  total_size = total_bytes,
  tables = tables_df)
catalog_path <- file.path(dir_frozen, "catalog.json")
jsonlite::write_json(catalog, catalog_path, auto_unbox = TRUE, pretty = TRUE)
put_gcs_file(catalog_path,
  glue("gs://{gcs_bucket}/{gcs_release}/catalog.json"))

# upload RELEASE_NOTES.md
notes_path <- file.path(dir_frozen, "RELEASE_NOTES.md")
if (file.exists(notes_path)) {
  put_gcs_file(notes_path,
    glue("gs://{gcs_bucket}/{gcs_release}/RELEASE_NOTES.md"))
}

# upload relationships.json
rels_json <- file.path(dir_frozen, "relationships.json")
if (file.exists(rels_json)) {
  put_gcs_file(rels_json,
    glue("gs://{gcs_bucket}/{gcs_release}/relationships.json"))
}
# build and upload metadata.json (table/column descriptions + units).
# auto-discover every ingest's metadata.json (same as rels_paths) so newly added
# datasets' tables/columns are merged in — not just a hardcoded set.
meta_paths <- Sys.glob(here("data/parquet/*/metadata.json"))
meta_json_path <- file.path(dir_frozen, "metadata.json")
if (length(meta_paths) > 0) {
# merge the per-ingest files into one release-level metadata.json, passing the
# release CSVs, the ingest YAML and the per-table row counts from freeze_stats
merge_metadata_json(
paths = meta_paths,
output_path = meta_json_path,
release_version = release_version,
release_tables_csv = here("metadata/release_tables.csv"),
release_columns_csv = here("metadata/release_columns.csv"),
measurement_type_csv = here("metadata/measurement_type.csv"),
ingest_yaml = ingest_yaml,
table_rows = setNames(freeze_stats$rows, freeze_stats$table))
# enrich columns with data_type from the working DuckDB (so the schema
# site can render types without spinning up DuckDB-WASM)
schema_cols <- DBI::dbGetQuery(con_wdl, "
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'main'")
meta <- jsonlite::read_json(meta_json_path, simplifyVector = FALSE)
# n_typed counts every schema column processed, whether its entry already
# existed in meta$columns or is created here
n_typed <- 0L
for (i in seq_len(nrow(schema_cols))) {
# column entries are keyed as "<table>.<column>"
key <- paste0(schema_cols$table_name[i], ".", schema_cols$column_name[i])
if (key %in% names(meta$columns)) {
meta$columns[[key]]$data_type <- schema_cols$data_type[i]
} else {
# column has no metadata entry yet: add one holding only the data type
meta$columns[[key]] <- list(data_type = schema_cols$data_type[i])
}
n_typed <- n_typed + 1L
}
# rewrite the enriched metadata in place, then upload it
jsonlite::write_json(meta, meta_json_path,
auto_unbox = TRUE, pretty = TRUE, null = "null")
message(glue("metadata.json enriched with data_type for {n_typed} columns"))
put_gcs_file(meta_json_path,
glue("gs://{gcs_bucket}/{gcs_release}/metadata.json"))
# erd.mmd sidecar: Mermaid ER diagram driven by relationships.json
# NOTE(review): this step sits inside the metadata branch above, so erd.mmd is
# only produced when per-ingest metadata.json files exist — confirm intended.
rels_for_erd <- file.path(dir_frozen, "relationships.json")
if (file.exists(rels_for_erd)) {
erd <- cc_erd(
con = con_wdl,
rels_path = rels_for_erd,
colors = color_map,
view_type = "all")
erd_path <- file.path(dir_frozen, "erd.mmd")
# unclass() drops the object's class so writeLines() receives a plain vector
writeLines(unclass(erd), erd_path)
# validate the Mermaid parses before publishing — a malformed erd.mmd
# (e.g. erDiagram styling unsupported by an older mermaid) would break the
# schema site, which renders it client-side with mermaid. Validate with
# mermaid-cli (mmdc); KEEP schema/_config.yml `mermaid_version` >= this
# mmdc's bundled mermaid so the site accepts what passes here.
mmdc <- Sys.which("mmdc")
if (nzchar(mmdc)) {
# render to a throwaway SVG; only the exit status matters
erd_svg_check <- tempfile(fileext = ".svg")
erd_val <- suppressWarnings(system2(
mmdc, c("-i", erd_path, "-o", erd_svg_check),
stdout = TRUE, stderr = TRUE))
# a "status" attribute that is present and not 0L is treated as failure
if (!identical(attr(erd_val, "status"), NULL) &&
!identical(attr(erd_val, "status"), 0L)) {
stop(glue(
"erd.mmd failed Mermaid validation; not uploading.\n",
"{paste(erd_val, collapse = '\n')}"))
}
message("erd.mmd passed Mermaid validation (mmdc)")
} else {
# no mmdc on PATH: erd.mmd is uploaded without validation
warning("mmdc not found; skipping Mermaid validation of erd.mmd")
}
put_gcs_file(erd_path,
glue("gs://{gcs_bucket}/{gcs_release}/erd.mmd"))
message(glue("erd.mmd uploaded ({length(attr(erd, 'tables'))} tables)"))
} else {
warning("relationships.json missing; skipping erd.mmd sidecar")
}
} else {
warning("No per-ingest metadata.json files found; skipping release metadata.json")
}
# 4. update versions.json (latest.txt promotion is deferred to test_release.qmd)
# discover all releases from GCS and rebuild versions.json
gcs_ls <- system2(gcloud, c("storage", "ls",
  glue("gs://{gcs_bucket}/ducklake/releases/")),
  stdout = TRUE, stderr = TRUE)

# pull the version token out of each listing line; lines without one
# (e.g. versions.json itself) are dropped by regmatches()
release_vers <- unique(regmatches(gcs_ls,
  regexpr("v[0-9]{4}[.][0-9]{2}[.]*[0-9]*", gcs_ls)))
https_base <- glue("https://storage.googleapis.com/{gcs_bucket}/ducklake/releases")

# one summary entry per release whose catalog.json can be read; releases
# without a readable catalog are skipped
all_versions <- purrr::compact(lapply(release_vers, function(v) {
  tryCatch({
    cat_data <- jsonlite::fromJSON(glue("{https_base}/{v}/catalog.json"))
    list(
      # fall back to the directory name if the catalog carries no version
      version = cat_data$version %||% v,
      release_date = cat_data$release_date %||% NA_character_,
      tables = if (is.data.frame(cat_data$tables)) nrow(cat_data$tables)
               else length(cat_data$tables),
      # kept as double: as.integer() returns NA above .Machine$integer.max
      total_rows = as.numeric(cat_data$total_rows %||% 0),
      size_mb = round((cat_data$total_size %||% 0) / 1024 / 1024, 1))
  }, error = function(e) NULL)
}))

# newest first. vapply() always returns a character vector (character(0) when
# no catalogs were readable), so order() is safe on empty input
version_ids <- vapply(
  all_versions, function(x) as.character(x$version), character(1))
all_versions <- all_versions[order(version_ids, decreasing = TRUE)]

versions_local <- tempfile(fileext = ".json")
jsonlite::write_json(list(versions = all_versions), versions_local,
  auto_unbox = TRUE, pretty = TRUE)
put_gcs_file(versions_local,
  glue("gs://{gcs_bucket}/ducklake/releases/versions.json"))
# the schema site (calcofi.io/schema) fetches these JSON/mmd sidecars at runtime
# and they are OVERWRITTEN in place when a release is re-run (e.g. to fix a bug).
# GCS defaults to `cache-control: public, max-age=3600`, so a corrected re-upload
# stays masked by browser/CDN caches for up to an hour. Tag the mutable sidecars
# `no-cache` (revalidate-always; cheap 304s) so a re-render is visible immediately.
sidecar_urls <- c(
  glue("gs://{gcs_bucket}/ducklake/releases/versions.json"),
  glue("gs://{gcs_bucket}/{gcs_release}/catalog.json"),
  glue("gs://{gcs_bucket}/{gcs_release}/metadata.json"),
  glue("gs://{gcs_bucket}/{gcs_release}/relationships.json"),
  glue("gs://{gcs_bucket}/{gcs_release}/erd.mmd"),
  glue("gs://{gcs_bucket}/{gcs_release}/RELEASE_NOTES.md"))
cc_res <- system2(gcloud,
  c("storage", "objects", "update", "--cache-control=no-cache", sidecar_urls),
  stdout = TRUE, stderr = TRUE)

# report success only when gcloud exited cleanly (system2() attaches a
# "status" attribute only on non-zero exit)
if (identical(attr(cc_res, "status") %||% 0L, 0L)) {
  message("runtime sidecars tagged cache-control: no-cache")
} else {
  warning(glue("could not set no-cache on sidecars: {paste(cc_res, collapse='; ')}"))
}

# NOTE: latest.txt is NOT updated here. Promotion is gated on the
# query-test pass in test_release.qmd, which writes latest.txt only
# when every pre-baked query in CalCOFI/query/_queries succeeds.
message(glue(
  "Release {release_version} uploaded ({length(all_versions)} versions tracked); ",
  "latest.txt promotion deferred to test_release.qmd"))
```
## Cleanup
```{r}
#| label: cleanup
# release the in-memory DuckDB used for assembly
close_duckdb(con_wdl)
message("Assembly DuckDB connection closed")

# end-of-run summary: header, release location, table count, total rows
summary_lines <- list(
  glue("\n=== Summary ==="),
  glue("Frozen release: {release_version} created at {dir_frozen}"),
  glue("Tables: {nrow(freeze_stats)}"),
  glue("Total rows: {format(sum(freeze_stats$rows, na.rm = TRUE), big.mark = ',')}"))
for (summary_line in summary_lines) {
  message(summary_line)
}
```
::: {.callout-caution collapse="true"}
## Session Info
```{r session_info}
# print the R session details and loaded package versions for this render
devtools::session_info()
```
:::