GSA / data.gov

Main repository for the data.gov service
https://data.gov

SPIKE: Create harvesting workflow using Apache Airflow. #4422

Closed rshewitt closed 1 year ago

rshewitt commented 1 year ago

User Story

Acceptance Criteria

Background

Sketch

Do the following in order, as time allows given this is a spike:

btylerburton commented 1 year ago

Some assumptions to test...

btylerburton commented 1 year ago

I'm going to put my name on this as I'll be devoting some time to it, but that shouldn't preclude others from working on it as well, either collaboratively or independently.

rshewitt commented 1 year ago

The Airflow scheduler (Airflow version 2.3.0) won't detect a DAG defined with the TaskFlow API approach (e.g. decorating with @task instead of using PythonOperator; see the comment below for an example) unless you assign the DAG function invocation to something. For example, this works...

_ = dag_function()

This doesn't work...

dag_function()

When using the latter approach, the scheduler log indicates ...WARNING - No viable dags retrieved from [location of dag script.py]. This seems to conflict with an [example](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html#:~:text=(fahrenheit)-,etl(),-Custom%20Objects) found in the TaskFlow documentation.
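
For context, a minimal sketch of what dag_function might look like under this approach (the schedule, task names, and task bodies are illustrative, assuming Airflow 2.3 with the TaskFlow API):

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2023, 8, 1), catchup=False)
def dag_function():
    @task(task_id="extract")
    def extract():
        return {"records": []}  # placeholder

    @task(task_id="transform")
    def transform(payload):
        return payload  # placeholder

    transform(extract())


# the scheduler only picks the DAG up once the returned DAG object is bound
# to a module-level name
_ = dag_function()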

rshewitt commented 1 year ago

Using the TaskFlow API approach means less code to write. For example,

# dag creation has already occurred
@task(task_id="transform")
def transform():
    # do something
    pass

transform()  # add the task to the dag workflow

compares to

# dag creation has already occurred
def transform():
    # do something
    pass

task1 = PythonOperator(
    task_id="transform",
    python_callable=transform,
    dag=dag,  # attach task to dag
)
# no invocation needed here; the operator is registered with the dag above

Long story short, it removes the explicit need for the PythonOperator (in this instance). The same may apply to any conventional Airflow operator (e.g. email, python, bash).

rshewitt commented 1 year ago

I published my branch airflow-etl-test-reid; it's intended as a WIP. I paired with @jbrown-xentity earlier and went over my current findings. We discussed the possibility of tasks needing to share data between themselves (e.g. extract to transform). Based on what I've read, tasks are meant to execute in isolation, but Airflow offers a cross-task communication mechanism called XComs which allows for this. The amount/size of information that can be passed before we see a reaction in Airflow could be a valuable test.
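
As a concrete illustration, here's a minimal sketch of that extract-to-transform hand-off using TaskFlow return values (the record structure is made up); returned values are pushed to XCom and pulled by the downstream task automatically:

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2023, 8, 1), catchup=False)
def extract_transform_example():
    @task
    def extract():
        # pretend this came from a harvest source
        return [{"title": "dataset-1"}, {"title": "dataset-2"}]

    @task
    def transform(records):
        # the records argument is pulled from the extract task's XCom
        return [{**r, "transformed": True} for r in records]

    transform(extract())


_ = extract_transform_example()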

rshewitt commented 1 year ago

If sharing data between tasks proves unacceptable, one alternative could be having each task pull data from S3, process it, then load it back to S3. I'm unsure if this is better.
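
A rough sketch of that alternative, assuming the Amazon provider package is installed with an aws_default connection configured; the bucket and key names are made up, and only the S3 key would travel through XCom:

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET = "harvest-staging"  # hypothetical bucket name


@task
def transform_from_s3(key):
    s3 = S3Hook(aws_conn_id="aws_default")
    raw = s3.read_key(key=key, bucket_name=BUCKET)  # pull what the extract task wrote
    transformed = raw.upper()  # placeholder transformation
    out_key = key.replace("extracted/", "transformed/")
    s3.load_string(transformed, key=out_key, bucket_name=BUCKET, replace=True)
    return out_key  # only the key goes through XCom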

rshewitt commented 1 year ago

Looks like tasks push to XCom by default if they return a value that is not None.

rshewitt commented 1 year ago

Some run information when extracting and validating a large spatial dataset, notably the Max Run Duration. This could be a miscalculation based on limited information from the task; it's probably something to do with the first run start and the last run start.

Screenshot 2023-08-21 at 12 29 04 PM
jbrown-xentity commented 1 year ago

To summarize: we have a working version of a DAG (a harvest) that does the extract of a full DCAT-US catalog and distributes the individual datasets into a new workflow where they get validated independently. There are 4 things we would like to investigate further:

rshewitt commented 1 year ago

Test case where 2 of the dcatus records in the catalog are invalid.

Screenshot 2023-08-22 at 12 43 19 PM

Followed by only 1 load (the valid record).

Screenshot 2023-08-22 at 12 43 29 PM

rshewitt commented 1 year ago

Test case where the extraction failed, resulting in all downstream tasks (validate & load) failing.

Screenshot 2023-08-22 at 12 48 40 PM

rshewitt commented 1 year ago

The picture above indicates the validate and load were kicked off, which we may want to avoid entirely, but the validate task duration below shows that despite the job kicking off, no time was spent on it.

Screenshot 2023-08-22 at 12 58 42 PM

rshewitt commented 1 year ago

I'm going to use a subset of the current catalog harvest sources, using this query as the input for dynamically generating DAGs.
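
A rough sketch of what that dynamic generation might look like; the HARVEST_SOURCES list and DAG naming below are placeholders for whatever the query returns, and each generated DAG is written into globals() so the scheduler can discover it (per the assignment discussion above):

from datetime import datetime

from airflow.decorators import dag, task

# placeholder for the subset of harvest sources returned by the query
HARVEST_SOURCES = [
    {"name": "commerce_non_spatial_data", "url": "https://example.com/data.json"},
    {"name": "bls_data", "url": "https://example.com/bls/data.json"},
]


def build_harvest_dag(source):
    @dag(
        dag_id=f"{source['name']}_harvest_source_workflow",
        schedule_interval=None,
        start_date=datetime(2023, 8, 1),
        catchup=False,
    )
    def harvest_workflow():
        @task
        def extract():
            return source["url"]  # placeholder extract step

        @task
        def validate(url):
            return url  # placeholder validate step

        validate(extract())

    return harvest_workflow()


for _source in HARVEST_SOURCES:
    # each DAG must land in the module's global namespace to be discovered
    globals()[f"{_source['name']}_harvest_source_workflow"] = build_harvest_dag(_source)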

jbrown-xentity commented 1 year ago

For the cloud.gov installation, we would utilize a pip installation on the Python runner, probably with the redis extras: https://airflow.apache.org/docs/apache-airflow/1.10.10/installation.html#extra-packages

robert-bryson commented 1 year ago

Breaking off the deploy onto cloudgov work into https://github.com/GSA/data.gov/issues/4434

rshewitt commented 1 year ago

In response to my previous comment, I've since found that DAGs are identified by whether they exist in the global namespace of the file (i.e. if they are in globals()) when the file is processed. Function invocation or class instantiation without storing the result in a variable does not add anything to the global namespace of the file. Since globals() returns a dictionary, not using a variable means no key is ever assigned to the value.
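
A toy illustration of that point, outside of Airflow:

def build_dag():
    return "pretend this is a DAG object"


build_dag()  # return value is discarded; nothing new appears in globals()
my_dag = build_dag()  # "my_dag" is now a key in globals()

print("my_dag" in globals())  # True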

rshewitt commented 1 year ago

The size limit for an XCom using a Postgres backend looks to be 1 GB. I've seen this value referenced in other articles as well.

rshewitt commented 1 year ago

Docker crashed after attempting to process 16 instances of a dataset on my local machine. It appeared to be a memory issue.

btylerburton commented 1 year ago

Let's do some real load testing on cloud.gov after @robert-bryson's work is stood up.

rshewitt commented 1 year ago

Something to note on callback execution of tasks. This is in reference to my work on error handling.

rshewitt commented 1 year ago

Callback functions are provided with the associated task context. Here's a list of the variables within a context.
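
For illustration, a minimal sketch of a failure callback that receives that context (the logging is a placeholder):

from datetime import datetime

from airflow.decorators import dag, task


def log_failure(context):
    # the context dict carries the task instance, the dag, the logical date,
    # the exception that was raised, etc.
    ti = context["ti"]
    print(f"task {ti.task_id} in dag {ti.dag_id} failed: {context.get('exception')}")


@dag(schedule_interval=None, start_date=datetime(2023, 8, 1), catchup=False)
def callback_example():
    @task(on_failure_callback=log_failure)
    def might_fail():
        raise ValueError("something went wrong")

    might_fail()


_ = callback_example()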

rshewitt commented 1 year ago

What kinds of techniques do we have to control the flow of information when something goes wrong?

rshewitt commented 1 year ago

Using the TaskFlow API approach requires calling the tasks in the workflow in order for them to work properly.

# TaskFlow API approach
@dag(*args, **kwargs)
def test_pipeline():

    @task(task_id="task1")
    def task1():
        return "example"

    @task(task_id="task2")
    def task2():
        return "another example"

    task1() >> task2()  # calling the decorated functions

_ = test_pipeline()

# Traditional approach
# (using a context manager because decorating would be using TaskFlow;
# a context manager works fine with TaskFlow tasks as well)
with DAG(**kwargs) as dag:

    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")

    task1 >> task2  # not calling the operators

# no invocation or assignment needed here; the name bound by "as dag" already
# puts the DAG in the module's global namespace

Issue comment for reference on the TaskFlow approach.

Basically, if you're using operators then you don't need to invoke them. If you're using functions decorated with @task then you do need to invoke them.

rshewitt commented 1 year ago

Rule of thumb: anytime branching is implemented, ALWAYS consider the trigger_rule of downstream tasks. By default, tasks have a trigger_rule of "all_success", meaning all immediate parent tasks must succeed. In the event of a branch, the successful branch is followed and the other branch is not, causing all tasks in the untaken branch to be skipped. Skipped is a state of a task; skipped != success. A common pattern is to have branches join together eventually (e.g. a bulk load). That load would be skipped if a branch converging into it was skipped as a result of the branching.

rshewitt commented 1 year ago

We could potentially store our validation schemas as Variables in Airflow.
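
A small sketch of that idea, assuming the schema is stored as a JSON-serialized Variable named dcatus_schema (a made-up name) and the jsonschema library is available:

import jsonschema
from airflow.decorators import task
from airflow.models import Variable


@task
def validate(record):
    # deserialize_json=True parses the Variable's stored JSON string into a dict
    schema = Variable.get("dcatus_schema", deserialize_json=True)
    jsonschema.validate(instance=record, schema=schema)
    return record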

rshewitt commented 1 year ago

                     -> task C (true)          \
                    /                           \
 task A -> taskBranch                            -> task F
                    \                           /
                     -> task E (Dummy)(false)  /

^ this works.

Apparently branching requires at least 2 downstream tasks, one for true and one for false. It seems like taskBranch can't create a direct dependency to task F if a false task isn't provided, so a dummy operator needs to be used.

                     -> task C (true)   \
                    /                    \
 task A -> taskBranch   - - - - - - - - - -> task F

^ this doesn't work.
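
For reference, a sketch wiring up the working shape above, assuming @task.branch (available in Airflow 2.3+) and an EmptyOperator standing in for the dummy false path; the trigger rule on task F is what lets it run even though one branch gets skipped (see the trigger_rule note above). Task names mirror the diagram and the branch condition is a placeholder:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator


@dag(schedule_interval=None, start_date=datetime(2023, 8, 1), catchup=False)
def branching_example():
    @task
    def task_a():
        return True  # placeholder condition

    @task.branch
    def task_branch(condition):
        # return the task_id to follow; everything not returned gets skipped
        return "task_c" if condition else "task_e"

    @task
    def task_c():
        print("true path")

    task_e = EmptyOperator(task_id="task_e")  # dummy false path

    # this trigger rule lets task F run even though one branch was skipped
    task_f = EmptyOperator(task_id="task_f", trigger_rule="none_failed_min_one_success")

    branch = task_branch(task_a())
    c = task_c()
    branch >> c >> task_f
    branch >> task_e >> task_f


_ = branching_example()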

rshewitt commented 1 year ago

Error extracting commerce_non_spatial_data.json_harvest_source_workflow in Airflow. The downloaded JSON has more records than XCom-based task mapping allows by default (i.e. airflow.exceptions.UnmappableXComLengthPushed: unmappable return value length: 1609 > 1024). source

rshewitt commented 1 year ago

Error extracting bls_data_workflow. The Airflow log indicates a JSON decode error. Attempting to download the JSON file manually returns a 403 HTTP status code.