
Traffic Data Pipeline and Warehouse

Tesfaye writes about deep learning architectures and other computer programming concepts.

Can a data pipeline help to improve city traffic flow?

Building a Data Pipeline

This project aims to build a data pipeline and warehouse for traffic data collected
from several locations in a city. We will use the Extract, Load, Transform (ELT)
framework and integrate DBT (data build tool) for the transformation step.

Project Objective

A city traffic department wants to collect traffic data using swarm UAVs (drones)
from a number of locations in the city and use the data to improve traffic flow in the city, as well as for a number of other undisclosed projects.

Our task is to build a scalable data warehouse that will host the vehicle trajectory data extracted by analyzing footage taken by swarm drones and static roadside cameras. The data warehouse should take into account future needs, and organize data such that a number of downstream projects may query the data efficiently.

Project Tools

  • Apache Airflow - A workflow manager to schedule, orchestrate, and monitor workflows. Airflow uses directed acyclic graphs (DAGs) to control workflow orchestration.
  • PostgreSQL - An object-relational database management system (ORDBMS) with an emphasis on extensibility and standards compliance. It is used as the primary data store or data warehouse for many web, mobile, geospatial, and analytics applications.
  • DBT (data build tool) - Enables transforming data in warehouses by simply writing select statements. It handles turning these select statements into tables and views.
  • Redash - An open-source web application used for querying databases and visualizing the results.

Given Data

The data we will be using for this project can be downloaded from pNEUMA data. pNEUMA is an open large-scale dataset of naturalistic trajectories from half a million vehicles in the congested downtown area of Athens, Greece. The one-of-a-kind experiment used a swarm of drones to collect the data. Each file for a single (area, date, time) is ~87MB of data.
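To get a feel for the files before designing the pipeline, it helps to peek at one of them. The sketch below is only illustrative: the file name is a placeholder for whichever (area, date, time) file you downloaded, and the encoding matches the one the loader script uses later.

# Minimal sketch: inspect one downloaded pNEUMA file.
# 'pneuma_sample.csv' is a placeholder name; substitute any downloaded (area, date, time) file.
with open('pneuma_sample.csv', encoding='ISO-8859-1') as f:
    header = f.readline().split(';')
    first_row = f.readline().split(';')

print(len(header))     # a handful of named columns (track_id, type, traveled_d, ...)
print(len(first_row))  # far more fields: repeated lat/lon/speed/acceleration/time readings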

Workflow of the project

Project Steps

As you can see in the workflow diagram above, the project requires the following steps:

  1. Loading the data to the database. We will load the downloaded data into PostgreSQL using SQL queries.
  2. Writing Airflow DAGs for automating the data loading. We will write a Python Airflow script to schedule and run data loading automatically.
  3. Transform data using DBT. We will write model schema and SQL queries to transform the raw data loaded on the database.
  4. Visualize the data using Redash. We will visualize the models and the tables created from the DBT transformation.

Data Loading and Airflow DAGs

The dataset's rows can have more than 121,000 columns, which makes inserting the data into the database directly hard, slow, and inefficient. For this reason, I combined every column after the time column into a single column, joined by "_", which keeps loading the data simple and efficient for now.
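To make the flattening rule concrete, here is a minimal sketch on a made-up row; the values are illustrative, not real pNEUMA measurements.

# Illustrative only: the field values below are made up.
raw_row = "10; Car; 48.85; 9.77; 37.9774; 23.7375; 4.98; 0.02; -0.05; 0.0; 37.9774; 23.7376; 5.01; 0.01; -0.04; 0.04"
fields = raw_row.split(';')

# Keep the first 10 fields (track_id .. time) and pack everything after them into one field.
flattened = ";".join(fields[:10]) + ";" + "_".join(fields[10:])
print(flattened)
# 10; Car; 48.85; 9.77; 37.9774; 23.7375; 4.98; 0.02; -0.05; 0.0; 37.9774_ 23.7376_ 5.01_ 0.01_ -0.04_ 0.04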

  1. Modify the raw data into an appropriate format.
  2. Write SQL files to create tables and load data from the CSV file.
  3. Write DAGs by sequencing and adding operators to load the data.
  4. Run the DAGs to view the progress and job runs.

create_raw_data.sql

-- The quoted column names keep the leading space that appears in the raw CSV headers.
CREATE TABLE IF NOT EXISTS Warehouse.raw_data
(
    track_id bigint,
    " type" text COLLATE pg_catalog."default",
    " traveled_d" double precision,
    " avg_speed" double precision,
    " lat" double precision,
    " lon" double precision,
    " speed" double precision,
    " lon_acc" double precision,
    " lat_acc" double precision,
    "time" double precision,
    other_data text COLLATE pg_catalog."default"
);

load_raw_data.sql

-- COPY runs on the database server, so the file path must be readable by the PostgreSQL server process.
COPY Warehouse.raw_data FROM '../data/transformed_data' WITH DELIMITER AS ';' NULL AS '\null' CSV HEADER;

loader_dag.py

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Assumed minimal default_args; the original project defines these elsewhere.
default_args = {'owner': 'airflow'}


def modify_raw_data(location):
    """Flatten every column after the time column into a single 'other_data' column."""
    updated_lines = ""
    with open(location, 'r', encoding='ISO-8859-1') as f:
        for index, line in enumerate(f.readlines()):
            each_line = line.split(';')
            if index == 0:
                # Header row: drop the trailing element, then add the 'time' and 'other_data' names.
                updated_lines += ";".join(each_line[:len(each_line) - 1]) + ";time;other_data\n"
            else:
                # Data row: keep the first 10 fields and join everything after them with "_".
                updated_lines += ";".join(each_line[0:10]) + ";" + "_".join(each_line[10:])
    with open('/data/transformed_data', "w") as f:
        f.writelines(updated_lines)


with DAG(
    dag_id='load_data',
    default_args=default_args,
    description='extract and load raw data from the given dataset',
    start_date=datetime(2022, 7, 6, 2),
    schedule_interval='@once'
) as dag:
    # The file transformation is a Python task, not a Postgres task.
    task1 = PythonOperator(
        task_id='change_raw_file',
        python_callable=modify_raw_data,
        op_kwargs={'location': "/data/dataset.csv"}
    )
    task2 = PostgresOperator(
        task_id='create_dataset_table',
        postgres_conn_id='postgres_connection',
        sql='/sql/create_raw_data.sql',
    )
    task3 = PostgresOperator(
        task_id='load_dataset',
        postgres_conn_id='postgres_connection',
        sql='/sql/load_raw_data.sql',
    )

    task1 >> task2 >> task3

The above code schedules the file modification, the table creation, and the data load, and the resulting DAG shows up on the Airflow webserver as in the screenshots below.
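Before checking the webserver, it can help to confirm that the DAG file parses and that its three tasks are registered. This is a minimal sketch, assuming Airflow is installed and loader_dag.py sits in the configured dags_folder.

# Minimal sketch: confirm the DAG file parses and list its tasks.
from airflow.models import DagBag

dag_bag = DagBag()  # parses every file in the configured dags_folder
assert not dag_bag.import_errors, dag_bag.import_errors

dag = dag_bag.get_dag('load_data')
print([task.task_id for task in dag.tasks])
# expected task ids: change_raw_file, create_dataset_table, load_dataset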

[Screenshots: the load_data DAG and its runs on the Airflow webserver]
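Once the DAG run completes, a quick row count confirms the data actually landed in the warehouse table. The connection parameters below are placeholders rather than values from the project, so adjust them to your own PostgreSQL setup.

# Minimal sketch: sanity-check the load with a row count.
# All connection parameters are placeholders; adjust them to your own setup.
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="postgres", user="postgres", password="postgres")
with conn, conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM Warehouse.raw_data;")
    print(cur.fetchone()[0])
conn.close()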

Writing DBT Model and Schema

After loading the data into Postgres, the next step is to transform it to create other tables and views based on the main dataset.

  1. Write a schema of our main dataset.
  2. Write the main transformed view/table to continue transforming on that model.
  3. Write sub-models referencing the main transformed view/table.

Schema.yml

version: 2

models:
  - name: datasets
    description: "traffic data"
    columns:
      - name: id
        description: "unique id"
      - name: track_id
        description: "track id"
      - name: type
        description: "type"
      - name: traveled_d
        description: "traveled_d"
      - name: avg_speed
        description: "avg_speed"
      - name: lat
        description: "lat"
      - name: lon
        description: "lon"
      - name: speed
        description: "speed"
      - name: lon_acc
        description: "lon_acc"
      - name: lat_acc
        description: "lat_acc"
      - name: time
        description: "time"

First DBT Model to Take Only the 10 Known Columns

{{ config(materialized='view') }}

with traffic_dbt_model as (

    select * from datasets

)

select *
from traffic_dbt_model

DBT Model to Group by Type With Average Speed

-- Use the `ref` function to select from other models

select " type" , AVG(" avg_speed") 
from {{ ref('traffic_dbt_model') }}
Group by " type"

These DBT models create multiple views that transform the data. After this, DBT lets us document the models and serve the documentation locally (via dbt docs generate and dbt docs serve).

[Screenshots: the generated DBT documentation site]

Creating a Dashboard With Redash

For this project, we can use Redash to create a dashboard and explore the views we made with DBT. A sample of the Redash development environment is shown below.

[Screenshot: the Redash development environment]

Challenges Faced Implementing the Project

Challenges faced on this project include:

  • The data size and shape - the data was massive and did not come as rectangular as needed, which caused some problems, but I was able to load it by modifying the raw data first.
  • Redash connection to localhost - since Redash is usually installed with Docker, it was difficult to connect it to the PostgreSQL instance running on my localhost.

Future Plans and Conclusion

This project has been a great learning experience for me, but there are some tasks I wish to add to it, which include:

  • Add more transformation on DBT to gain more insights.
  • Build a more complete Redash dashboard.
