buda-base / ao-workflows

Use DAG platform to define and orchestrate workflows
0 stars 0 forks source link

Handle multiple regions #14

Closed jimk-bdrc closed 6 months ago

jimk-bdrc commented 6 months ago

There's one DAG to initiate the workflow, but it looks in an SQS queue in the us-east-1 region, which is the default. The actual FPL and NLM archives are in different regions:

source zone
NLM ap-northeast-1
FPL ap-southeast-1
jimk-bdrc commented 6 months ago

Strategy discussion

There are several approaches:

  1. Extend the existing architecture to retreive SQS from several regions.
  2. Just restore the existing files, and move them to a bucket in the us-east-1 region, where the existing queue can handle them.

Extending architecture:

Alt 1: Create a task for each region, sum

@task(id=get-region-1)
def _get_region_1(**context):
      ....

@task(id=get-region-2)
def _get_region_2(**context):
      ....

@task(id=get-region-3)
def _get_region_3(**context):
      ....

with. DAG(...) as dag:
     [ get-region-1, get-region-2, get-region3] >> process_something()

It would be nice to blend this style with the taskflow api, which looks like standard python:

    msgs = get_restored_object_messages()
    downloads = download_from_messages(msgs)
    to_sync = debag_downloads(downloads)
    syncd = sync_debagged(to_sync)

giving us:

     [ get-region-1, get-region-2, get-region3] >>     {msgs = get_restored_object_messages()
    downloads = download_from_messages(msgs)
    to_sync = debag_downloads(downloads)
    syncd = sync_debagged(to_sync)
}

We could also do this by using the SQSSensor provider, which, although it doesn't return a value, (despite the claims in Amazon Simple Queue Service can trigger:

Also, you could template each task:

calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/{{ds}}.json",
        "output_path": "/data/stats/{{ds}}.csv",
    },
    # Required in Airflow 1.10 to access templates_dict, deprecated in Airflow 2+.
    # provide_context=True,
    dag=dag,
)
jimk-bdrc commented 6 months ago

Strategy resolution:

Having the existing get_restored_object_messages task poll different sqs objects, each using a boto3 context configured for a different region,seems like the simplest for now. It subsumes all the complexity into the first task. However, the returned structure will have to have some SQS metadata that allows for deletion of the message. Note the task will have to return the region that has the standing task, but that might be a property in the returned message.

jimk-bdrc commented 6 months ago

See AWS connections Want to set up different connections for different region (another go trying SQSSensor)

jimk-bdrc commented 6 months ago

The test is running - on the AWS console, I initiated expedited restores of s3://glacier.staging.nlm/bdrc.org/Archive0/00/W1NLM4400/W1NLM4400.bag.zip and ..../W1NLM4500/W1NLM4500.bag.zip

The airflow is running on https://sattva:8089 The scheduler is set to start at 19:30 EST on 2024-04-04 and run until 2024-04-06. It should pick up the restores and "sync" them locally, using the qa database. I will re-sync them manually myself.

jimk-bdrc commented 6 months ago

Results: One glacier restore message (W1NLM4500) was processed, and then it appeared that the postgres server in the docker container crashed. Possibly because the dag was running in catchup mode (I set it's start_date in the past, and when the container came up, it launched DAG instances for every hour between start_date and re-run time. Trying now with

with DAG(...... catchup=False...) as gs_dag:
jimk-bdrc commented 6 months ago

restoring three different NLM works. Saving the restore requests in s3://manifest.bdrc.org/${work}_restore_request.log

Airflow running sqs_scheduled_dag every 10 minutes, with no catchup.

jimk-bdrc commented 6 months ago

OK, three SQS messages were received from the NlmReadyForIntakeQueue but downloading them:

[2024-04-05, 20:30:09 EDT] {taskinstance.py:2513} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='sqs_scheduled_dag' AIRFLOW_CTX_TASK_ID='download_from_messages' AIRFLOW_CTX_EXECUTION_DATE='2024-04-06T00:20:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-04-06T00:20:00+00:00'
[2024-04-05, 20:30:09 EDT] {logging_mixin.py:188} INFO - using secrets
[2024-04-05, 20:30:09 EDT] {logging_mixin.py:188} INFO - section='ap_northeast'   ['default', 'ap_northeast']
[2024-04-05, 20:34:22 EDT] {taskinstance.py:2731} ERROR - Task failed with exception
Traceback (most recent call last):
...
                   ^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.11/site-packages/s3transfer/download.py", line 643, in _main
    fileobj.write(data)
  File "/home/airflow/.local/lib/python3.11/site-packages/s3transfer/utils.py", line 379, in write
    self._fileobj.write(data)
OSError: [Errno 28] No space left on device
[2024-04-05, 20:34:22 EDT] {taskinstance.py:1149} INFO - Marking task as FAILED. dag_id=sqs_scheduled_dag, task_id=download_from_messages, execution_date=20240406T002000, start_date=20240406T003009, end_date=20240406T003422
[2024-04-05, 20:34:22 EDT] {standard_task_runner.py:107} ERROR - Failed to execute job 259 for task download_from_messages ([Errno 28] No space left on device; 13305)
[2024-04-05, 20:34:22 EDT] {local_task_job_runner.py:234} INFO - Task exited with return code 1

Which means the working dir has to be configured to be a bind mount to the host.