Goal: Generate the database from source files with workflow scripts so that updates are easier and provenance is fully transparent. This allows us to:
Rename tables and columns, control data types, and use Unicode encoding for a consistent database ingestion strategy, per the Database naming conventions in Database – CalCOFI.io Docs.
Differentiate between raw and derived or updated tables and columns. For instance, the taxonomy for any given species can change over time, such as the lumping or splitting of a given taxon, and can vary by taxonomic authority (e.g., WoRMS, ITIS, or GBIF). These taxonomic identifiers and the full taxonomic hierarchy should be regularly updated regardless of the source observational data, and can either be updated in the table directly or joined one-to-one with a separate table in a materialized view (so as not to slow down queries with a regular view).
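As a sketch of this materialized-view pattern (the table and column names here are illustrative, not the actual CalCOFI schema):

```r
# Hypothetical sketch: join observations one-to-one with a taxonomy lookup
# in a materialized view, so taxonomy can be refreshed on its own schedule
# without the per-query join cost of a regular view.
library(DBI)

dbExecute(con, "
  CREATE MATERIALIZED VIEW obs_taxonomy AS
  SELECT
    o.*,              -- raw observational data, unchanged
    t.worms_id,       -- identifier from the taxonomic authority (e.g., WoRMS)
    t.kingdom, t.phylum, t.class, t.family, t.genus
  FROM obs o
  JOIN taxonomy t ON t.sp_id = o.sp_id")

# after the taxonomy table is updated from the authority, refresh the view:
dbExecute(con, "REFRESH MATERIALIZED VIEW obs_taxonomy")
```

The refresh step is what keeps derived taxonomy current without touching the raw observation tables.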
This workflow processes NOAA CalCOFI database CSV files and updates the PostgreSQL database. The workflow:
Reads CSV files from source directory
Compares with lookup tables for field descriptions
Initializes or updates database tables
Generates summary statistics
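The read-and-compare steps above can be sketched as follows (the paths, lookup file, and column names are assumptions for illustration, not the actual calcofi4db internals):

```r
# Hypothetical sketch of the compare-with-lookups step: read each CSV's
# header and flag fields missing from the field-description lookup.
library(readr)
library(purrr)

csv_paths <- list.files("data/source", pattern = "\\.csv$", full.names = TRUE)

# lookup of expected fields: one row per (table, field) with a description
flds_lookup <- read_csv("metadata/flds_redefine.csv", show_col_types = FALSE)

walk(csv_paths, \(p) {
  tbl  <- tools::file_path_sans_ext(basename(p))
  flds <- names(read_csv(p, n_max = 0, show_col_types = FALSE))
  miss <- setdiff(flds, flds_lookup$fld_old[flds_lookup$tbl == tbl])
  if (length(miss) > 0)
    message(tbl, ": fields not in lookup: ", paste(miss, collapse = ", "))
})
```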
Code
graph TD
A[Read CSV Files] --> B[Extract Column Names]
B --> C[Compare with Lookups]
C --> D[Process Field Descriptions]
D --> E{Table Exists?}
E -->|Yes| F[Update Table]
E -->|No| G[Initialize Table]
F --> H[Summary Stats]
G --> H
Overview diagram of CSV ingestion process into the database.
# display interactive table of changes
if (nrow(changes$summary) > 0) {
  display_csv_changes(changes, format = "DT", title = "CSV vs Redefinition Mismatches")
}
Code
# check if there are any mismatches that need to be resolved
if (nrow(changes$summary) > 0) {
  # disable evaluation of remaining chunks
  knitr::opts_chunk$set(eval = FALSE)

  # output markdown warning
  cat(glue("
    ## ⚠️ Data Integrity Check Failed

    ### Workflow Halted

    Mismatches have been detected between the CSV files and redefinition metadata.
    These must be resolved before proceeding with database ingestion to ensure data integrity.

    ### Required Actions

    Please review the changes detected above and update the following redefinition files:

    - **Tables redefinition**: `{d$paths$tbls_rd_csv}`
    - **Fields redefinition**: `{d$paths$flds_rd_csv}`

    ### Common Resolutions

    1. **New tables/fields in CSV**: Add them to the appropriate redefinition file
    2. **Removed tables/fields from CSV**: Remove obsolete entries from redefinition files
    3. **Type mismatches**: Update field types in redefinition files to match CSV data types
    4. **Field name changes**: Update `fld_old` entries to match current CSV field names

    ### Next Steps

    After updating the redefinition files, re-run this workflow. The remaining code chunks
    have been disabled and will not execute until all mismatches are resolved.

    ---

    *Note: The remainder of this document contains code that will not be executed due to data integrity issues.*
    "))
} else {
  # no mismatches found - enable evaluation and show success message
  knitr::opts_chunk$set(eval = TRUE)

  cat(glue("
    ## ✅ Data Integrity Check Passed

    ### All Systems Go

    No mismatches were found between the CSV files and redefinition metadata.
    The data structures are properly aligned and ready for database ingestion.

    ### Proceeding with Workflow

    The workflow will now continue with the following steps:

    1. Display CSV file metadata from Google Drive
    2. Show tables and fields to be ingested
    3. Apply data transformations based on redefinitions
    4. Load transformed data into the database
    5. Create indexes and relationships
    6. Add spatial data and generate reports

    ---
    "))
}
1 ⚠️ Data Integrity Check Failed
1.1 Workflow Halted
Mismatches have been detected between the CSV files and redefinition metadata. These must be resolved before proceeding with database ingestion to ensure data integrity.
1.2 Required Actions
Please review the changes detected above and update the following redefinition files:
1.3 Common Resolutions
New tables/fields in CSV: Add them to the appropriate redefinition file
Removed tables/fields from CSV: Remove obsolete entries from redefinition files
Type mismatches: Update field types in redefinition files to match CSV data types
Field name changes: Update fld_old entries to match current CSV field names
1.4 Next Steps
After updating the redefinition files, re-run this workflow. The remaining code chunks have been disabled and will not execute until all mismatches are resolved.
Note: The remainder of this document contains code that will not be executed due to data integrity issues.
1.5 Show CSV Files on Google Drive
Code
show_googledrive_files(d)
1.6 Show CSV Tables and Fields to Ingest
Code
d$d_csv$tables |> datatable(caption = "Tables to ingest.")
Code
d$d_csv$fields |> datatable(caption = "Fields to ingest.")
2 Show Tables and Fields Redefined
Code
show_tables_redefine(d)
Code
show_fields_redefine(d)
3 Apply Remappings to Data
Code
# Use calcofi4db to transform data
transformed_data <- transform_data(d)

# For backward compatibility
d <- transformed_data
4 Load Tables into Database
Code
schema    <- "dev"
overwrite <- TRUE

# Use calcofi4db to ingest tables and generate metadata
# First, check for changes (this is now done above)
# changes <- detect_csv_changes(
#   con              = con,
#   schema           = schema,
#   transformed_data = transformed_data,
#   d_flds_rd        = d$d_flds_rd)

# Display changes
if (length(changes$new_tables) > 0) {
  message("New tables to be added:")
  message(paste(" -", changes$new_tables, collapse = "\n"))
}

if (length(changes$field_changes) > 0) {
  message("\nTables with field changes:")
  for (tbl in names(changes$field_changes)) {
    message(glue("  - {tbl}:"))
    if (length(changes$field_changes[[tbl]]$added) > 0) {
      message(glue("    Added: {paste(changes$field_changes[[tbl]]$added, collapse = ', ')}"))
    }
    if (length(changes$field_changes[[tbl]]$removed) > 0) {
      message(glue("    Removed: {paste(changes$field_changes[[tbl]]$removed, collapse = ', ')}"))
    }
  }
}

if (length(changes$type_changes) > 0) {
  message("\nTables with type changes:")
  for (tbl in names(changes$type_changes)) {
    message(glue("  - {tbl}:"))
    for (fld in names(changes$type_changes[[tbl]])) {
      message(glue("    {fld}: {changes$type_changes[[tbl]][[fld]]$from} -> {changes$type_changes[[tbl]][[fld]]$to}"))
    }
  }
}

# Ingest data to database
tbl_stats <- ingest_csv_to_db(
  con              = con,
  schema           = schema,
  transformed_data = transformed_data,
  d_flds_rd        = d$d_flds_rd,
  d_gdir_data      = d$d_gdata,
  workflow_info    = d$workflow_info,
  overwrite        = overwrite)

# Display summary statistics
tbl_stats |> datatable(rownames = FALSE, filter = "top")
Error: Failed to fetch row : ERROR: insert or update on table "larva" violates foreign key constraint "larva_sp_id_fkey"
DETAIL: Key (sp_id)=(3023) is not present in table "species".
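This foreign-key error means the larva table references at least one sp_id that is absent from the species table. One way to surface the orphan keys before re-running the load is sketched below (assuming the transformed tables are available as data frames under transformed_data$data; that structure is an assumption, not the documented calcofi4db API):

```r
# Find sp_id values in larva with no match in species; these orphan keys
# trigger the foreign-key violation and must be added to species, or
# removed/remapped in larva, before ingestion can succeed.
library(dplyr)

orphans <- transformed_data$data$larva |>
  distinct(sp_id) |>
  anti_join(transformed_data$data$species, by = "sp_id")

orphans
```

Per the error detail above, sp_id 3023 should appear among the orphans.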