ucldc / avram

django app for the registry

Harvest events #175

Closed: amywieliczka closed this 7 months ago

amywieliczka commented 8 months ago

Rikolti publishes Airflow event messages to SNS in the following format:

{
    "version": 1.0,
    "dag_id": "harvest_collection",
    "dag_run_id": "manual__2024-03-06T18:18:26+00:00",
    "logical_date": "2024-03-06T18:18:26+00:00",
    "dag_run_conf": "{'collection_id': '27551'}",
    "host": "https://localhost:8080",
    "rikolti_message": "json.dumps(message_from_rikolti_system)",
    # optional:
    "task_id": "mapping.map_page",
    "try_number": 0,
    "map_index": 0,
}

https://github.com/ucldc/rikolti/blob/390a0fa49d9e02636ba5ff21af35bd2b904090c3/dags/shared_tasks/shared.py#L28-L46
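As a minimal sketch of how a message in this shape might be assembled, the hypothetical helper `build_airflow_event_message` below fills in the required keys and adds the optional task-level keys only when they are set. It is not the Rikolti implementation (the linked shared.py is). It assumes `dag_run_conf` is sent as the `str()` of a dict, which matches the single-quoted example above, and that `rikolti_message` is a JSON-encoded dict from the Rikolti system.

```python
import json
from typing import Any, Optional


def build_airflow_event_message(
    dag_id: str,
    dag_run_id: str,
    logical_date: str,
    dag_run_conf: dict,
    host: str,
    rikolti_message: dict,
    task_id: Optional[str] = None,
    try_number: Optional[int] = None,
    map_index: Optional[int] = None,
) -> dict[str, Any]:
    """Hypothetical helper: build an event message in the format described above."""
    msg: dict[str, Any] = {
        "version": 1.0,
        "dag_id": dag_id,
        "dag_run_id": dag_run_id,
        "logical_date": logical_date,
        # Assumption: sent as str(dict), giving the single-quoted form in the example
        "dag_run_conf": str(dag_run_conf),
        "host": host,
        # The Rikolti payload travels as a JSON string nested inside this message
        "rikolti_message": json.dumps(rikolti_message),
    }
    # Task-level keys are optional; include them only when they were provided
    optional = {"task_id": task_id, "try_number": try_number, "map_index": map_index}
    msg.update({key: value for key, value in optional.items() if value is not None})
    return msg
```

The `is not None` check (rather than a truthiness check) matters because `try_number` and `map_index` can legitimately be 0.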

SNS forwards Rikolti Airflow event messages to SQS, wrapped in the following SNS message format:

{
  "Type": "Notification",
  "MessageId": "27e4fdd2-70ac-5bca-b9d2-a3b18b037a01",
  "TopicArn": "arn:aws:sns:us-west-2:866216109762:AmyDevRikoltiEvents",
  "Timestamp": "2024-03-07T17:29:19.175Z",
  "SignatureVersion": "1",
  "Signature": "iF4CO3uK2dXrOau/RhVE5G42Ii681tsvi4ph/X8BhI55ZKniOD0kPax5pUQ7h0+wFycbmHPJ6Z7u6vQVWNri8S6f6iUECnwtC99lzMvtWUFOmcugFEGNEtV2JXABhzN93cB5/jQpL7ig7pVqPFaDeVrKr6toPPljIIauitW2XVpISq+5V+cpbCi/lBOi+xaWq86ojzv4iKf7ZapkdcM0LUSzbl/SLUgBjtWLIyl69MPKNldN4SZrVUB1jv+eat1RxhNsc5mltbNFVKqZW+5ApCPi88oWd2ZpUbTP3UUZEPIzvHtf/kEgwQwm16Z0BMSerlnEkHOuUcQG1oB9fDycRQ==",
  "SigningCertURL": "https://sns.us-west-2.amazonaws.com/SimpleNotificationService-60eadc530605d63b8e62a523676ef735.pem",
  "UnsubscribeURL": "https://sns.us-west-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-west-2:866216109762:AmyDevRikoltiEvents:d2680521-f115-42e2-af90-1adb5d38329c",
  "Message": <the Rikolti Airflow event message outlined above, as a JSON-encoded string>
}
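To make the double encoding concrete (the event message is a JSON string inside the SNS envelope, and the envelope is itself a JSON string in the SQS message body), here is a sketch of a hypothetical test-fixture helper, `wrap_for_sqs`, that builds an SQS-shaped message around an event message. It assumes SNS raw message delivery is disabled for the subscription (with raw delivery, SQS would receive the bare event message) and it omits the signature fields, so it only suits local testing, not code that verifies SNS signatures.

```python
import hashlib
import json
import uuid
from datetime import datetime, timezone


def wrap_for_sqs(event_message: dict, topic_arn: str) -> dict:
    """Hypothetical fixture: wrap an event message the way SNS -> SQS delivers it."""
    now = datetime.now(timezone.utc)
    sns_envelope = {
        "Type": "Notification",
        "MessageId": str(uuid.uuid4()),
        "TopicArn": topic_arn,
        # SNS carries the published payload as a string, so it is JSON-encoded here
        "Message": json.dumps(event_message),
        # Millisecond precision with a trailing Z, e.g. 2024-03-07T17:29:19.175Z
        "Timestamp": now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z",
        # Signature-related fields are intentionally left out of this fixture
    }
    body = json.dumps(sns_envelope)  # the whole envelope becomes the SQS body string
    return {
        "MessageId": str(uuid.uuid4()),
        "ReceiptHandle": "fake-receipt-handle",  # placeholder; real handles come from SQS
        "MD5OfBody": hashlib.md5(body.encode("utf-8")).hexdigest(),
        "Body": body,
    }
```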

The rikolti_status management command polls the SQS queue for new messages. Each SQS message contains an SNS message, which contains a Rikolti Airflow event message, which in turn contains a Rikolti message. For every SQS message that comes in, rikolti_status first tries to find or create a HarvestRun, then creates a HarvestEvent.
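Unwrapping those layers comes down to three `json.loads` calls. The sketch below uses a hypothetical `unwrap_sqs_message` helper; it assumes each SQS message is a dict with a `Body` key (the shape boto3's `receive_message` returns in its `Messages` list) and that SNS raw message delivery is disabled, so the body is the SNS envelope.

```python
import json


def unwrap_sqs_message(sqs_message: dict) -> tuple[dict, dict, dict]:
    """Hypothetical helper returning (sns_message, airflow_event_message, rikolti_message)."""
    # Layer 1: the SQS body is the JSON-encoded SNS envelope
    sns_message = json.loads(sqs_message["Body"])
    # Layer 2: the envelope's Message field is the JSON-encoded Airflow event message
    airflow_event = json.loads(sns_message["Message"])
    # Layer 3: rikolti_message is itself a JSON string produced by the Rikolti system
    rikolti_message = json.loads(airflow_event["rikolti_message"])
    return sns_message, airflow_event, rikolti_message
```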

# HarvestRun
collection = <Optional Collection, try to find collection_id from dag_run_conf>
harvest_trigger = <Optional HarvestTrigger, try to find from dag_id, dag_run_id, logical_date>
dag_id = rikolti_airflow_event_msg['dag_id']                # required
dag_run_id = rikolti_airflow_event_msg['dag_run_id']        # required
logical_date = rikolti_airflow_event_msg['logical_date']    # required
dag_run_conf = rikolti_airflow_event_msg['dag_run_conf']    # optional
host = rikolti_airflow_event_msg['host']                    # optional, not in event msg prior to https://github.com/ucldc/rikolti/pull/789
status = <defaults to 'running'>                            # required
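One detail worth handling: `dag_run_conf` arrives as a Python dict repr with single quotes (`"{'collection_id': '27551'}"`), which `json.loads` rejects. A sketch using `ast.literal_eval` instead, which only evaluates Python literals, follows. The helper names `parse_dag_run_conf` and `harvest_run_fields` are hypothetical, the returned `collection_id` is an assumed intermediate for looking up the Collection, and the HarvestTrigger lookup is left out.

```python
import ast
from typing import Any, Optional


def parse_dag_run_conf(raw: Optional[str]) -> dict:
    """Parse a single-quoted dict repr; return {} if missing or malformed."""
    if not raw:
        return {}
    try:
        conf = ast.literal_eval(raw)  # evaluates literals only, never arbitrary code
    except (ValueError, SyntaxError):
        return {}
    return conf if isinstance(conf, dict) else {}


def harvest_run_fields(event: dict) -> dict[str, Any]:
    """Hypothetical mapping from an Airflow event message to HarvestRun fields."""
    conf = parse_dag_run_conf(event.get("dag_run_conf"))
    return {
        "collection_id": conf.get("collection_id"),  # used to find the Collection; may be None
        "dag_id": event["dag_id"],                    # required
        "dag_run_id": event["dag_run_id"],            # required
        "logical_date": event["logical_date"],        # required
        "dag_run_conf": event.get("dag_run_conf"),    # optional, stored as received
        "host": event.get("host"),                    # optional; absent in older messages
        "status": "running",                          # a new event implies a running DAG
    }
```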

# HarvestEvent
collection = <Optional Collection, try to find collection from HarvestRun>
harvest_run = <Required HarvestRun>
task_id = rikolti_airflow_event_msg['task_id']                  # optional
try_number = rikolti_airflow_event_msg['try_number']            # optional
map_index = rikolti_airflow_event_msg['map_index']              # optional
rikolti_message = rikolti_airflow_event_msg['rikolti_message']  # required
error = <True if 'error' in rikolti_message, else False>        # required
sns_timestamp = sns_message['Timestamp']                        # optional
sqs_message = <Optional JSON w/ keys: MessageId, ReceiptHandle, MD5OfBody>
sns_message = <Optional JSON w/ keys: Type, MessageId, TopicArn, Timestamp, SignatureVersion, Signature, SigningCertURL, UnsubscribeURL>
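A sketch of building the HarvestEvent fields from the three unwrapped layers follows. The helper names and the two key tuples are hypothetical; it assumes `rikolti_message` is the JSON string from the event message and treats `error` as a top-level key of the decoded dict. Only the listed envelope keys are kept, since the SQS `Body` and the SNS `Message` payloads are already unpacked into the other fields.

```python
import json
from typing import Any

SQS_KEYS = ("MessageId", "ReceiptHandle", "MD5OfBody")
SNS_KEYS = ("Type", "MessageId", "TopicArn", "Timestamp", "SignatureVersion",
            "Signature", "SigningCertURL", "UnsubscribeURL")


def rikolti_message_has_key(rikolti_message: str, key: str) -> bool:
    """True if the JSON-encoded rikolti_message decodes to a dict containing key."""
    try:
        parsed = json.loads(rikolti_message)
    except (TypeError, ValueError):  # None or invalid JSON
        return False
    return isinstance(parsed, dict) and key in parsed


def harvest_event_fields(sqs_message: dict, sns_message: dict, event: dict) -> dict[str, Any]:
    """Hypothetical mapping from the unwrapped layers to HarvestEvent fields."""
    return {
        "task_id": event.get("task_id"),        # optional
        "try_number": event.get("try_number"),  # optional
        "map_index": event.get("map_index"),    # optional
        "rikolti_message": event["rikolti_message"],  # required
        "error": rikolti_message_has_key(event["rikolti_message"], "error"),
        "sns_timestamp": sns_message.get("Timestamp"),
        # Keep only envelope metadata; the Body and Message payloads are dropped
        "sqs_message": {k: sqs_message[k] for k in SQS_KEYS if k in sqs_message},
        "sns_message": {k: sns_message[k] for k in SNS_KEYS if k in sns_message},
    }
```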

Finally, rikolti_status tries to determine the status of the HarvestRun for which we've just received a new event. A HarvestRun status can be running, succeeded, or failed. If we've just received a new event for the HarvestRun, it is presumed to be running, since an event cannot be sent unless the run is running. After creating the new HarvestEvent, rikolti_status sorts all events for the HarvestRun by their sns_timestamp to retrieve the most recent one. If that event's rikolti_message contains the special key error, the HarvestRun status is set to failed; if it contains dag_complete, the status is set to succeeded.
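The status rule can be sketched as a pure function over a run's events. The name `derive_run_status` is hypothetical; it assumes each event is a dict with `sns_timestamp` in the SNS format shown earlier and `rikolti_message` as a JSON string, and it checks `error` before `dag_complete` if both are present, which the description does not specify. Taking the `max` by timestamp is equivalent to sorting and picking the last event.

```python
import json
from datetime import datetime
from typing import Optional

SNS_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # e.g. 2024-03-07T17:29:19.175Z


def _message_keys(rikolti_message: Optional[str]) -> set:
    """Top-level keys of a JSON-encoded rikolti_message, or an empty set."""
    try:
        parsed = json.loads(rikolti_message)
    except (TypeError, ValueError):
        return set()
    return set(parsed) if isinstance(parsed, dict) else set()


def _sort_key(event: dict) -> datetime:
    # sns_timestamp is optional; events without one sort earliest,
    # so they are never chosen as the most recent event
    ts = event.get("sns_timestamp")
    return datetime.strptime(ts, SNS_TIMESTAMP_FORMAT) if ts else datetime.min


def derive_run_status(events: list) -> str:
    """Hypothetical: 'failed', 'succeeded', or 'running' from the latest event."""
    if not events:
        return "running"
    latest = max(events, key=_sort_key)
    keys = _message_keys(latest.get("rikolti_message"))
    if "error" in keys:  # assumption: error wins if both keys are present
        return "failed"
    if "dag_complete" in keys:
        return "succeeded"
    return "running"
```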

The rest of this PR has to do with the display of all this information in the Admin interface. Notably:

amywieliczka commented 8 months ago

Yeah, I tried to find the right level of coupling; I'd like to discuss that further.

barbarahui commented 8 months ago

This looks good to me too. It seems straightforward and flexible in terms of using rikolti_message to pass information to the registry that's not available in the Airflow context.

I know Amy has brought up the question of how to handle clearing/rerunning tasks and how that makes things messy w/r/t the display in the registry. Yeah, that issue just kept popping into my head as I was thinking this through.

I see what you both mean about the tight coupling of the registry model to the Airflow model. It seems like a good thing to discuss sooner rather than later, just to make sure we don't paint ourselves into a corner.