Using the system

Adding a new pipeline for a Socrata Data Asset

The Socrata platform hosts a wealth of public data, and this system requires only a few manual steps to set up an ELT pipeline that adds a data set to your local warehouse.

Manual Steps

  1. Add a SocrataTable instance with the data set's table_id and table_name to the /airflow/dags/sources/tables.py file, as shown here (a sketch also follows this list).

  2. Copy this DAG into a file anywhere in /airflow/dags/ and edit the 4 annotated lines.

  3. [In the Airflow Web UI] Run that new DAG.

  4. (Optional) To check data quality before updating your local table, set expectations for the data.

  5. (Optional) To standardize column names, dtypes, or order for a data set, edit the file named {table_name}_standardized.sql in the /airflow/dbt/models/standardized/ directory.
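
For step 1, the entry in /airflow/dags/sources/tables.py is just a SocrataTable instance keyed by the asset's table_id (the "four-by-four" identifier visible in the asset's Socrata URL) and the table_name you want in your warehouse. Below is a minimal sketch using Chicago's food-inspections asset as an example; the import path and any constructor arguments beyond these two are assumptions, not the project's actual definitions.

```python
# /airflow/dags/sources/tables.py -- illustrative sketch of step 1.
# Only table_id and table_name are documented above; the import path and any
# other constructor arguments are assumptions.
from sources.socrata import SocrataTable  # assumed import path

CHICAGO_FOOD_INSPECTIONS = SocrataTable(
    table_id="4ijn-s7e5",                   # the Socrata "four-by-four" asset id
    table_name="chicago_food_inspections",  # the table name used in the local warehouse
)
```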

Workflow Overview

The workflow for producing usable tables follows this pattern:

  1. (data_raw schema): Set up an ingestion pipeline.

    1.1. Extract data to a local file.

    1.2. Load that data into a "temp" table.

    1.3. Select distinct records that aren't already in the warehouse and add them to a persistent table (see the sketch after this overview).

    1.4. Define a suite of expectations to validate future data updates.

  2. (standardize schema): Implement dbt models that standardize the data set's columns.

    2.1. Standardize column names, dtypes, and order, and perform any cleaning steps in the {data_set_name}_standardized.sql dbt model file.

  3. (clean schema): Automatically generated dbt models implement a deduplication strategy and produce a clean data set.

  4. (feature schema): Implement dbt models to engineer data features.

    4.1. Engineer desired features.

  5. (dwh schema): Implement dbt models to assemble data into analytically useful tables.

For tables hosted by Socrata, this system reduces steps 1.1 through 1.3 to a 3-minute operation, generates a nearly complete ..._standardized.sql stub for step 2.1, and automatically produces the ..._clean.sql file (step 3) after the ..._standardized.sql stub is edited.
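
Step 1.3 follows a common warehouse pattern: append only the distinct records from the "temp" table that are not already present in the persistent data_raw table. The snippet below is a generic sketch of that pattern with placeholder table, column, and connection names; it is not the code this system generates.

```python
# Generic sketch of the step-1.3 pattern (placeholder names throughout):
# append new, distinct "temp" records to the persistent data_raw table.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/re_dwh")

insert_new_records = text(
    """
    INSERT INTO data_raw.example_table
    SELECT DISTINCT t.*
    FROM data_raw.temp_example_table AS t
    LEFT JOIN data_raw.example_table AS p
        ON t.source_record_id = p.source_record_id  -- placeholder natural key
    WHERE p.source_record_id IS NULL;
    """
)

with engine.begin() as conn:
    conn.execute(insert_new_records)
```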

check_table_metadata TaskGroup

Census Data Sources

Adding a new pipeline for a Census API Data Set

Note

This workflow is under active development and may change as new functionality is added.

  1. Retrieve or refresh the catalog of Census API Datasets by running the refresh_census_api_metadata DAG.

  2. Identify a dataset of interest in the metadata.census_api_metadata table in the re_dwh database.

  3. Collect metadata on the dataset of interest's variables and geographies.

    3.1. Copy the identifier URL for the dataset of interest.

    3.2. Paste that identifier URL into the DATASET_IDENTIFIER variable near the top of the DAG file /airflow/dags/refresh_census_api_dataset_variables_metadata.py (a sketch follows these steps).

    3.3. Run the refresh_census_api_dataset_variables_metadata DAG.

  4. To be developed
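
As a rough sketch of step 3.2, the only edit required is the DATASET_IDENTIFIER assignment near the top of the DAG file. The identifier value shown is only an example, and the requests lines are included purely to show where the Census API publishes variable and geography metadata for a dataset; they are not part of the project's DAG.

```python
# Step 3.2 sketch: near the top of
# /airflow/dags/refresh_census_api_dataset_variables_metadata.py, replace the
# existing value with the identifier URL copied in step 3.1.
DATASET_IDENTIFIER = "https://api.census.gov/data/id/ACSDT5Y2022"  # example value; an assumption

# For orientation only (not project code): each Census API dataset exposes
# variable and geography metadata at well-known endpoints beside its base URL.
import requests

base_url = "https://api.census.gov/data/2022/acs/acs5"            # example dataset base URL
variables = requests.get(f"{base_url}/variables.json").json()     # variable metadata
geographies = requests.get(f"{base_url}/geography.json").json()   # geography metadata
print(len(variables["variables"]), "variables available")
```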