
Data Engineering Project: NYC Tripdata Data Infrastructure

Tech stack: Spark Β· Python Β· PostgreSQL Β· Docker Β· Mage


πŸš€ About

This project simulates a production-grade data infrastructure designed to process NYC trip data through multiple stages: dev, stage, and production. The pipeline handles millions of trip records, ensuring reliability and scalability through techniques such as batch writing, disk-spill management, and parallelization. It relies heavily on Apache Spark's distributed computing capabilities to process large datasets efficiently.

πŸ—‚οΈ Project Infrastructure

Environment Diagram

The data processing in this project follows a vertical pipeline architecture with three stages:

  1. Development:

    • Focuses on data cleaning, transformations, and preparation of raw data.
    • Handles specific Tripdata type modifications and processes, ensuring that each dataset (e.g., yellow cab, green cab) is adjusted individually before moving forward.
  2. Staging:

    • Responsible for data governance and ensuring data quality.
    • Applies global transformations across all Tripdata types, such as adding unique identifiers (e.g., dwid), making data consistent and up to standard before it moves to production.
  3. Production:

    • The final stage where data is fully prepared for feature extraction, reporting, and analysis.
    • The data at this point is ready for consumption in analytical and reporting workflows.

The project leverages both Data Lakehouse and Data Warehouse architectures for effective data management:

This architecture demonstrates the system’s ability to handle large datasets with high reliability and efficiency, mimicking a production-like environment.

πŸ“Š Dataset

The data is sourced from the NYC Taxi & Limousine Commission Trip Record Data. Many data engineering principles used in this project are inspired by the DataTalksClub Data Engineering Zoomcamp.

πŸ“ Setup

Prerequisites

Before running the docker-compose up -d command, please review the Running on Docker section to modify the image resources if running on a low-end environment. Take note of the System Requirements below.

You must have the latest versions of Docker and Docker Compose installed. You will also need Postman (or a similar API client) to trigger the pipeline.

System Requirements

| Specification | Suggested Requirements | Minimum Requirements (for alternative docker-compose, TODO) |
| --- | --- | --- |
| CPU | 6 cores, 3.5 GHz or higher | 3 cores |
| RAM | 32 GB (16 GB allocated to Docker) | 16 GB (8 GB allocated to Docker) |
| Spark Cluster | 3 workers, each with 2 cores and 4 GB of RAM | 2 workers, each with 2 cores and 2.5 GB of RAM |
| Storage | 30-50 GB | 30-50 GB |

Running Project using Docker Compose

# Open a terminal (Command Prompt or PowerShell for Windows, Terminal for macOS or Linux)

# Ensure Git is installed
# Visit https://git-scm.com to download and install console Git if not already installed

# Clone the repository
git clone https://github.com/nimbly-dev/nyctripdata_project.git

# Navigate to the project directory
cd nyctripdata_project

# Run Docker Compose; wait for the images to be pulled/built, after which the services will start
docker-compose up -d

After starting the project, navigate to http://localhost:6789/. This will take you to the Mage Dashboard, where you can manage the pipelines and view data processing workflows.

NYC Tripdata Overview Page
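As an optional sanity check, you can confirm that the dashboard is reachable with a short Python snippet (this assumes the default `6789` port mapping and the `requests` package):

```python
import requests

# Confirm the Mage dashboard is reachable on the default port mapping (6789).
resp = requests.get("http://localhost:6789", timeout=10)
print("Mage is up" if resp.ok else f"Unexpected status: {resp.status_code}")
```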

πŸ“š Documentation

Note: This guide focuses on the usage of the project and its pipelines. For detailed information on using Mage, please refer to the Mage documentation.

Running on Docker

This project is containerized using Docker to simplify deployment across multiple environments. Docker allows for easy distribution and configuration management by simply editing the service declarations in the docker-compose.yml file.

The Docker Compose configuration sets up the following services, all connected through a shared network named mage-network:

β”œβ”€β”€ mage_orchestrator     # Manages workflows and pipelines (Mage AI)
β”œβ”€β”€ spark_master          # Master node managing the Spark cluster
β”‚   β”œβ”€β”€ spark-worker-1    # Spark worker node 1
β”‚   β”œβ”€β”€ spark-worker-2    # Spark worker node 2
β”‚   └── spark-worker-3    # Spark worker node 3
β”œβ”€β”€ pg_admin              # PgAdmin web UI for PostgreSQL management
β”œβ”€β”€ postgres-dev          # PostgreSQL for the development environment
β”œβ”€β”€ postgres-staging      # PostgreSQL for the staging environment
└── postgres-production   # PostgreSQL for the production environment

Spark Configuration

The default Spark service configuration includes the following ports:

Adding a New Spark Worker

If you need to add additional Spark workers to the cluster, you can easily append a new worker configuration to the docker-compose.yml. Below is an example configuration to add a new Spark worker:

  spark-new-worker:
    image: cluster-apache-spark:python3.10.14-spark3.5.1
    container_name: spark-new-worker
    entrypoint: ['/bin/bash', '/start-spark.sh', 'worker']
    networks:
      - mage-network
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER=spark://spark-master:7077
      - SPARK_WORKER_CORES=2  
      - SPARK_WORKER_MEMORY=4G 
      - SPARK_WORKLOAD=worker
      - SPARK_LOCAL_IP=spark-new-worker
      - PYSPARK_PYTHON=${PYSPARK_PYTHON}
      - PYSPARK_DRIVER_PYTHON=${PYSPARK_DRIVER_PYTHON}
      - SPARK_EVENTLOG_DIR=${SPARK_EVENTLOG_DIR}
      - SPARK_HISTORY_DIR=${SPARK_HISTORY_DIR}
      - SPARK_WAREHOUSE_DIR=${SPARK_WAREHOUSE_DIR}
      - SPARK_LAKEHOUSE_DIR=${SPARK_LAKEHOUSE_DIR}
      - SPARK_CHECKPOINT_DIR=${SPARK_CHECKPOINT_DIR}
      - SPARK_LOCAL_DIR=${SPARK_LOCAL_DIR}
    volumes:
      - spark-home:/opt/spark
      - ./spark-data:/opt/spark/work
    ports:
      - "{CHANGE_PORT_HERE}:8081"  # Change this port to avoid conflicts
    deploy:
      resources:
        limits:
          cpus: "2"
          memory: "4G"
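After adding the service, you can start only the new worker with `docker-compose up -d spark-new-worker` and confirm in the Spark master web UI that it has registered with the cluster. Keep the combined worker CPU/memory limits within the resources you allocated to Docker (see the System Requirements above).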

Database Configurations

The following are the default hostnames and ports of the project's Postgres databases:

| Database Name | Hostname | Port |
| --- | --- | --- |
| nyc_taxi_dev_postgres | postgres-dev | 5432 |
| nyc_taxi_staging_postgres | postgres-staging | 5433 |
| nyc_taxi_production_postgres | postgres-production | 5434 |

In addition, these are the service accounts the pipeline uses to query the databases:

| Environment | Service Account Email | Database Name | Role (Privileges) |
| --- | --- | --- | --- |
| Development | dev-service-account@de-nyctripdata-project.iam.com | nyc_taxi_dev_postgres | SELECT, INSERT, UPDATE, DELETE on all tables in the public and temp schemas; CREATE on the public and temp schemas; USAGE, SELECT on all sequences in the public and temp schemas |
| Staging | staging-service-account@de-nyctripdata-project.iam.com | nyc_taxi_staging_postgres | SELECT, INSERT, UPDATE, DELETE on all tables in the public and temp schemas; CREATE on the public and temp schemas; USAGE, SELECT on all sequences in the public and temp schemas |
| Production | production-service-account@de-nyctripdata-project.iam.com | nyc_taxi_production_postgres | SELECT, INSERT, UPDATE, DELETE on all tables in the public and temp schemas; CREATE on the public and temp schemas; USAGE, SELECT on all sequences in the public and temp schemas |
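For reference, here is a minimal connectivity sketch using Spark's JDBC reader. Credentials are placeholders; the hostnames above resolve inside `mage-network`, so when running from the host machine swap in `localhost` and the mapped port, and the PostgreSQL JDBC driver is assumed to be available on the Spark classpath:

```python
from pyspark.sql import SparkSession

# Minimal read against the dev database to verify connectivity.
spark = SparkSession.builder.appName("postgres_connectivity_check").getOrCreate()

df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://postgres-dev:5432/nyc_taxi_dev_postgres")
    .option("dbtable", "information_schema.tables")
    .option("user", "<dev-service-account-user>")  # placeholder credentials
    .option("password", "<password>")
    .load()
)

df.select("table_schema", "table_name").show(10, truncate=False)
```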

Populating the Databases with Tripdata

To populate the databases with NYC trip data, you can use the spark_populate_tripdata_local_infrastructure pipeline. This pipeline orchestrates various stages, transforming raw trip data into production-ready formats to be stored in both the Data Warehouse and Data Lakehouse.

Getting Started with the Pipeline

  1. Navigate to the Pipelines:
    From the Mage Dashboard, click on Pipelines in the left-side panel. Then, select the spark_populate_tripdata_local_infrastructure pipeline to proceed.

    NYC Tripdata Pipeline List

  2. Trigger the Pipeline:
    On the pipeline page, navigate to Trigger in the left-side panel. Click on the Run Pipeline orchestration via API hyperlink to open the pipeline's trigger endpoint.

    Pipeline Trigger

  3. Execute the Pipeline via API:
    Copy the API URL provided and use a tool like Postman to execute the pipeline.

    Trigger Endpoint URL

Example API Request Body

Here’s an example request body that you can use in Postman to run the pipeline:

{
  "pipeline_run": {
    "variables": {
      "dev_limit_rows" : -1,
      "end_month": 12,
      "end_year": 2021,
      "start_month": 1,
      "start_year": 2021,
      "pipeline_run_name": "populate_fhvtripdata_2022",
      "spark_mode" : "cluster",
      "tripdata_type": "fhv_cab_tripdata",
      "data_loss_threshold": "very_strict",
      "is_overwrite_enabled": true
    }
  }
}

Once you send the request, the pipeline will begin processing the data as per the parameters you provided.
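If you prefer scripting over Postman, the same trigger can be called with a short Python snippet; the URL below is a placeholder for the trigger endpoint you copied from Mage:

```python
import requests

# Paste the trigger endpoint URL copied from the Mage "Run pipeline via API" page.
TRIGGER_URL = "<trigger endpoint URL from Mage>"

payload = {
    "pipeline_run": {
        "variables": {
            "dev_limit_rows": -1,
            "end_month": 12,
            "end_year": 2021,
            "start_month": 1,
            "start_year": 2021,
            "pipeline_run_name": "populate_fhvtripdata_2022",
            "spark_mode": "cluster",
            "tripdata_type": "fhv_cab_tripdata",
            "data_loss_threshold": "very_strict",
            "is_overwrite_enabled": True,
        }
    }
}

response = requests.post(TRIGGER_URL, json=payload, timeout=30)
response.raise_for_status()
print(response.json())
```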

API Request Body Parameters

| Parameter | Description | Example Value |
| --- | --- | --- |
| dev_limit_rows | Limits the number of rows for testing. Set to -1 to process all rows. | -1 (process all rows) |
| start_year | The start year of the trip data to be processed. | 2021 |
| start_month | The start month of the trip data to be processed. | 1 (for January) |
| end_year | The end year of the trip data to be processed. | 2021 |
| end_month | The end month of the trip data to be processed. | 12 (for December) |
| pipeline_run_name | A custom name for the pipeline run, useful for tracking multiple runs. | "populate_fhvtripdata_2022" |
| spark_mode | The execution mode for Spark. Set to "local" for local execution or "cluster" for distributed execution. | "cluster" |
| tripdata_type | The type of trip data to be processed. Possible values: "yellow_cab_tripdata", "green_cab_tripdata", "fhv_cab_tripdata". | "fhv_cab_tripdata" |
| data_loss_threshold | The acceptable level of data loss during processing. Possible values: "very_strict" (1% loss), "strict" (5% loss), "moderate" (10% loss). | "very_strict" |
| is_overwrite_enabled | Whether to overwrite or update existing data in the PostgreSQL data warehouse. | true |

Explanation of Key Parameters:

Structure of the Populate Tripdata Pipeline

Pipeline Orchestration Diagram

This diagram gives an overview of the pipeline orchestration, which follows the data processing stages described in the project overview above.

The following pipelines are run by this pipeline orchestration:

β”œβ”€β”€ conditional_run_spark_taxi_etl_to_dev_partition     # Runs one of the ETL pipelines below, based on tripdata_type
β”‚   β”œβ”€β”€ spark_yellow_taxi_etl_to_dev_partition
β”‚   β”œβ”€β”€ spark_green_taxi_etl_to_dev_partition
β”‚   └── spark_fhv_taxi_etl_to_dev_partition
β”œβ”€β”€ check_if_spark_taxi_etil_to_dev_partition_is_done   # Waits for the dev-partition ETL to finish
β”œβ”€β”€ spark_load_to_psql_stage
β”œβ”€β”€ spark_psql_stage_to_local_lakehouse_dir
β”œβ”€β”€ spark_psql_stage_to_production
└── data_tmp_cleanup

Populate Infra Tripdata Pipelines

The Populate Infra Tripdata Pipelines workflow is a series of pipelines that automate the processing of NYC trip data from downloading to production. Each pipeline performs specific tasks to ensure data is correctly processed, cleaned, and loaded into PostgreSQL and the Lakehouse storage.

1. spark_taxi_etil_to_dev_partition Pipeline

This pipeline is responsible for preparing the trip data for further processing:

2. spark_load_to_psql_stage Pipeline

This pipeline loads the cleaned trip data into the staging PostgreSQL table using an upsert strategy:

Overwrite Workflow: Spark Load Workflow
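The exact logic lives in the Mage blocks, but the general shape of an upsert keyed on `dwid` can be sketched as follows (schema, table, and column names are illustrative, and the temporary batch table is an assumption about how Spark lands the data before the merge):

```python
import psycopg2

# Illustrative upsert keyed on dwid; on partitioned tables the conflict target
# may also need to include the partition key (pickup_datetime).
UPSERT_SQL = """
INSERT INTO public.yellow_cab_tripdata_staging
SELECT * FROM temp.yellow_cab_tripdata_batch
ON CONFLICT (dwid) DO UPDATE
SET fare_amount   = EXCLUDED.fare_amount,
    trip_distance = EXCLUDED.trip_distance;
"""

# Placeholder credentials; the pipeline uses the staging service account.
conn = psycopg2.connect(
    host="postgres-staging",
    port=5433,
    dbname="nyc_taxi_staging_postgres",
    user="<staging-service-account-user>",
    password="<password>",
)
with conn, conn.cursor() as cur:
    cur.execute(UPSERT_SQL)
conn.close()
```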

3. spark_psql_stage_to_local_lakehouse_dir Pipeline

This pipeline moves the trip data from the staging PostgreSQL table to the local Lakehouse directory:

4. spark_psql_stage_to_production Pipeline

This pipeline transfers data from the Lakehouse and staging environments to the production PostgreSQL table:

Tripdata Tables

When the project is brought up with Docker Compose, partitioned Tripdata tables are created empty. The SQL queries that create them are executed automatically and can be reviewed in the deployment folder. The project includes three types of datasets:

| Dataset | Description | Data Dictionary |
| --- | --- | --- |
| Yellow Tripdata | Data containing trip records for Yellow Cab taxis. | Yellow Tripdata Dictionary |
| Green Tripdata | Data containing trip records for Green Cab taxis. | Green Tripdata Dictionary |
| FHV Tripdata | Data containing trip records for For-Hire Vehicles. | FHV Tripdata Dictionary |

dwid Column

The dwid column is added at the stage level and is used as a primary key in various Trip datasets. It is generated by hashing a concatenation of common columns (pickup_datetime, dropoff_datetime, pu_location_id, and do_location_id) using the SHA-256 algorithm.

Key Points:

By generating a dwid from these key columns, we ensure consistent and unique identification across datasets; it also lets us perform an upsert operation on this column whenever needed.
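For intuition, the derivation can be sketched in PySpark roughly like this (the rows are dummy data; the project's actual transformation blocks may differ in detail):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dwid_example").getOrCreate()

# Dummy rows with the four key columns used to derive the dwid.
df = spark.createDataFrame(
    [("2021-01-01 00:15:00", "2021-01-01 00:30:00", 132, 236)],
    ["pickup_datetime", "dropoff_datetime", "pu_location_id", "do_location_id"],
)

# Concatenate the key columns and hash the result with SHA-256.
df_with_dwid = df.withColumn(
    "dwid",
    F.sha2(
        F.concat_ws(
            "|",
            F.col("pickup_datetime"),
            F.col("dropoff_datetime"),
            F.col("pu_location_id").cast("string"),
            F.col("do_location_id").cast("string"),
        ),
        256,
    ),
)

df_with_dwid.show(truncate=False)
```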

Partitioning in the Tripdata Tables

When data is appended to Tripdata tables, the pipeline creates a partition based on the month extracted from pickup_datetime. Each partition corresponds to a specific month. Data is inserted into the appropriate partition after creation.

A SQL function available in all public schemas automates this process. It takes the table name and target date in datetime format, creating a partition named: {tripdata_tablename}{environment}{year}{month}. The default partition key is pickup_datetime.
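As a rough illustration of what such a partition looks like at the SQL level, the helper below builds an equivalent `CREATE TABLE ... PARTITION OF` statement. The helper, the underscore-separated naming, and the table name are illustrative, not the project's actual SQL function:

```python
from datetime import date

def build_partition_ddl(table_name: str, environment: str, target_date: date) -> str:
    """Build the DDL for one monthly partition of a tripdata table.

    Illustrative only: the parent table is assumed to be declared with
    PARTITION BY RANGE (pickup_datetime), and the naming mirrors the
    {tripdata_tablename}{environment}{year}{month} pattern described above.
    """
    year, month = target_date.year, target_date.month
    partition_name = f"{table_name}_{environment}_{year}_{month:02d}"
    start = date(year, month, 1)
    end = date(year + (month == 12), (month % 12) + 1, 1)  # first day of the next month
    return (
        f"CREATE TABLE IF NOT EXISTS {partition_name} "
        f"PARTITION OF {table_name} "
        f"FOR VALUES FROM ('{start}') TO ('{end}');"
    )

print(build_partition_ddl("yellow_cab_tripdata", "staging", date(2021, 1, 15)))
```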

Modifying the Pipeline

When modifying the pipeline, consider the following guidelines:

Monitoring Pipelines

You can monitor the progress of your pipeline directly in the Mage app. Navigate to http://localhost:6789/pipelines to view the pipeline status.

If the pipeline fails, you can investigate the issue by selecting the pipeline detail and reviewing the logs. You can also re-run the pipeline if necessary.

Pipeline Run List

Clicking on a specific pipeline will display the individual code blocks that are executed as part of the pipeline:

Pipeline Run Detail

To view real-time logs or troubleshoot errors, click the logs icon, which provides a live tail of the pipeline’s execution:

Pipeline Run Log Tail