CDCgov / phdi

https://cdcgov.github.io/dibbs-site/
Creative Commons Zero v1.0 Universal
32 stars 14 forks source link

SPIKE: Airflow #793

Closed emmastephenson closed 1 year ago

emmastephenson commented 1 year ago

Spike on the potential of using Apache Airflow to manage jobs for the orchestration service.

emmastephenson commented 1 year ago

Reporting back that Airflow does not seem like an ideal solution for the Orchestration API. It's primarily designed for orchestrating many different pipelines across an organization, and our use case is more like running a single customizable pipeline for many different organizations. Airflow expects workflow A to lead to workflow B which will kick off C, and in our case A, B, and C are entirely isolated.

A better solution would be something like Apache Spark (or perhaps NiFi?). A solution that could take a large volume of data from a data source and slowly batch it to the message-by-message orchestration service.

More to come tomorrow. See this Google Doc for more detailed Airflow notes.

emmastephenson commented 1 year ago

This has more or less turned into a spike on batching instead.

Some pseudocode I created as an option here:

@endpoint (POST)
def accept_input(zip_file, user_config):
    validate that zip is acceptable (xml)

    async call start_job(zip, config)

    db.write(job id, "job started")

    return job id to user

ASYNC
def start_job(zip_file, user_config):
    store_messages(zip)
    process_messages(config)

def store_messages(zip_file):
    zip_contents = unzip(zip_file)

    # need to store the values in container storage vs a queue because
    # AZ queues can only process up to 64 KB per message
    azure_storage_container.create(temp storage)
    azure_storage_container.write(zip_contents)

    db.write("contents in temporary storage")

def process_messages(config):
    # List all the eCRs in the job
    azure_storage_container.list_blob()

    # Be smart here - we'll want to trigger about 20 calls simultaneously, not 
    # in sequence
    # THIS is the place where we might want to use something smart like Spark or petl
    # Otherwise basically the same as the existing function app
    for blobs in batch of 20:
        orchestrate(config, blobs)
        db.write(blob processed: blob)

# This is the service we've currently been working on
# Taking in a user configuration, process the data requested one at a time
def orchestrate(config, data):
    read user config
    process single message through config
    return HTML? 

Open questions from this thought exercise: