---
title: "Scrape & Ingest CalCOFI CTD Data into DuckDB"
---
## Overview
This notebook scrapes CalCOFI CTD data from <https://calcofi.org/data/oceanographic-data/ctd-cast-files/>, downloads all CTD .zip files, unzips only the **final** and **preliminary** ones, and ingests them into a DuckDB database.
### Key Features
1. **Web Scraping**: Scrapes the CTD cast files page for all .zip download links
2. **Smart Filtering**:
- Downloads all .zip files for archival completeness
- Only unzips `final` (e.g., 20-2105SH_CTDFinalQC.zip) and `preliminary` (e.g., 20-2507SR_CTDPrelim.zip) files
- Skips unzipping raw/cast/test files to save disk space
3. **Download & Unzip**: Downloads all zip files to `~/My Drive/projects/calcofi/data/calcofi.org/ctd-cast/download` ([calcofi/data](https://drive.google.com/drive/u/0/folders/1xxdWa4mWkmfkJUQsHxERTp9eBBXBMbV7)) but only extracts final and preliminary data
4. **Priority-based Selection** (see the sketch after this list):
- For each cruise (identified by yy-yymm + ship code), selects final if available
- Falls back to preliminary if final not available for that cruise
- Completely excludes raw/test/prodo cast files
5. **Union Strategy**:
   - Merges upcast (U) and downcast (D) CSV files into a single `ctd` table
   - Adds `data_stage` flag: "final" or "preliminary"
- Adds `cast_dir` flag: "U" for upcast, "D" for downcast
6. **Data Standardization**:
- Converts all column names to `lower_snake_case` using `janitor::clean_names()`
- Creates spatial index on `lat_dec` and `lon_dec` fields
- Preserves `cruise_id` and `source_file` metadata
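A minimal sketch of the per-cruise priority logic described in Key Feature 4, using a toy file inventory rather than the real one (illustrative only):

```r
library(dplyr)
library(tibble)

# toy inventory: one cruise with both stages, one with preliminary only
files <- tribble(
  ~cruise_id, ~data_stage,
  "2105SH",   "final",
  "2105SH",   "preliminary",
  "2507SR",   "preliminary"
)

files |>
  mutate(priority = if_else(data_stage == "final", 1, 2)) |>
  group_by(cruise_id) |>
  filter(priority == min(priority)) |> # keep final when present, else preliminary
  ungroup()
# -> 2105SH keeps only its "final" row; 2507SR keeps its "preliminary" row
```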
### Usage
To run the notebook:
- R: `quarto::quarto_render("scrape_ctd.qmd")`
- Terminal: `quarto render scrape_ctd.qmd`
### Notes
- All .zip files are downloaded for archival purposes, but only final/prelim are unzipped
- Only processes CSV files from final (`db-csvs/*.csv`) and preliminary directories
- Downloads can be resumed (already-downloaded files are skipped)
- Both upcast (U) and downcast (D) profiles are included in the database
- The priority system ensures final data are used when available, falling back to preliminary otherwise
- Spatial index enables efficient geographic queries on `lat_dec` and `lon_dec` (see the example query below)
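For example, once the `ctd` table is built and spatially indexed (see the ingest section below), a bounding-box query can hit the geometry column directly. A sketch, assuming the DuckDB connection `con` created in the setup chunk:

```r
# count CTD records within a lon/lat bounding box (illustrative)
DBI::dbGetQuery(con, "
  SELECT COUNT(*) AS n
  FROM ctd
  WHERE ST_Within(point_geom, ST_MakeEnvelope(-125, 32, -117, 35));
")
```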
## Setup
```{r}
#| label: setup
librarian::shelf(
"CalCOFI/calcofi4r",
dplyr,
  DBI,
  dbplyr,
DT,
duckdb,
fs,
glue,
here,
htmltools,
httr,
janitor,
litedown,
lubridate,
mapview,
plotly,
purrr,
readr,
rvest,
sf,
stringr,
tibble,
tidyr,
zip,
quiet = T
)
# helper functions
dt <- function(d, fname, n_page = 10, ...) {
# d <- data.frame(x = 1:9)
lengths <- c(5, 10, 25, 50, 100, 1000)
lengths <- c(lengths[lengths < nrow(d)], nrow(d)) |> unique()
DT::datatable(
d,
extensions = 'Buttons',
options = list(
dom = 'Blfrtpi',
buttons = list(
list(
extend = 'csv',
filename = fname
)
),
pageLength = min(n_page, nrow(d)),
lengthMenu = lengths,
scrollX = T
),
filter = 'top',
# rownames = FALSE,
...
)
}
# variables
url <- "https://calcofi.org/data/oceanographic-data/ctd-cast-files/"
dir_dl <- "~/My Drive/projects/calcofi/data/calcofi.org/ctd-cast/download"
dir_dl <- path_expand(dir_dl)
bind_prelonlat_rds <- glue("{dir_dl}/bind_pre-lon-lat-clean.rds")
pts_filt_rds <- glue("{dir_dl}/pts_filt.rds")
ctd_pq <- glue("{dir_dl}/ctd.parquet")
db_path <- glue("{dir_dl}/calcofi-ctd.duckdb")
db_redo <- T # set to TRUE to delete any existing DuckDB database and re-ingest (downloads are still skipped if already present)
# create directory if it doesn't exist
dir_create(dir_dl, recurse = TRUE)
# delete if exists (for fresh run)
if (file_exists(db_path) & db_redo) {
file_delete(db_path)
}
# setup duckdb with spatial extension
# dbDisconnect(con, shutdown = TRUE); duckdb_shutdown(duckdb()); rm(con)
con <- dbConnect(duckdb::duckdb(read_only = F), dbdir = db_path) # dbListTables(con)
dbExecute(con, "INSTALL spatial;")
dbExecute(con, "LOAD spatial;")
```
- `url`: <`r url`>
## Scrape CTD Download Links
```{r}
#| label: d_zips
# read web page, extract all .zip download links
d_zips <- read_html(url) |>
html_nodes("a[href$='.zip']") |>
html_attr("href") |>
tibble(url = _) |>
mutate(
# ensure full URLs
url = if_else(
str_starts(url, "http"),
url,
paste0("https://calcofi.org", url)
),
# extract file_zip
file_zip = basename(url),
# extract year from URL path
year = str_extract(url, "/(\\d{4})/", group = 1) |> as.integer(),
# extract month from URL path
month = str_extract(url, "-\\d{2}(\\d{2})", group = 1) |> as.integer(),
# extract cruise identifier (yymm + ship code)
cruise_id = str_extract(
file_zip,
"\\d{2}-(\\d{4}[A-Z0-9]{2,4})",
group = 1
),
# TODO: confirm oddball ship codes (normally 2 letters): 0404NHJD (4 letters), 0907M2 (letter + number)
# classify data type based on file_zip patterns
zip_type = case_when(
str_detect(file_zip, "CTDFinal") ~ "final",
str_detect(file_zip, "CTDPrelim") ~ "preliminary",
str_detect(file_zip, "CTDCast|CTD_Cast") ~ "raw",
str_detect(file_zip, "CTDTest") ~ "test",
TRUE ~ "unknown"
)
)
# stop if other detected
if (any(d_zips$zip_type == "unknown")) {
stop("Unknown zip_type detected in file_zips")
}
# show summary (all files)
d_zips |>
mutate(
file_zip = glue("<a href={url}>{file_zip}</a>")
) |>
select(-url) |>
dt(
caption = "All zip files to download",
fname = "ctd_zip_files",
escape = F
)
```
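As a quick illustrative check (not part of the pipeline), the `cruise_id` pattern applied to a typical file name plus the two oddball ship codes flagged in the TODO above (the third file name is hypothetical):

```r
library(stringr)

str_extract(
  c(
    "20-2105SH_CTDFinalQC.zip",   # typical 2-letter ship code
    "20-0404NHJD_CTDFinalQC.zip", # 4-letter ship code
    "20-0907M2_CTDCast.zip"       # letter + number (hypothetical file name)
  ),
  "\\d{2}-(\\d{4}[A-Z0-9]{2,4})",
  group = 1
)
# [1] "2105SH"   "0404NHJD" "0907M2"
```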
## Download and Unzip Files
```{r}
#| label: download_and_unzip
# function to download and optionally unzip a file
download_and_unzip <- function(url, dest_dir, zip_type, unzip = TRUE) {
file_zip <- basename(url)
dest_file <- file.path(dest_dir, file_zip)
dir_unzip <- file.path(dest_dir, str_remove(file_zip, "\\.zip$"))
# skip if already downloaded
if (file_exists(dest_file)) {
message(glue("Already exists: {file_zip}"))
} else {
message(glue("Downloading: {file_zip}"))
download.file(url, dest_file, mode = "wb")
}
# skip if already unzipped
if (unzip && dir_exists(dir_unzip)) {
message(glue("Already unzipped: {file_zip}"))
  } else if (unzip) {
    # only unzip final and preliminary zips
    if (zip_type %in% c("final", "preliminary")) {
      message(glue("Unzipping: {file_zip}"))
      dir_create(dir_unzip, recurse = TRUE)
      unzip(dest_file, exdir = dir_unzip)
    } else {
      message(glue("Skipping unzip (not final/preliminary): {file_zip}"))
    }
}
}
# download all files, but only unzip final and preliminary
d_zips |>
pwalk(function(url, file_zip, zip_type, ...) {
download_and_unzip(url, dir_dl, zip_type, unzip = TRUE)
})
message("Download and extraction complete!")
```
## Find and Prioritize CTD Data Files
```{r}
#| label: d_csv
# create file inventory
d_csv <- tibble(
path = dir_ls(dir_dl, recurse = TRUE, regexp = "\\.csv$")
) |>
mutate(
file_csv = basename(path),
path_unzip = str_replace(path, glue("{dir_dl}/"), ""),
# get first directory in path_unzip
dir_unzip = str_extract(path_unzip, "^[^/]+"),
cruise_id = str_extract(
path_unzip,
"\\d{2}-(\\d{4}[A-Z0-9]{2,4})_.*",
group = 1
),
# classify data type from parent directory
data_stage = case_when(
str_detect(path_unzip, "Final.*db[_|-]csv") ~ "final",
str_detect(path_unzip, "Prelim.*db[_|-]csv") ~ "preliminary",
# TODO: note these strange exceptions
cruise_id == "2507SR" &
str_detect(path_unzip, "Prelim.*csv") ~ "preliminary",
cruise_id == "2111SR" &
str_detect(
path_unzip,
"Prelim.*csvs-plots.*2111SR.*csv"
) ~ "preliminary",
.default = NA_character_
),
# detect cast direction from file_csv (U=upcast, D=downcast)
cast_dir = case_when(
str_detect(file_csv, regex("U\\.csv$", ignore_case = T)) ~ "U",
str_detect(file_csv, regex("D\\.csv$", ignore_case = T)) ~ "D"
),
# prioritize: final=1, preliminary=2
priority = case_when(
data_stage == "final" ~ 1,
data_stage == "preliminary" ~ 2,
TRUE ~ 3
)
) |>
relocate(cruise_id, path_unzip) |>
arrange(cruise_id, path_unzip) |>
# keep only final and preliminary files
filter(data_stage %in% c("final", "preliminary"))
# for each cruise, keep only final if available, otherwise preliminary
d_priority <- d_csv |>
group_by(cruise_id) |>
summarize(
best_priority = min(priority),
.groups = "drop"
)
d_csv <- d_csv |>
inner_join(d_priority, by = "cruise_id") |>
filter(priority == best_priority) |>
select(-best_priority)
cruises_csv_notzip <- setdiff(
unique(d_csv$cruise_id),
unique(d_zips$cruise_id)
) |>
sort()
stopifnot(length(cruises_csv_notzip) == 0)
# TODO: cruises_zip_notcsv: find workable form of raw data
cruises_zip_notcsv <- setdiff(
unique(d_zips$cruise_id),
unique(d_csv$cruise_id)
) |>
sort()
# [1] "0001NH" "0004JD" "0007NH" "0010NH" "0101JD" "0104JD" "0107NH" "0110NH" "0201JD" "0204JD" "0207NH"
# [12] "0211NH" "0304RR" "0404NH" "1203NH" "9003JD" "9011NH" "9101JD" "9103JD" "9108JD" "9110NH" "9202JD"
# [23] "9204JD" "9207NH" "9210NH" "9301JD" "9304JD" "9308NH" "9310NH" "9401JD" "9403JD" "9408NH" "9410NH"
# [34] "9501JD" "9504NH" "9507JD" "9510NH" "9602JD" "9604JD" "9608NH" "9610RR" "9702JD" "9704NH" "9707JD"
# [45] "9709NH" "9712SP" "9803SP" "9805SP" "9808SP" "9809NH" "9810SP" "9811SP" "9812SP" "9901RR" "9904JD"
# [56] "9908NH" "9910NH"
# TODO: plot(ctdDecimate(ctdTrim(read.ctd("stn123.cnv")))) per [oce](https://dankelley.github.io/oce/articles/B_ctd.html#raw-data)
# librarian::shelf(oce, quiet = T)
# dir_raw <- "/Users/bbest/Library/CloudStorage/GoogleDrive-ben@ecoquants.com/My Drive/projects/calcofi/data/calcofi.org/ctd-cast/download/19-9003JD_CTDTest"
# raw_9003JD_dat <- '/STA3-2.DAT'
# d_ctd <- read.ctd(dir_ls(dir_raw)) |> plot(ctdDecimate(ctdTrim(.)))
# if (length(missing_cruises) > 0) {
# warning(glue("Warning: Missing cruises with no final/preliminary files: {paste(missing_cruises, collapse = ', ')}"))
# # 0404JD
# }
# summary
d_csv |>
select(-any_of(c("path", "data", "col_empties", "col_types"))) |>
relocate(cruise_id, path_unzip) |>
arrange(cruise_id, path_unzip) |>
dt(
caption = "Files to Ingest",
fname = "ctd_files_to_ingest"
)
```
## Read and Standardize CTD Files
- Remove repeated header rows that can occur within files, e.g., line 69 of '20-0711NH_CTDFinalQC/db-csvs/20-0711_CTDBTL_001-067Dno001b.csv'
```{r}
#| label: read_csv
# read and combine all files (both upcast and downcast)
d_csv <- d_csv |>
arrange(basename(path)) |>
mutate(
data = map2(path, seq_along(path), \(path, idx) {
message(glue("Reading {idx}/{nrow(d_csv)}: {basename(path)}"))
# detect repeat header rows and skip them
# eg line 69 of '20-0711NH_CTDFinalQC/db-csvs/20-0711_CTDBTL_001-067Dno001b.csv'
# Read all lines to detect repeat headers
all_lines <- read_lines(path)
header_line <- all_lines[1]
# Find all rows that exactly match the header (excluding the first row)
repeat_header_rows <- which(all_lines[-1] == header_line)
if (length(repeat_header_rows) > 0) {
# Remove repeat header rows from the data
all_lines <- all_lines[-(repeat_header_rows + 1)]
# Write cleaned data to temporary file
tmp_file <- tempfile(fileext = ".csv")
write_lines(all_lines, tmp_file)
# Read from cleaned file
data <- read_csv(tmp_file, guess_max = Inf, show_col_types = F) |>
clean_names()
# Clean up
file_delete(tmp_file)
} else {
# No repeat headers, read normally
data <- read_csv(path, guess_max = Inf, show_col_types = F) |>
clean_names()
}
data
}),
# add nrows per file to ctd_data
nrows = map_int(data, ~ if (is.null(.x)) 0 else nrow(.x))
)
d_csv |>
arrange(cruise_id, file_csv) |>
select(cruise_id, file_csv, nrows) |>
dt(
caption = "Number of rows read per file",
fname = "ctd_rows_per_file"
) |>
# big marks for thousands
formatCurrency("nrows", currency = "", digits = 0, mark = ",")
```
## Transform Data: Detect and Correct Column Type Mismatches
```{r}
#| label: d_bind
# Detect expected types from the most common type across all files
d_csv <- d_csv |>
mutate(
# Identify completely empty columns in each data frame
col_empties = map(data, \(x) {
tibble(
col_name = names(x),
n_empty = map_int(x, \(col) sum(is.na(col)))
) |>
filter(n_empty == nrow(x)) |>
pull(col_name)
}),
# Extract column types from each data frame
col_types = map2(data, col_empties, \(x, y) {
tibble(
col_name = names(x),
col_type = map_chr(x, \(col) class(col)[1])
) |>
filter(!col_name %in% y)
})
)
# Find the most common type for each column across all files
d_types <- d_csv |>
select(path, col_types) |>
unnest(col_types) |>
count(col_name, col_type) |>
group_by(col_name) |>
slice_max(n, n = 1, with_ties = FALSE) |>
ungroup() |>
select(col_name, expected_type = col_type)
# Identify files with type mismatches
d_mismatches <- d_csv |>
select(cruise_id, path, col_types) |>
unnest(col_types) |>
left_join(d_types, by = "col_name") |>
filter(col_type != expected_type) |>
arrange(col_name, path)
# Log mismatches if any exist
if (nrow(d_mismatches) > 0) {
message("Type mismatches detected - converting columns...")
}
# Bind data, converting mismatched columns to expected type
d_bind <- d_csv |>
mutate(
data = map2(data, path, \(x, p) {
# Get columns that need type conversion for this file
x_mismatches <- d_mismatches |>
filter(path == p)
if (nrow(x_mismatches) > 0) {
for (i in 1:nrow(x_mismatches)) {
col <- x_mismatches$col_name[i]
expected <- x_mismatches$expected_type[i]
# Count NAs before conversion
na_before <- sum(is.na(x[[col]]))
# Convert to expected type
suppressWarnings({
x[[col]] <- switch(
expected,
"numeric" = as.numeric(x[[col]]),
"integer" = as.integer(x[[col]]),
"logical" = as.logical(x[[col]]),
"character" = as.character(x[[col]]),
x[[col]]
)
})
# Count NAs after conversion
na_after <- sum(is.na(x[[col]]))
na_generated <- na_after - na_before
if (na_generated > 0) {
message(glue(
" {basename(p)}: {col} ({x_mismatches$col_type[i]} → {expected}) generated {na_generated} NAs"
))
}
# Store NA count in d_mismatches for reporting
d_mismatches[
d_mismatches$path == p & d_mismatches$col_name == col,
"nas_generated"
] <<- na_generated
}
}
x
})
) |>
unnest(data)
# Report on type mismatches with NA generation
if (nrow(d_mismatches) > 0) {
# Summary by column
d_mismatches |>
group_by(col_name, expected_type, col_type) |>
summarize(
n_files = n(),
total_nas = sum(nas_generated, na.rm = TRUE),
files = paste(basename(path), collapse = "; "),
.groups = "drop"
) |>
arrange(desc(total_nas)) |>
dt(
caption = "Type mismatches by column",
fname = "ctd_type_mismatches_by_column"
) |>
formatCurrency(
c("n_files", "total_nas"),
currency = "",
digits = 0,
mark = ","
)
# Summary by file
d_mismatches |>
mutate(
path_csv = basename(path),
col_expr = glue(
"{col_name}: {col_type} → {expected_type} ({nas_generated} NAs)<br>"
)
) |>
arrange(path_csv, col_name) |>
group_by(path_csv) |>
summarize(
n_columns = n(),
total_nas = sum(nas_generated, na.rm = TRUE),
columns = paste(col_expr, collapse = "\n"),
.groups = "drop"
) |>
arrange(desc(total_nas), path_csv) |>
dt(
caption = "Type mismatches by file",
fname = "ctd_type_mismatches_by_file",
escape = FALSE
) |>
formatCurrency(
c("n_columns", "total_nas"),
currency = "",
digits = 0,
mark = ","
)
}
# TODO: drop study / replace with cruise_id
# d_bind$study |> table(useNA = "ifany")
# d_bind$cruise_id |> table(useNA = "ifany") # NAs: 10,9782
d_bind |>
filter(
!is.na(study),
!is.na(cruise_id),
cruise_id != study
) |>
distinct(study, cruise_id, path_unzip) |>
dt(
caption = "Mismatch between study and cruise_id",
fname = "ctd_study_cruise_mismatch"
)
# study cruise_id path_unzip
# <chr> <chr> <chr>
# 1 0404JD 0404NHJD 20-0404NHJD_CTDFinalQC/db_csvs/20-0404JD_CTDBTL_067-094ID.csv
# 2 0404JD 0404NHJD 20-0404NHJD_CTDFinalQC/db_csvs/20-0404JD_CTDBTL_067-094IU.csv
# 3 0404NH 0404NHJD 20-0404NHJD_CTDFinalQC/db_csvs/20-0404NH_CTDBTL_001-066ID.csv
# 4 0404NH 0404NHJD 20-0404NHJD_CTDFinalQC/db_csvs/20-0404NH_CTDBTL_001-066IU.csv
# 5 Study 0711NH 20-0711NH_CTDFinalQC/db-csvs/20-0711_CTDBTL_001-067Dno001b.csv
#
# update cruise_id to study where mismatched, except if study is "Study"
d_bind <- d_bind |>
mutate(
cruise_id = if_else(
!is.na(study) &
cruise_id != study &
study != "Study",
study,
cruise_id
),
source_file = path_unzip
) |>
relocate(source_file, .after = cruise_id) |>
select(
-path_unzip,
-path,
-file_csv,
-dir_unzip,
-priority,
-project,
-study,
-nrows,
-col_empties,
-col_types
)
# d_bind |>
# filter(is.na(ord_occ)) |>
# pull(cast_id) |>
# table()
# 0707_037d 0707_037u 0707_072d 0707_072u 0907_026d 0907_026u 0907_027d 0907_027u
# 1024 1024 460 460 518 518 277 277
# update ord_occ from cast_id where missing
d_bind <- d_bind |>
mutate(
ord_occ = if_else(
is.na(ord_occ) & !is.na(cast_id),
str_extract(cast_id, "_(\\d{3})", group = 1),
ord_occ
)
)
# is.na(d_bind$event_num) |> table()
# FALSE TRUE
# 3,927,815 2,881,340
# TODO: figure out missing ord_occ
```
## Format Date-Time Column
```{r}
#| label: format_datetime
# apply formats depending on pattern
d_bind <- d_bind |>
mutate(
    date_time_utc = str_trim(date_time_utc),
date_time_format = case_when(
str_detect(
date_time_utc, # eg "25-Jan-1998 13:01:20"
"^\\d{1,2}-[A-z]{3}-\\d{4} \\d{1,2}:\\d{1,2}:\\d{1,2}$"
) ~ "dmy_hms",
str_detect(
date_time_utc, # eg "01/25/1998 13:01", "1/25/1998 13:1"
"^\\d{1,2}/\\d{1,2}/\\d{4} \\d{1,2}:\\d{1,2}$"
) ~ "mdy_hm",
TRUE ~ "unknown"
),
    dtime_utc = as.POSIXct(NA, tz = "UTC"), # initialize as POSIXct; 163,181 | 6,596,130 failed to parse.
    # TODO: add summary of datetime formats by dataset
dtime_utc = if_else(
str_detect(
date_time_utc, # eg "25-Jan-1998 13:01:20"
"^\\d{1,2}-[A-z]{3}-\\d{4} \\d{1,2}:\\d{1,2}:\\d{1,2}$"
),
suppressWarnings(dmy_hms(date_time_utc, tz = "UTC")),
dtime_utc
),
dtime_utc = if_else(
str_detect(
date_time_utc, # eg "01/25/1998 13:01", "1/25/1998 13:1"
"^\\d{1,2}/\\d{1,2}/\\d{4} \\d{1,2}:\\d{1,2}$"
),
suppressWarnings(mdy_hm(date_time_utc, tz = "UTC")),
dtime_utc
)
) |>
relocate(date_time_format, dtime_utc, .after = date_time_utc) |>
# select(-date_time_utc, -date_time_pst) |>
arrange(dtime_utc, depth)
stopifnot(all(!is.na(d_bind$dtime_utc)))
write_rds(d_bind, bind_prelonlat_rds, compress = "gz")
file_info(bind_prelonlat_rds) |> pull(size) # 657M
d_bind |>
group_by(
cruise_id,
source_file,
date_time_format,
) |>
summarize(
n_records = n(),
dtime_utc_min = min(dtime_utc),
dtime_utc_max = max(dtime_utc),
.groups = "drop"
) |>
arrange(cruise_id, source_file, date_time_format) |>
# pivot date_time_format wider
pivot_wider(
names_from = date_time_format,
values_from = n_records,
values_fill = NA_real_
) |>
relocate(dmy_hms, mdy_hm, .after = cruise_id) |>
dt(
caption = "Date-Time formats detected by cruise_id and source_file",
fname = "ctd_datetime_formats"
) |>
formatDate(c("dtime_utc_min", "dtime_utc_max"), method = "toLocaleString")
```
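As a sanity check on the two date-time patterns handled above, the corresponding lubridate parsers applied to the example strings from the comments (illustrative, not part of the pipeline):

```r
library(lubridate)

dmy_hms("25-Jan-1998 13:01:20", tz = "UTC") # "dmy_hms" pattern -> 1998-01-25 13:01:20 UTC
mdy_hm("1/25/1998 13:1", tz = "UTC")        # "mdy_hm" pattern  -> 1998-01-25 13:01:00 UTC
```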
## Assign Pseudo-NA values to Longitude and Latitude and Fill Missing Coordinates from Line/Station
```{r}
#| label: fill_lonlat
d_bind <- read_rds(bind_prelonlat_rds)
# summary(d_bind_cr$lon_dec)
# Min. 1st Qu. Median Mean 3rd Qu. Max.
# -124.3 -122.5 -120.9 -114.1 -119.0 0.0
# d_bind_cr |>
# filter(lon_dec > -100) |>
# pull(lon_dec) |>
# unique() |>
# print() # -9.99e-29
# d_bind_cr |>
# filter(lon_dec == -99) |>
# select(cruise_id, source_file, line, sta, lon_dec, lat_dec)
pseudoNA_values <- c(-9.99e-29, -99)
d_bind_sp <- d_bind |>
mutate(
is_lnsta_pseudoNA = ifelse(
as.numeric(line) == 0 & as.numeric(sta) == 0,
TRUE,
FALSE
),
is_lon_pseudoNA = map_lgl(lon_dec, ~ some(pseudoNA_values, near, .x)),
is_lat_pseudoNA = map_lgl(lat_dec, ~ some(pseudoNA_values, near, .x)),
lon_dec = if_else(
is_lon_pseudoNA | is_lat_pseudoNA,
NA_real_,
lon_dec
),
lat_dec = if_else(
is_lon_pseudoNA | is_lat_pseudoNA,
NA_real_,
lat_dec
),
lon_lnst = if_else(
!is.na(line) & !is.na(sta) & !is_lnsta_pseudoNA,
as.numeric(sf_project(
from = "+proj=calcofi",
to = "+proj=longlat +datum=WGS84",
pts = cbind(x = as.numeric(line), y = as.numeric(sta))
)[, 1]),
NA_real_
),
lat_lnst = if_else(
!is.na(line) & !is.na(sta) & !is_lnsta_pseudoNA,
as.numeric(sf_project(
from = "+proj=calcofi",
to = "+proj=longlat +datum=WGS84",
pts = cbind(x = as.numeric(line), y = as.numeric(sta))
)[, 2]),
NA_real_
),
lon_dec = if_else(is.na(lon_dec), lon_lnst, lon_dec),
lat_dec = if_else(is.na(lat_dec), lat_lnst, lat_dec)
)
stopifnot(sum(is.na(d_bind_sp$lon_dec)) == 0)
stopifnot(sum(is.na(d_bind_sp$lat_dec)) == 0)
d_bind_sp |>
filter(
is_lon_pseudoNA | is_lat_pseudoNA
) |>
group_by(cruise_id) |>
summarize(
n_lon_pseudoNA = sum(is_lon_pseudoNA),
n_lat_pseudoNA = sum(is_lat_pseudoNA)
) |>
dt(
caption = glue(
"Cruises with pseudo-NAs ({paste(pseudoNA_values, collapse = ',')}) explicitly set to NA before applying longitude and latitude converted from line, station coordinates (except where line & station are 0, which is also treated as a pseudo-NA)."
),
fname = "ctd_cruises_pseudoNA_lonlat"
) |>
formatCurrency(
c("n_lon_pseudoNA", "n_lat_pseudoNA"),
currency = "",
digits = 0,
mark = ","
)
```
## View Cruise with Excess Distance Points
```{r}
#| label: pts_distance_from_lnst
max_dist_dec_lnst_km <- 10
pts <- d_bind_sp |>
st_as_sf(coords = c("lon_dec", "lat_dec"), remove = F, crs = 4326) |>
mutate(
geom_lnst = purrr::map2(lon_lnst, lat_lnst, \(x, y) st_point(c(x, y))) |>
st_sfc(crs = 4326),
dist_dec_lnst_km = st_distance(geometry, geom_lnst, by_element = T) |>
units::set_units(km) |>
units::drop_units(),
is_dist_dec_lnst_within_max = dist_dec_lnst_km <= max_dist_dec_lnst_km,
) |>
st_join(
calcofi4r::cc_grid |>
select(grid_site = sta_key),
join = st_intersects
)
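# declare attributes spatially constant to avoid warnings in later geometric operations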
st_agr(pts) <- "constant"
# visualize a cruise with points exceeding max distance
badcr_id <- "1507OC"
pts_badcr <- pts |>
filter(cruise_id == badcr_id)
bb_badcr <- st_bbox(pts_badcr) |>
st_as_sfc()
mapView(bb_badcr) +
mapView(
pts_badcr |>
filter(is_dist_dec_lnst_within_max) |>
      slice_sample(n = 1000) |> # sample up to 1,000 of the within-cutoff points
bind_rows(
pts_badcr |>
filter(!is_dist_dec_lnst_within_max)
) |>
select(
cruise_id,
dtime_utc,
lon_dec,
lat_dec,
sta_id,
line,
sta,
dist_dec_lnst_km,
is_dist_dec_lnst_within_max
),
layer.name = glue(
"Cruise {badcr_id}<br>
distance (km) <br>
lon/lat to line/station"
),
# zcol = "cruise_id",
zcol = "dist_dec_lnst_km",
cex = 5,
alpha = 0.5
)
```
## Filter Points by Distance from Line/Station Locations
```{r}
#| label: filter_pts
# generate a table by cruise_id for n_all, n_within_max, n_exceeding_max, and pct_exceeding_max
d_pts_cruise_filt_smry <- pts |>
st_drop_geometry() |>
group_by(cruise_id) |>
# filter groups without any points exceeding max distance
filter(
any(!is_dist_dec_lnst_within_max)
) |>
summarize(
n_all = n(),
n_outside_grid = sum(is.na(grid_site)),
pct_outside_grid = n_outside_grid / n_all,
n_gt_cutoff = sum(!is_dist_dec_lnst_within_max, na.rm = T),
avg_dist_gt_cutoff_km = if_else(
n_gt_cutoff > 0,
mean(dist_dec_lnst_km[!is_dist_dec_lnst_within_max], na.rm = T),
NA_real_
),
# min_dist_gt_cutoff_km = if_else(
# n_gt_cutoff > 0,
# min(dist_dec_lnst_km[!is_dist_dec_lnst_within_max], na.rm = T),
# NA_real_
# ),
# max_dist_gt_cutoff_km = if_else(
# n_gt_cutoff > 0,
# max(dist_dec_lnst_km[!is_dist_dec_lnst_within_max], na.rm = T),
# NA_real_
# ),
pct_gt_cutoff = n_gt_cutoff / n_all,
n_rm = sum(!is_dist_dec_lnst_within_max | is.na(grid_site), na.rm = T),
pct_rm = n_rm / n_all,
.groups = "drop"
) |>
filter(n_rm > 0) |>
arrange(desc(pct_rm))
d_pts_cruise_filt_smry |> head() |> names()
# OLD: swap points exceeding max distance ----
# pts_swap <- pts |>
# # for points exceeding max distance, swap lon/lat with line/station lon/lat and update geometry to geom_lnst
# mutate(
# lon_dec = if_else(
# !is_dist_dec_lnst_within_max,
# lon_lnst,
# lon_dec
# ),
# lat_dec = if_else(
# !is_dist_dec_lnst_within_max,
# lat_lnst,
# lat_dec
# ),
# geometry = if_else(
# !is_dist_dec_lnst_within_max,
# geom_lnst,
# geometry
# )
# )
# filter points ----
pts_filt <- pts |>
filter(
is_dist_dec_lnst_within_max,
!is.na(grid_site)
)
# write_rds(pts_filt, pts_filt_rds, compress = "gz")
# file_info(pts_filt_rds) |> pull(size) # 684M
# show cruises that had points exceeding max distance
d_pts_cruise_filt_smry |>
dt(
caption = HTML(mark(
text = glue(
"Cruises with rows filtered out that fell outside the CalCOFI grid (`n_outside_grid`) given by [`calcofi4r::cc_grid`](https://calcofi.io/calcofi4r/articles/calcofi4r.html#grid-by-zone) or exceeded a {max_dist_dec_lnst_km} km cutoff from line/station coordinates (`n_gt_cutoff`), sorted by percentage of rows removed (`pct_rm`)."
)
)),
escape = F,
fname = "ctd_cruises_distance_from_lnst"
) |>
formatCurrency(
c(
"n_all",
"n_gt_cutoff",
"avg_dist_gt_cutoff_km",
"n_outside_grid",
"n_rm"
# "min_dist_gt_cutoff_km",
# "max_dist_gt_cutoff_km",
),
currency = "",
digits = 0,
mark = ","
) |>
formatPercentage(
c(
"pct_gt_cutoff",
"pct_outside_grid",
"pct_rm"
),
digits = 2
)
```
## View Filtered Points
```{r}
#| label: view_pts_filt
# pts_filt <- read_rds(pts_filt_rds)
# st_bbox(pts_swap)
# xmin ymin xmax ymax
# -126.52170 29.80726 -115.67477 50.00641
# st_bbox(pts_filt)
# xmin ymin xmax ymax
# -126.48430 29.80726 -117.27340 37.85147
bb_cr <- st_bbox(pts_filt) |>
st_as_sfc()
mapView(bb_cr) +
mapView(
pts_filt |>
group_by(cruise_id) |>
      slice_sample(n = 100) |> # sample up to 100 points per cruise
ungroup() |>
select(
cruise_id,
lon_dec,
lat_dec,
sta_id,
line,
sta,
dist_dec_lnst_km,
dtime_utc
),
zcol = "cruise_id",
# zcol = "dist_dec_lnst",
cex = 5,
alpha = 0.5
)
```
```{r}
#| label: show_pts_filt_attr
# show first and last 10 rows
pts_filt |>
st_drop_geometry() |>
select(-geom_lnst) |>
slice(c(1:10, (n() - 9):n())) |>
dt(
caption = glue(
"First and last 10 rows of filter CTD points data (nrows: {format(nrow(pts_filt), big.mark = ',')})"
),
fname = "ctd_filtered_points_sample20"
)
```
### Check for duplicates by datetime and depth
```{r}
#| label: check-dupes
# TODO: look for duplicates by dtime_utc, depth
d_dupes <- pts_filt |>
st_drop_geometry() |>
select(dtime_utc, depth) |>
group_by(dtime_utc, depth) |>
summarize(n = n(), .groups = "drop") |>
filter(n > 1) |>
arrange(desc(n)) # 741,840 × 3
# get number of distinct dtime_utc, depth
n_distinct_dtime_depth <- pts_filt |>
st_drop_geometry() |>
select(dtime_utc, depth) |>
n_distinct() # 5,931,948
d_dupes |>
slice(c(1:10, (n() - 9):n())) |>
dt(
caption = glue(
"First and last 10 rows of duplicates by dtime_utc, depth (nrows = {format(nrow(d_dupes), big.mark = ',')}; n_distinct_dtime_depth = {format(n_distinct_dtime_depth, big.mark = ',')})"
),
fname = "ctd_duplicates_by_datetime_depth_n20"
)
```
## Ingest into DuckDB, Write to `ctd.parquet`
```{r}
#| label: ingest-duckdb
# TODO: pts_filt → parquet
# write to parquet
d_pts_filt <- pts_filt |>
select(-geom_lnst) |>
st_drop_geometry()
dbWriteTable(con, "ctd", d_pts_filt, overwrite = T)
# dbListTables(con)
message(glue("Ingested {nrow(d_pts_filt)} rows into DuckDB table 'ctd'"))
# old: 6,813,712; new: 6,727,991
# write point_geom as geometry column
message(glue(
"Adding spatially indexed column point_geom from lon_dec and lat_dec..."
))
dbExecute(con, "ALTER TABLE ctd ADD COLUMN point_geom GEOMETRY;")
dbExecute(con, "UPDATE ctd SET point_geom = ST_Point(lon_dec, lat_dec);")
dbExecute(
con,
"CREATE INDEX idx_ctd_point_geom ON ctd USING RTREE(point_geom);"
)
# TODO: write ctd as parquet
message(glue(
"Writing DuckDB table 'ctd' to Parquet at {ctd_pq}..."
))
dbExecute(
con,
glue::glue(
"COPY ctd TO '{ctd_pq}' (FORMAT 'parquet', COMPRESSION zstd, OVERWRITE_OR_IGNORE);"
)
)
# PARTITION_BY (cruise_id)
# READ: 'orders/*/*/*.parquet', hive_partitioning = true
# TODO: add key-value metadata
# # KV_METADATA {
# number: 'Answer to life, universe, and everything',
# is_even: 'not ''odd''' -- single quotes in values must be escaped
# }
# SELECT * FROM read_parquet('test.parq')
# disconnect
dbDisconnect(con, shutdown = TRUE)
rm(con)
```
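One way to tackle the `PARTITION_BY` TODO above is a Hive-partitioned Parquet export by `cruise_id`. A sketch only, assuming a live connection `con` with the `ctd` table (the output directory name is hypothetical):

```r
# write one Parquet file per cruise_id under a Hive-style directory tree
dbExecute(con, glue(
  "COPY ctd TO '{dir_dl}/ctd_by_cruise'
   (FORMAT parquet, PARTITION_BY (cruise_id), OVERWRITE_OR_IGNORE);"
))

# read back with hive partitioning so cruise_id is recovered from the paths
dbGetQuery(con, glue(
  "SELECT cruise_id, COUNT(*) AS n
   FROM read_parquet('{dir_dl}/ctd_by_cruise/*/*.parquet', hive_partitioning = true)
   GROUP BY cruise_id
   LIMIT 5;"
))
```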
## Check `ctd.parquet` for spatial functionality in DuckDB
```{r}
#| label: check-parquet
# Create new database connection
con <- dbConnect(duckdb::duckdb()) # dbListTables(con)
# Load spatial extension
dbExecute(con, "INSTALL spatial;")
dbExecute(con, "LOAD spatial;")
# Read parquet file into DuckDB
message(glue("Reading parquet file from {ctd_pq}..."))
dbExecute(
con,
glue("CREATE VIEW ctd AS SELECT * FROM read_parquet('{ctd_pq}');")
)
# Verify table structure and row count
row_count <- dbGetQuery(con, "SELECT COUNT(*) as n FROM ctd")$n
message(glue("Reading {row_count} rows from parquet"))
# Check column names and types
cols <- dbGetQuery(con, "DESCRIBE ctd;")
print(cols)
# Test 1: Verify geometry column exists and is valid
message("\n=== Testing geometry column ===")
geom_test <- dbGetQuery(
con,
"
SELECT
COUNT(*) as total_rows,
COUNT(point_geom) as non_null_geoms,
COUNT(CASE WHEN ST_IsValid(point_geom) THEN 1 END) as valid_geoms
FROM ctd;
"
)
print(geom_test)
# Test 2: Get spatial extent for all rows
message("\n=== Getting spatial extent ===")
extent <- dbGetQuery(
con,
"
SELECT
MIN(ST_X(point_geom)) as min_lon,
MAX(ST_X(point_geom)) as max_lon,
MIN(ST_Y(point_geom)) as min_lat,
MAX(ST_Y(point_geom)) as max_lat,
COUNT(*) as total_points
FROM ctd;
"
)
print(extent)
# Test 3: Sample some geometries and extract coordinates
message("\n=== Sampling geometries ===")
sample_geoms <- dbGetQuery(
con,
"
SELECT
ST_X(point_geom) as lon,
ST_Y(point_geom) as lat,
ST_AsText(point_geom) as wkt
FROM ctd
LIMIT 5;
"
)
print(sample_geoms)
# Test 4: Spatial query - find points within a bounding box
# (adjust coordinates to match your data extent)
message("\n=== Testing spatial filter ===")
bbox_query <- dbGetQuery(
con,
"
SELECT COUNT(*) as points_in_bbox
FROM ctd
WHERE ST_Within(
point_geom,
ST_MakeEnvelope(-125.0, 32.0, -117.0, 35.0)
);
"
)
print(bbox_query)
# Test 5: Distance calculation - find points near a location
# (adjust reference point coordinates as needed)
message("\n=== Testing distance calculation ===")
distance_test <- dbGetQuery(
con,
"
SELECT
ST_X(point_geom) as lon,
ST_Y(point_geom) as lat,
ST_Distance_Sphere(
point_geom,
ST_Point(-120.0, 34.0)::GEOMETRY
) as distance_m
FROM ctd
ORDER BY distance_m
LIMIT 5;
"
)
print(distance_test)
# Test 6: Verify spatial index exists (if recreating)
message("\n=== Checking indexes ===")
indexes <- dbGetQuery(
con,
"
SELECT * FROM duckdb_indexes()
WHERE table_name = 'ctd';
"
)
print(indexes)
# Optional: Recreate spatial index if needed
# dbExecute(con, "DROP INDEX IF EXISTS idx_ctd_point_geom;")
# dbExecute(con, "CREATE INDEX idx_ctd_point_geom ON ctd USING RTREE(point_geom);")
# Test 7: Read into R as sf object
message("\n=== Reading as sf object ===")
sf_ctd <- st_read(
con,
query = "SELECT * EXCLUDE (point_geom), ST_AsWKB(point_geom) AS geom FROM ctd USING SAMPLE 50;",
geometry_column = "geom"
) |>
st_set_crs(4326)
mapView(sf_ctd)
print(sf_ctd)
# Cleanup
# dbDisconnect(con, shutdown = TRUE)
```
## Interactive Gantt Chart
```{r}
#| label: gantt-chart
# reconnect to database
# con <- dbConnect(duckdb::duckdb(), dbdir = db_path, read_only = T)
# tbl(con, "ctd") |>
# summarize(n = n())
# 210,507
# tbl(con, "ctd") |>
# filter(study == "2301RL") |>
# distinct(study) |>
# pull(study) |>
# sort()
# 20-2401RL_CTDBTL_017-033U.csv
# tbl(con, "ctd") |>
# pull(study) |>
# table(useNA = "ifany")
# 9802JD 9804JD
# 59,005 151,502
# get cruise date spans by cruise and stage
cruise_spans <- tbl(con, "ctd") |>
# rename(data_stage = data_type) |> # DEBUG
group_by(cruise_id, data_stage) |>
summarize(
beg_date = min(dtime_utc, na.rm = TRUE) |> as.Date(),
end_date = max(dtime_utc, na.rm = TRUE) |> as.Date(),
.groups = "drop"
) |>
collect() |>
filter(!is.na(beg_date), !is.na(end_date))
# Split multi-year cruises into separate rows
cruise_spans <- cruise_spans |>
rowwise() |>
mutate(
years_spanned = list(year(beg_date):year(end_date))
) |>
unnest(years_spanned) |>
mutate(
# Adjust dates for each year segment
year = years_spanned,
adj_beg_date = if_else(
years_spanned == year(beg_date),
beg_date,
as.Date(paste0(years_spanned, "-01-01"))
),
adj_end_date = if_else(
years_spanned == year(end_date),
end_date,
as.Date(paste0(years_spanned, "-12-31"))
),
begin_jday = yday(adj_beg_date),
end_jday = yday(adj_end_date),
stage_label = data_stage,
hover_text = glue(
"Cruise: {cruise_id}<br>",
"Stage: {data_stage}<br>",
"Begin: {format(beg_date, '%Y-%m-%d')}<br>",
"End: {format(end_date, '%Y-%m-%d')}<br>",
"Duration: {as.numeric(difftime(end_date, beg_date, units = 'days'))} days"
)
) |>
ungroup() |>
arrange(adj_beg_date)
# create color mapping
colors <- c("final" = "#23d355ff", "preliminary" = "#A23B72")
# create interactive gantt chart
p <- plot_ly()
# add traces for each cruise segment
for (i in 1:nrow(cruise_spans)) {
row <- cruise_spans[i, ]
p <- p |>
add_trace(
type = "scatter",
mode = "lines",
x = c(row$begin_jday, row$end_jday),
y = c(row$year, row$year),
line = list(
color = colors[row$stage_label],
width = 8
),
text = row$hover_text,
hoverinfo = "text",
showlegend = FALSE,
legendgroup = row$stage_label
)
}
# add legend traces
p <- p |>
add_trace(
type = "scatter",
mode = "lines",
x = c(NA, NA),
y = c(NA, NA),
line = list(color = colors["final"], width = 8),
name = "Final",
showlegend = TRUE
) |>
add_trace(
type = "scatter",
mode = "lines",
x = c(NA, NA),
y = c(NA, NA),
line = list(color = colors["preliminary"], width = 8),
name = "Preliminary",
showlegend = TRUE
)
# customize layout
p <- p |>
layout(
title = "CalCOFI CTD Cruise Timeline",
xaxis = list(
title = "Day of Year",
range = c(1, 365),
dtick = 30,
tickangle = 0
),
yaxis = list(
title = "Year",
autorange = "reversed",
dtick = 1
),
hovermode = "closest",
plot_bgcolor = "#f8f9fa",
paper_bgcolor = "white",
legend = list(
x = 1.02,
y = 1,
xanchor = "left",
yanchor = "top"
)
)
# display chart
p
# disconnect
dbDisconnect(con, shutdown = TRUE)
```
## Notes (old)
### Process raw data into usable form
- [Fathom software \| Sea-Bird Scientific](https://software.seabird.com/)
References:
- [CTD Data \| CalCOFI archives](https://newdata.calcofi.com/index.php/ctd-data)
> **Cast Files**. Files are created by SeaBird's Seasave program during each CalCOFI station. These data are unaltered and may contain errors (e.g., incorrect header information) or be incomplete due to system failure during collection. Two or more files may be available for a single station in the latter case. Each cruise will have a single cast data file. This file is a compressed .zip containing .zip files for each cast of the cruise, yymm_CTDCast.zip and yymm_llllssss\_###.zip respectively. Each station .zip should contain the following: .bl, .con, .hdr, .hex, .mrk, and .prn files.
::: {.callout-note collapse="true"}
#### CalCOFI CTD System
- A Sea-Bird Electronics 911plus V2 CTD collects vertical profile data at every CalCOFI station.
- In addition to being a dual TCO (temperature, conductivity, and oxygen) system, the CTD also interfaces with a transmissometer, fluorometer, PAR/SPAR meters, altimeter, and nitrate and pH sensors.
- The CTD is connected to a shipboard data-acquisition computer through an electrically conductive winch wire; sensor data are collected and displayed in real time using Seasave V7.
- The CTD is normally lowered to a terminal depth of 515 m, bottom-depth permitting, but is routinely deployed within meters of the seafloor at nearshore SCCOOS and basin stations.
- To ensure high-resolution sampling in areas with significant hydrological and biological gradients, a speed of \~30 m/min is used for the first 100 m, then \~60 m/min to depth without stopping.
- During retrieval, the CTD is paused for at least 20 seconds at target bottle depths to adequately flush each 10 liter sample bottle prior to closure. Seawater samples are analyzed onboard (e.g., salinity, oxygen, nutrients, chl) and are used to correct measured CTD values.
:::
- Mahmoud & Mas (2022) [Standard Operating Procedure: CTD data pre-processing](https://repository.oceanbestpractices.org/handle/11329/2081). *Ocean Best Practices*
```
@article{mohamed2022standard,
title={Standard Operating Procedure: CTD data pre-processing.},
author={Mohamed Mahmoud, Mohamed and Munoz Mas, Cristian},
year={2022},
publisher={Institut Mauritanien De Recherches Oc{\'e}anographiques Et De P{\^e}ches (IMROP)}
}
```
### Evaluate different zip files
- 2025-Jul:
- Preliminary CTD 1m-Binned\
[20-2507SR_CTD**Prelim**.zip](https://calcofi.org/downloads/2025/20-2507SR_CTDPrelim.zip)\
**Preliminary CTD 1m-Binned**: Raw CTD sensor data that have gone through SBE Data Processing, and obvious bad sensor readings have been flagged during initial QAQC. Bottle data has not yet been merged with the CTD data. CTD Salinity, Oxygen, Fluorescence, and Nitrate profiles have not been corrected to the bottle data. Cruise corrected Nitrate sensor data (EstNO3_CruiseCorr) has been converted from raw voltage using a sensor calibration done before each cruise, but not yet corrected using CTD vs. Bottle regressions.
- 2021-Jul:
- ~~Prodo Cast~~ Raw CTD Data\
[20-2107SR_CTD**Cast**.zip](https://calcofi.org/downloads/2021/20-2107SR_CTDCast.zip)
- Preliminary CTD & Bottle 1m-Binned\
[20-2107SR_CTD**Prelim**.zip](https://calcofi.org/downloads/2021/20-2107SR_CTDPrelim.zip)\
**Preliminary CTD & Bottle 1m-Binned**: Preliminary CTD sensor data that have been merged with QAQC’ed bottle data. CTD Salinity, Oxygen, Fluorescence, and Nitrate profiles have been corrected using CTD vs. bottle regressions.
- 2021-May:
- Raw CTD Data\
[20-2105SH_CTD**Cast**.zip](https://calcofi.org/downloads/2021/20-2105SH_CTDCast.zip)\
**Raw CTD Data**: Data files (.xmlcon, .hex, .bl, .nav, .prn, .mrk, .hdr) directly from SBE Software ([Sea-Bird Scientific](https://www.seabird.com/)) with no processing or corrections applied.
- Final 1m-Binned\
[20-2105SH_CTD**FinalQC**.zip](https://calcofi.org/downloads/2021/20-2105SH_CTDFinalQC.zip)\
**Final 1m-Binned**: CTD data that have passed through the CalCOFI quality control process. After standard Seasoft-processing (Sea-Bird Scientific’s data-processing suite) tuned for our SBE 911plus CTD, the .asc files are processed by SIO-CalCOFI in-house software (BtlVsCTD). Averaged (4-sec or 1-meter bin-averaged) CTD sensor data are matched with corresponding bottle data, then **cruise-corrected** and **station-corrected**. Zipped file includes: metadata and .asc, .hdr, .btl, and upcast & downcast .csvs (“u” and “d”; by station and concatenated).
- **Cruise-corrected CTD data**: CTD sensor data that are corrected using regression coefficients derived from 4-sec average sensor data vs bottle data comparisons for the entire cruise, meaning all casts with bottle samples (n=\~1400; fliers omitted; column labels ‘\_CruiseCorr’).
- **Station-corrected CTD data**: CTD sensor data that are corrected using regression coefficients generated dynamically for each cast, 1m bin-averaged sensor data vs bottle data (n=\~20; fliers omitted; column labels ‘\_StaCorr’).
- Since sensor behavior may vary from station to station, station-corrected CTD sensor data for salinity, oxygen, estimated nitrate, and estimated chlorophyll-a are considered the best, particularly for estimated nitrate.
- 1992-Feb:
- Prodo Cast\
[19-9202JD_CTD**CastProdoStas**.zip](https://calcofi.org/downloads/1992/19-9202JD_CTDCastProdoStas.zip)\
**Test Cast/Prodo Cast**: The Seabird 911plus CTD was first tested in Mar 1990 (9003JD) but is still under development. CTD data from Mar 1990 – 1991 were occasional system & feasibility tests (“**test casts**“). As confidence in data quality improved, the CTD-Rosette began being used for daily primary productivity sample collection casts (“**prodo casts**“) in 1992 (6 depths from one station per day).
- 1990-Mar:
- Test Cast\
[19-9003JD_CTD**Test**.zip](https://calcofi.org/downloads/1990/19-9003JD_CTDTest.zip)