Using the system
Adding a new pipeline for a Socrata Data Asset
The Socrata platform hosts a wealth of public data, and this system only requires a few manual steps to set up an ELT pipeline that adds a data set to your local warehouse.
Manual Steps
1. Add a `SocrataTable` instance with the data set's `table_id` and `table_name` to the `/airflow/dags/sources/tables.py` file as shown here (a hedged sketch also follows this list).
2. Copy this DAG into a file anywhere in `/airflow/dags/` and edit the 4 annotated lines.
3. [In the Airflow Web UI] Run that new DAG.
4. (Optional) To check data quality before updating your local table, set expectations for the data.
5. (Optional) To standardize column names, dtypes, or order for a data set, edit the file named `{table_name}_standardized.sql` in the directory `/airflow/dbt/models/standardized/`.
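For step 1, here is a minimal sketch of what the addition to `/airflow/dags/sources/tables.py` could look like. Only the `table_id` and `table_name` arguments come from the description above; the import path and the example values are assumptions made purely for illustration.

```python
# /airflow/dags/sources/tables.py -- illustrative addition only.
# The import path below is an assumption about where SocrataTable is defined,
# and the example values are hypothetical; substitute the data set's real
# Socrata table_id and the snake_case table_name you want in your warehouse.
from sources.socrata_table import SocrataTable

CHICAGO_FOOD_INSPECTIONS = SocrataTable(
    table_id="4ijn-s7e5",                    # hypothetical Socrata table_id
    table_name="chicago_food_inspections",   # name the local warehouse table will use
)
```

Presumably one of the 4 annotated lines in the DAG copied in step 2 points at this instance, but that is inferred rather than stated above, so check the annotations in that file.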
Workflow Overview
The workflow for producing usable tables follows this pattern:
1. (`data_raw` schema): Set up an ingestion pipeline.
    1.1. Extract data to a local file.
    1.2. Load that data into a "temp" table.
    1.3. Select distinct records that aren't already in the warehouse and add them to a persistent table.
    1.4. Define a suite of expectations to validate future data updates.
2. (`standardized` schema): Implement dbt models that standardize the data set's columns.
    2.1. Standardize column [names, dtypes, order] and perform cleaning steps in the `{data_set_name}_standardized.sql` dbt model file.
3. (`clean` schema): dbt models that implement a deduplication strategy and produce a clean data set are generated automatically.
4. (`feature` schema): Implement dbt models to engineer data features.
5. (`dwh` schema): Implement dbt models to assemble data into analytically useful tables.
For tables hosted by Socrata, this system reduces steps 1.1 through 1.3 to a 3-minute operation, generates a nearly ready `..._standardized.sql` stub for 2.1, and automatically produces the `..._clean.sql` file once that `..._standardized.sql` stub is edited.
Census Data Sources
Adding a new pipeline for a Census API Data Set
Note
This workflow is under active development and may change as new functionality is added.
1. Retrieve or refresh the catalog of Census API Datasets by running the `refresh_census_api_metadata` DAG.
2. Identify a dataset of interest in the `metadata.census_api_metadata` table in the `re_dwh` database.
3. Collect metadata on the dataset of interest's variables and geographies.
    3.1. Copy the `identifier` URL for a dataset of interest.
    3.2. Paste that `identifier` URL into the `DATASET_IDENTIFIER` variable near the top of the DAG file `/airflow/dags/refresh_census_api_dataset_variables_metadata.py` (see the sketch after this list).
    3.3. Run the `refresh_census_api_dataset_variables_metadata` DAG.
4. To be developed
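To make step 3.2 concrete, the edit amounts to changing a single module-level variable near the top of that DAG file. This is only a sketch: the URL below is a hypothetical placeholder, and the real value should be the `identifier` copied from the `metadata.census_api_metadata` table in step 3.1.

```python
# /airflow/dags/refresh_census_api_dataset_variables_metadata.py -- excerpt (sketch only).
# Replace the hypothetical placeholder below with the `identifier` URL copied in step 3.1.
DATASET_IDENTIFIER = "https://api.census.gov/data/id/ACSDT5Y2021"
```

After saving the edit, run the `refresh_census_api_dataset_variables_metadata` DAG as described in step 3.3.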
