Create a workflow POC
Objectives
The goal is to create a POC of the workflow with multiple DAGs, testing the data flow in Airflow. The objectives are to test the following functionalities, which will be crucial for implementing the INSPIRE ingestion pipeline:
DAG dependency relationships:
moving data between DAGs,
triggering downstream DAGs from other DAGs and via the Airflow API,
restarting the workflow from a specific DAG,
DB reads and writes from DAGs to manage data,
getting the workflow state (current DAG/task) with an API call.
For the POC we’ll implement three DAGs: one with minimal record preprocessing, a second with a data approval process that requires branching and an Airflow API call, and a third with minimal post-processing.
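A rough sketch of how the three DAGs could be laid out, assuming Airflow 2.4+; the dag_ids, task names and the branching logic below are placeholders for the POC, not final choices:

```python
# Sketch of the three POC DAGs; dag_ids and task names are placeholders.
import pendulum
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator

default_args = {"start_date": pendulum.datetime(2024, 1, 1, tz="UTC")}

with DAG("preprocessing", schedule=None, default_args=default_args):
    PythonOperator(task_id="preprocess_record", python_callable=lambda: None)

with DAG("approval", schedule=None, default_args=default_args):
    def choose_branch(**context):
        # In the real POC the approval decision would come from the DB / API call.
        conf = context["dag_run"].conf or {}
        return "approved" if conf.get("approved") else "rejected"

    branch = BranchPythonOperator(task_id="check_approval", python_callable=choose_branch)
    approved = PythonOperator(task_id="approved", python_callable=lambda: None)
    rejected = PythonOperator(task_id="rejected", python_callable=lambda: None)
    branch >> [approved, rejected]

with DAG("postprocessing", schedule=None, default_args=default_args):
    PythonOperator(task_id="postprocess_record", python_callable=lambda: None)
```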
State monitoring
We should be able to get the current DAG/task with an API call. This is a bit complicated because, with multiple DAGs in Airflow, each DAG gets its own run ID for every run, so the runs have to be grouped and stored. Thus, we’ll need to create a custom workflow identifier at the beginning of the first DAG and save each DAG run ID to the DB. Then, given the last run ID, we can get the state via the API.
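A minimal sketch of the lookup, assuming Airflow 2’s stable REST API; the webserver URL, the credentials, and the idea that dag_id / dag_run_id come from our custom workflow table are assumptions for illustration:

```python
import requests

AIRFLOW_API = "http://localhost:8080/api/v1"  # assumed local Airflow webserver
AUTH = ("airflow", "airflow")                 # assumed basic-auth credentials

def get_workflow_state(dag_id: str, dag_run_id: str) -> dict:
    """Return the state of the given DAG run plus its currently running tasks."""
    run = requests.get(
        f"{AIRFLOW_API}/dags/{dag_id}/dagRuns/{dag_run_id}", auth=AUTH
    ).json()
    tasks = requests.get(
        f"{AIRFLOW_API}/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances", auth=AUTH
    ).json()
    running = [
        ti["task_id"]
        for ti in tasks.get("task_instances", [])
        if ti["state"] == "running"
    ]
    return {"dag_id": dag_id, "dag_run_state": run["state"], "running_tasks": running}

# dag_id and dag_run_id would be looked up in our workflow-runs table by the
# custom workflow identifier; that query is left out of this sketch.
```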
Triggering next DAG
For the POC, we’ll try triggering the next DAG with an API call. We can also try the ExternalTaskSensor operator, which waits in the downstream DAG for the upstream DAG to complete.
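A minimal sketch of the API-call variant against Airflow 2’s stable REST API (URL and credentials are assumed); the commented alternative uses TriggerDagRunOperator from the upstream DAG instead:

```python
import requests

AIRFLOW_API = "http://localhost:8080/api/v1"  # assumed local Airflow webserver
AUTH = ("airflow", "airflow")                 # assumed basic-auth credentials

def trigger_next_dag(dag_id: str, workflow_id: str) -> str:
    """Trigger the next DAG, passing our custom workflow identifier in the run conf."""
    resp = requests.post(
        f"{AIRFLOW_API}/dags/{dag_id}/dagRuns",
        auth=AUTH,
        json={"conf": {"workflow_id": workflow_id}},
    )
    resp.raise_for_status()
    return resp.json()["dag_run_id"]

# Operator-based alternative, placed as the last task of the upstream DAG:
# from airflow.operators.trigger_dagrun import TriggerDagRunOperator
# TriggerDagRunOperator(
#     task_id="trigger_approval",
#     trigger_dag_id="approval",
#     conf={"workflow_id": "{{ dag_run.conf['workflow_id'] }}"},
# )
```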
Data passing between the DAGs
At the end of each DAG we’ll save the data to the database, and at the beginning of the next DAG we’ll read the data back from the DB.
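A sketch of the read/write helpers, assuming the external DB is Postgres and is registered as an Airflow connection called workflow_db (connection id, table and column names are placeholders):

```python
import json
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_article(workflow_id: str) -> dict:
    """First task of a DAG: read the article saved by the previous DAG."""
    hook = PostgresHook(postgres_conn_id="workflow_db")
    row = hook.get_first(
        "SELECT data FROM articles WHERE workflow_id = %s", parameters=(workflow_id,)
    )
    return row[0] if row else {}

def save_article(workflow_id: str, data: dict) -> None:
    """Last task of a DAG: persist the (possibly modified) article for the next DAG."""
    hook = PostgresHook(postgres_conn_id="workflow_db")
    hook.run(
        "UPDATE articles SET data = %s WHERE workflow_id = %s",
        parameters=(json.dumps(data), workflow_id),
    )
```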
DB Setup
We’ll store the article data and workflow metadata in an external DB. For the POC, beyond the tables provided by Airflow, the DB should have a table to store the article data, one that stores workflow metadata (e.g. info about coreness and approval), and a table that maps a custom workflow identifier (which we have to assign in order to group DAG runs together) to DAG run IDs and, possibly, DAG run start times (so that the current DAG can be fetched).
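One possible shape for the three POC tables, sketched as SQLAlchemy models; the table and column names are placeholders, and the exact columns (e.g. whether we keep the run start time) are still open:

```python
from sqlalchemy import JSON, Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Article(Base):
    """Article payload passed between the DAGs."""
    __tablename__ = "articles"
    workflow_id = Column(String, primary_key=True)
    data = Column(JSON, nullable=False)

class WorkflowMetadata(Base):
    """Workflow-level metadata, e.g. coreness and approval decisions."""
    __tablename__ = "workflow_metadata"
    workflow_id = Column(String, primary_key=True)
    coreness = Column(String)
    approved = Column(String)

class WorkflowRun(Base):
    """Maps the custom workflow identifier to the individual DAG runs."""
    __tablename__ = "workflow_runs"
    id = Column(Integer, primary_key=True)
    workflow_id = Column(String, index=True)
    dag_id = Column(String)
    dag_run_id = Column(String)
    started_at = Column(DateTime)
```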