josephmachado / beginner_de_project_stream

Simple stream processing pipeline
https://www.startdataengineering.com/post/data-engineering-project-for-beginners-stream-edition/
91 stars 27 forks source link
apache-flink dataengineering datapipeline graphana postgresql prometheus

Beginner Data Engineering Project - Stream Version

Code for blog at Data Engineering Project Stream Edition.

Project

Consider we run an e-commerce website. An everyday use case with e-commerce is to identify, for every product purchased, the click that led to this purchase. Attribution is the joining of checkout(purchase) of a product to a click. There are multiple types of attribution; we will focus on First Click Attribution.

Our objectives are:

  1. Enrich checkout data with the user name. The user data is in a transactional database.
  2. Identify which click leads to a checkout (aka attribution). For every product checkout, we consider the earliest click a user made on that product in the previous hour to be the click that led to a checkout.
  3. Log the checkouts and their corresponding attributed clicks (if any) into a table.

Run on codespaces

You can run this data pipeline using GitHub codespaces. Follow the instructions below.

  1. Create codespaces by going to the beginner_de_project_stream repository, cloning(or fork) it and then clicking on Create codespaces on main button.
  2. Wait for codespaces to start, then in the terminal type make run.
  3. Wait for make run to complete.
  4. Go to the ports tab and click on the link exposing port 8081 to access Flink UI and clicking on Jobs -> Running Jobs -> checkout-attribution-job to see our running job..

codespace start codespace make up codespace access ui

Note Make sure to switch off codespaces instance, you only have limited free usage; see docs here.

Run locally

Prerequisites

To run the code, you'll need the following:

  1. git
  2. Docker with at least 4GB of RAM and Docker Compose v1.27.0 or later
  3. psql

If you are using windows please setup WSL and a local Ubuntu Virtual machine following the instructions here. Install the above prerequisites on your ubuntu terminal, if you have trouble installing docker follow the steps here.

Architecture

Our streaming pipeline architecture is as follows (from left to right):

  1. Application: Website generates clicks and checkout event data.
  2. Queue: The clicks and checkout data are sent to their corresponding Kafka topics.
  3. Stream processing:
    1. Flink reads data from the Kafka topics.
    2. The click data is stored in our cluster state. Note that we only store click information for the last hour, and we only store one click per user-product combination.
    3. The checkout data is enriched with user information by querying the user table in Postgres.
    4. The checkout data is left joined with the click data( in the cluster state) to see if the checkout can be attributed to a click.
  4. The enriched and attributed checkout data is logged into a Postgres sink table.
  5. Monitoring & Alerting: Apache Flink metrics are pulled by Prometheus and visualized using Graphana.

Architecture

Code design

We use Apache Table API to

  1. Define Source systems: clicks, checkouts and users. This python script generates fake click and checkout data.
  2. Define how to process the data (enrich and attribute): Enriching with user data and attributing checkouts
  3. Define Sink system: sink

The function run_checkout_attribution_job creates the sources, and sink and runs the data processing.

We store the SQL DDL and DML in the folders source, process, and sink corresponding to the above steps. We use Jinja2 to replace placeholders with config values. The code is available here.

Run streaming job

Clone and run the streaming job (via terminal) as shown below:

git clone https://github.com/josephmachado/beginner_de_project_stream
cd beginner_de_project_stream
make run # restart all containers, & start streaming job
  1. Apache Flink UI: Open http://localhost:8081/ or run make ui and click on Jobs -> Running Jobs -> checkout-attribution-job to see our running job.
  2. Graphana: Visualize system metrics with Graphana, use the make open command or go to http://localhost:3000 via your browser (username: admin, password:flink).

Note: Checkout Makefile to see how/what commands are run. Use make down to spin down the containers.

Check output

Once we start the job, it will run asynchronously. We can check the Flink UI (http://localhost:8081/ or make ui) and clicking on Jobs -> Running Jobs -> checkout-attribution-job to see our running job.

Flink UI

We can check the output of our job, by looking at the attributed checkouts.

Open a postgres terminal as shown below.

pgcli -h localhost -p 5432 -U postgres -d postgres 
# password: postgres

Use the below query to check that the output updates every few seconds.

SELECT checkout_id, click_id, checkout_time, click_time, user_name FROM commerce.attributed_checkouts order by checkout_time desc limit 5;

Tear down

Use make down to spin down the containers.

Contributing

Contributions are welcome. If you would like to contribute you can help by opening a Github issue or putting up a PR.

References

  1. Apache Flink docs
  2. Flink Prometheus example project