Using the system
Adding a new pipeline for a Socrata Data Asset
The Socrata platform hosts a wealth of public data, and this system requires only a few manual steps to set up an ELT pipeline that adds a data set to your local warehouse.
Manual Steps
1. Add a `SocrataTable` instance with the data set's `table_id` and `table_name` to the `/airflow/dags/sources/tables.py` file, as shown here (see the sketch after this list).
2. Copy this DAG into a file anywhere in `/airflow/dags/` and edit the 4 annotated lines.
3. [In the Airflow Web UI] Run that new DAG.
4. (Optional) To check data quality before updating your local table, set expectations for the data.
5. (Optional) To standardize column names, dtypes, or order for a data set, edit the file named `{table_name}_standardized.sql` in the `/airflow/dbt/models/standardized/` directory.
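For step 1, a minimal sketch of what an added entry in `/airflow/dags/sources/tables.py` might look like. The field names beyond `table_id` and `table_name`, and the example values, are assumptions; the real `SocrataTable` class may take additional arguments (e.g., a schedule), so mirror an existing entry in that file.

```python
# /airflow/dags/sources/tables.py (hypothetical excerpt)
# Assumes SocrataTable is the class already defined/imported in this module
# and that it accepts table_id and table_name keyword arguments.

CHICAGO_FOOD_INSPECTIONS = SocrataTable(
    table_id="4ijn-s7e5",                    # the data set's Socrata table_id (example value)
    table_name="chicago_food_inspections",   # snake_case name used for the warehouse table
)
```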
Workflow Overview
The workflow for producing usable tables follows this pattern:
1. (`data_raw` schema): Set up an ingestion pipeline (a rough sketch of steps 1.1 through 1.3 follows this list).
    1.1. Extract data to a local file.
    1.2. Load that data into a "temp" table.
    1.3. Select distinct records that aren't already in the warehouse and add them to a persistent table.
    1.4. Define a suite of expectations to validate future data updates.
2. (`standardized` schema): Implement dbt models that standardize the data set's columns.
    2.1. Standardize column [names, dtypes, order] and perform cleaning steps in the `{data_set_name}_standardized.sql` dbt model file.
3. (`clean` schema): Automatically generate dbt models that implement a deduplication strategy and produce a clean data set.
4. (`feature` schema): Implement dbt models to engineer data features.
5. (`dwh` schema): Implement dbt models to assemble data into analytically useful tables.
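As a rough illustration of steps 1.1 through 1.3, here is a self-contained sketch of the pattern using pandas and SQLAlchemy. It is not the project's actual task code; the connection string, table names, and Socrata endpoint are placeholders, and the real pipeline runs these steps as Airflow tasks.

```python
# Rough illustration of workflow steps 1.1-1.3 (placeholder names throughout).
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:password@localhost:5432/dwh")  # placeholder connection

# 1.1 Extract data to a local file (the SODA endpoint pages results; a real
# extract would handle paging rather than taking a single page).
df = pd.read_csv("https://data.cityofchicago.org/resource/4ijn-s7e5.csv")  # example table_id
df.to_csv("/tmp/raw_extract.csv", index=False)

# 1.2 Load that data into a "temp" table in the data_raw schema.
df.to_sql("example_table_temp", engine, schema="data_raw", if_exists="replace", index=False)

# 1.3 Add distinct records that aren't already in the persistent table
# (assumes data_raw.example_table already exists with the same columns).
with engine.begin() as conn:
    conn.execute(text(
        """
        INSERT INTO data_raw.example_table
        SELECT * FROM data_raw.example_table_temp
        EXCEPT
        SELECT * FROM data_raw.example_table
        """
    ))
```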
For tables hosted by Socrata, this system reduces steps 1.1 through 1.3 to a 3-minute operation, generates a nearly ready `..._standardized.sql` stub for 2.1, and automatically produces the `..._clean.sql` file from 2.2 after the `..._standardized.sql` stub is edited.
Census Data Sources
Adding a new pipeline for a Census API Data Set
Note
This workflow is under active development and may change as new functionality is added.
1. Retrieve or refresh the catalog of Census API Datasets by running the `refresh_census_api_metadata` DAG.
2. Identify a dataset of interest in the `metadata.census_api_metadata` table in the `re_dwh` database.
3. Collect metadata on the dataset of interest's variables and geographies.
    3.1. Copy the `identifier` URL for the dataset of interest.
    3.2. Paste that `identifier` URL into the `DATASET_IDENTIFIER` variable near the top of the DAG file `/airflow/dags/refresh_census_api_dataset_variables_metadata.py` (see the sketch after this list).
    3.3. Run the `refresh_census_api_dataset_variables_metadata` DAG.
4. To be developed.
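For step 3.2, the edit is just a module-level variable assignment near the top of the DAG file. The identifier value below is an example of the format used in the Census API's dataset catalog; replace it with the `identifier` you copied from `metadata.census_api_metadata`.

```python
# /airflow/dags/refresh_census_api_dataset_variables_metadata.py (hypothetical excerpt)
# Paste the dataset's `identifier` URL here before triggering the DAG.
DATASET_IDENTIFIER = "https://api.census.gov/data/id/ACSDT5Y2021"  # example value; replace with yours
```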