
Traffic Data ELT Pipeline Migration


Data migration project workflow

Project Overview

Previously, we built an ELT data warehouse using traffic data with a PostgreSQL database and Redash. In this project, we will migrate the data from that PostgreSQL data warehouse to a MySQL data warehouse and automate the entire task. We will also change the dashboard visualization tool from Redash to Apache Superset.

Project Objective

The objective of this project is to understand and apply changes and automation to an ELT pipeline and data warehouse. In the previous project, we built a data warehouse using PostgreSQL, Airflow, DBT, and Redash.

This time around, we aim to build and automate the data warehouse as the data migrates to MySQL, using Airflow, DBT, and Apache Superset. This project should be helpful to data engineers or anyone performing data migration, schema changes, and automation.

Tools Used

  • Apache Airflow — A workflow manager to schedule, orchestrate, and monitor workflows. Airflow uses directed acyclic graphs (DAGs) to control workflow orchestration.
  • PostgreSQL — An object-relational database management system (ORDBMS) with an emphasis on extensibility and standards compliance. This is the database from which we will be migrating data.
  • DBT (data build tool) — Enables transforming data in warehouses by simply writing select statements. It handles turning these select statements into tables and views.
  • Redash — An open-source web application used for querying databases and visualizing the results. This is the dashboard builder tool we will be migrating from.
  • Apache Superset — An open-source data visualization and data exploration platform. It is fast, lightweight, intuitive, and loaded with options that make it easy for users of all skill sets to explore and visualize their data, from simple line charts to highly detailed geospatial charts.

Used Data

The data we will be using for this project can be downloaded from pNEUMA data. pNEUMA is an open, large-scale dataset of naturalistic trajectories from half a million vehicles in the congested downtown area of Athens, Greece. The one-of-a-kind experiment used a swarm of drones to collect the data. Each file for a single (area, date, time) is ~87MB of data. This data has already been stored in the PostgreSQL database (see my last article).
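
As a rough illustration only (not the exact loading code from the previous article), a flattened trajectory table could be pushed into the PostgreSQL warehouse with pandas and SQLAlchemy. The file path, credentials, and table name below are placeholders.

# Hypothetical sketch: load an already-flattened pNEUMA extract into PostgreSQL.
# The path, credentials, and table name are placeholders, not the project's actual values.
import pandas as pd
from sqlalchemy import create_engine

pg_engine = create_engine("postgresql+psycopg2://db_user:db_password@localhost/Warehouse")

# One row per trajectory sample: track_id, type, traveled_d, avg_speed, lat, lon, speed, lon_acc, lat_acc, time
traffic_df = pd.read_csv("pneuma_sample_flattened.csv")

# Write into the public schema; replace the table if it already exists.
traffic_df.to_sql("import_data", con=pg_engine, schema="public", if_exists="replace", index=False)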

Project Phases

In this project, we will be using PostgreSQL as our old database and migrating it into a MySQL database. Then we will enable transformation using DBT. Last but not least, we will build a dashboard.

  1. Prepare PostgreSQL data warehouse (which we already have)
  2. Write Airflow DAGs that will migrate data from PostgreSQL to MySQL
  3. Write DBT codes to enable transformation
  4. Create dashboards using Apache Superset

Writing Airflow DAGs

At this phase we already have the data, and the main part of the project, migrating from one database to a different one, can be difficult. Here I will show how I was able to migrate my data from Postgres to MySQL.

The following utility code converts PostgreSQL data types to their MySQL equivalents and builds the queries needed to recreate each database and table.

Migration Utility Codes

# Connection settings (host_name, db_user, db_password) are assumed to be defined elsewhere in the DAG file.
# psycopg2 is assumed as the PostgreSQL driver behind the db_connect alias.
import psycopg2 as db_connect

def convert_type(pg_type):
    # Map PostgreSQL data types to their closest MySQL equivalents.
    if pg_type == "boolean":
        return "bit(1)"
    elif pg_type == "integer":
        return "int"
    elif pg_type == "double precision":
        return "double"
    elif pg_type in ("character varying", "text"):
        return "LONGTEXT"
    else:
        return pg_type

def create_table_query(table, columns):
    # Build a MySQL CREATE TABLE statement from the PostgreSQL column metadata.
    # table is a (schema_name, table_name) tuple; columns are (name, is_nullable, data_type) rows.
    create_query = f"USE {table[0]}; CREATE TABLE IF NOT EXISTS {table[1]}("
    for index, c in enumerate(columns):
        data_type = convert_type(c[2])
        nullable = "NULL" if c[1] == "YES" else "NOT NULL"
        if index != 0:
            create_query += ','
        create_query += "`" + str(c[0]) + "` " + str(data_type) + " " + str(nullable)
    create_query += ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;"
    return create_query

def create_db_query(name):
    # MySQL treats a schema as a database, so each PostgreSQL schema becomes a MySQL database.
    return f"CREATE DATABASE IF NOT EXISTS {name}"

def get_schema_and_table_name(database_name):
    # Return every (schema, table) pair in the PostgreSQL database, skipping system schemas.
    connection = db_connect.connect(host=host_name, user=db_user, password=db_password, database=database_name)
    cursor = connection.cursor()
    query = ("SELECT table_schema, table_name FROM information_schema.tables "
             "WHERE table_schema NOT IN ('pg_catalog', 'information_schema') "
             "ORDER BY table_schema")
    cursor.execute(query)
    schemas = cursor.fetchall()
    return schemas

These functions let us create our databases and tables dynamically, which is usually the first step in a data migration. The "get_schema_and_table_name" function returns every schema name together with its table names for a given database name.
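
To make the output concrete, here is a small usage sketch of the utilities above. The column metadata is made up for illustration.

# Hypothetical example: the column tuples mimic rows from information_schema.columns.
columns = [
    ("id", "NO", "integer"),
    ("track_id", "YES", "integer"),
    ("avg_speed", "YES", "double precision"),
    ("type", "YES", "character varying"),
]

print(create_db_query("public"))
# CREATE DATABASE IF NOT EXISTS public

print(create_table_query(("public", "import_data"), columns))
# USE public; CREATE TABLE IF NOT EXISTS import_data(`id` int NOT NULL,`track_id` int NULL,
# `avg_speed` double NULL,`type` LONGTEXT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE utf8mb4_unicode_ci;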


Data Migration Orchestrator Codes

# pandas and SQLAlchemy move the rows; the MySQL connection details below are the local defaults used in this project.
import pandas as pd
from sqlalchemy import create_engine

def start_workflow(database_name):
    schemas = get_schema_and_table_name(database_name)
    create_schemas_and_load_data(database_name, schemas)

def create_schemas_and_load_data(database_name, schemas):
    # Source connections: a plain cursor for metadata, a SQLAlchemy engine for pandas reads.
    connection = db_connect.connect(host=host_name, user=db_user, password=db_password, database=database_name)
    postgres_engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{host_name}/{database_name}')
    cursor = connection.cursor()
    for s in schemas:
        # 1. Column names, nullability, and data types for this table.
        query = (f"SELECT column_name, is_nullable, data_type FROM information_schema.columns "
                 f"WHERE table_schema = '{str(s[0])}' AND table_name = '{str(s[1])}'")
        cursor.execute(query)
        columns = cursor.fetchall()

        # 2. Create the target MySQL database (one per PostgreSQL schema).
        mysql_connection = f'mysql://admin:admin@127.0.0.1:3306/{database_name}'
        engine = create_engine(mysql_connection)
        conn = engine.connect()
        conn.execute(create_db_query(s[0]))

        # 3. Create the target table inside that database.
        create_query = create_table_query(s, columns)
        conn.execute(create_query)

        # 4. Read the data from PostgreSQL and 5. write it to MySQL with pandas.
        single_db_connection = f'mysql://admin:admin@127.0.0.1:3306/{s[0]}'
        single_db_engine = create_engine(single_db_connection)
        the_data = pd.read_sql(f'SELECT * FROM {s[0]}.{s[1]}', postgres_engine)
        the_data.to_sql(s[1], con=single_db_engine, if_exists='replace', index=False)

These are probably the most important pieces of code in the project, as they orchestrate and call every function. The "create_schemas_and_load_data" function handles the process after fetching the schema and table names from PostgreSQL. In summary, its process is as follows (a minimal example of steps 4 and 5 follows the list):

  1. Get all column names, nullability, and data types of all tables
  2. Create MySQL database creation query
  3. Create MySQL table creation query (using the functions above)
  4. Read data from PostgreSQL database using pandas
  5. Write data to MySQL database using pandas
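
As a quick illustration of steps 4 and 5, the pandas read/write for a single table looks roughly like this. The schema and table names are examples, and the chunksize option is an optional addition (not used in the code above) that helps with larger tables.

# Hypothetical single-table copy, mirroring steps 4 and 5 above.
import pandas as pd
from sqlalchemy import create_engine

postgres_engine = create_engine("postgresql+psycopg2://db_user:db_password@localhost/Warehouse")
mysql_engine = create_engine("mysql://admin:admin@127.0.0.1:3306/public")

# Read the whole table from PostgreSQL into a DataFrame...
df = pd.read_sql("SELECT * FROM public.import_data", postgres_engine)

# ...and write it to the matching MySQL database/table.
# chunksize keeps memory usage bounded when the table is large.
df.to_sql("import_data", con=mysql_engine, if_exists="replace", index=False, chunksize=10000)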

Airflow DAGs to Call the Above Functions

# Imports for the DAG file; default_args is assumed to be a dict defined earlier (owner, retries, and so on).
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='migrate_data',
    default_args=default_args,
    description='migrate data from postgres to mysql',
    start_date=datetime(2022, 7, 6, 2),
    schedule_interval='@once'
) as dag:
    task1 = PythonOperator(
        task_id='create_schema_and_migrate_data',
        python_callable=start_workflow,
        op_kwargs={'database_name': 'Warehouse'},
    )
    task2 = PythonOperator(
        task_id='create_dataset_table',
        python_callable=migrate_privilages,
        op_kwargs={'database_name': 'Warehouse'},
    )
    task3 = PythonOperator(
        task_id='create_schema_and_migrate_data_2',
        python_callable=start_workflow,
        op_kwargs={'database_name': 'trial'},
    )
    task4 = PythonOperator(
        task_id='create_dataset_table_2',
        python_callable=migrate_privilages,
        op_kwargs={'database_name': 'trial'},
    )

    task1 >> task2 >> task3 >> task4
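
One design note on the dependency chain: task1 >> task2 >> task3 >> task4 runs everything sequentially. Since the two databases are independent, the chain could just as well be split into two shorter chains so Airflow can schedule them in parallel, for example:

# Alternative wiring: migrate the two databases independently of each other.
task1 >> task2
task3 >> task4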

Privilege Migration Codes

def migrate_privilages(database_name):
    # Read the table-level privileges from PostgreSQL's information_schema.
    connection = db_connect.connect(host=host_name, user=db_user, password=db_password, database=database_name)
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM information_schema.table_privileges")
    privileges = cursor.fetchall()
    for c in privileges:
        # Each row is (grantor, grantee, catalog, schema, table, privilege_type, ...).
        if c[1] == 'try_user':
            check_user_existance = f"SELECT user, host FROM mysql.user WHERE user = '{c[1]}'"
            grant_query = f'GRANT {c[5]} ON {c[3]}.{c[4]} TO {c[1]}@`localhost`;'
            mysql_connection = f'mysql://{db_user}:{db_password}@{host_name}:3306/{c[3]}'
            engine = create_engine(mysql_connection)
            conn = engine.connect()
            executed = conn.execute(check_user_existance)
            # Create the user in MySQL if it does not exist yet (the password here is a placeholder).
            if len(executed.fetchall()) == 0:
                create_user_query = f'CREATE USER {c[1]}@`{host_name}` IDENTIFIED WITH mysql_native_password BY "password";'
                conn.execute(create_user_query)
            # MySQL has no TRUNCATE privilege, so skip it when re-granting.
            if c[5] != "TRUNCATE":
                conn.execute(grant_query)

At this stage we have migrated all the data from PostgreSQL, but the user privileges still need to be migrated, since migrating only the data is not enough. In summary, this function's steps are (a small example of the privilege mapping follows the list):

  1. Get the table privileges and their users from PostgreSQL
  2. Check whether the user exists in the MySQL database and, if not, create it
  3. Re-grant the user's table privileges in MySQL (skipping TRUNCATE, which MySQL does not have)
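
For illustration, here is roughly how one row from information_schema.table_privileges maps to the MySQL statements the function builds. The row values are invented.

# Hypothetical privilege row: (grantor, grantee, catalog, schema, table, privilege, ...)
row = ("postgres", "try_user", "Warehouse", "public", "import_data", "SELECT", "NO", "NO")

create_user_query = f'CREATE USER {row[1]}@`localhost` IDENTIFIED WITH mysql_native_password BY "password";'
grant_query = f'GRANT {row[5]} ON {row[3]}.{row[4]} TO {row[1]}@`localhost`;'

print(grant_query)
# GRANT SELECT ON public.import_data TO try_user@`localhost`;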

DBT Codes for MySQL

The DBT codes here are not much different from the PostgreSQL DBT codes. Here is a little DBT code for MySQL:

schema.yml

version: 2

models:

  - name: import_data
    description: "traffic data"
    columns:
      - name: id
        description: "unique id"
      - name: track_id
        description: "track id"
      - name: type
        description: "type"
      - name: traveled_d
        description: "traveled_d"
      - name: avg_speed
        description: "avg_speed"
      - name: lat
        description: "lat"
      - name: lon
        description: "lon"
      - name: speed
        description: "speed"
      - name: lon_acc
        description: "lon_acc"
      - name: lat_acc
        description: "lat_acc"
      - name: time
        description: "time"

traffic_dbt_model.sql

{{ config(materialized='view') }}

with traffic_dbt_model as (

    select * from public.import_data

)

select *
from traffic_dbt_model

traffic_avg_speed_by_type_model.sql

select " type" , AVG(" avg_speed") 
from {{ ref('traffic_dbt_model') }}
Group by " type"
 

These DBT models create views that transform the data. After this, DBT also lets us document our models and serve the documentation locally.

Building a Dashboard Using Apache Superset

Apache Superset is a really powerful tool that lets us build dashboards both through its visual chart builder and directly from SQL queries. Here are some of the charts I was able to create.

Project Challenges and Takeaways

This project has been helpful in understanding the data migration process and the challenges we might face in real-world data migration projects. We were also able to understand the structure of PostgreSQL and MySQL databases and their privilege models. Some pointers include:

  • While working on this project, I searched for a module or application that can migrate data from PostgreSQL to MySQL, but all I could find were tools for going from MySQL to Postgres. This suggests more people and companies are moving toward Postgres rather than MySQL; one reason could be scalability.
  • A PostgreSQL database can hold multiple schemas, and each schema can hold multiple tables, while in MySQL a schema is the database, meaning a database cannot contain multiple schemas. This makes things difficult when two databases share a schema name or two schemas share table names.
  • The user privilege structure in PostgreSQL carries more information than MySQL's, including who granted which access to a user.
  • I found Apache Superset easy to use and easier to install than Redash.
  • PostgreSQL supports more data types than MySQL, but MySQL has sub-types that PostgreSQL does not, which makes converting data types between the two databases difficult.

Future Plans and Conclusion

This project has enabled me to look deeply into database architectures and models in databases like PostgreSQL and MySQL. It also allowed me to compare dashboard building tools, such as Redash and Superset. Key future works include:

  1. Analyzing and enabling migration of big (in size) databases using this project's codes.
  2. Adding more testing features for data migration.
  3. Creating more intuitive dashboards and insights.

