sul-dlss / libsys-airflow

Airflow DAGS for migrating and managing ILS data into FOLIO along with other LibSys workflows
Apache License 2.0

Fail dag run if any files failed transmission #899

Closed shelleydoljack closed 2 months ago

shelleydoljack commented 4 months ago

Fail the dag run by raising an exception in another downstream task that takes the failed files list. https://github.com/sul-dlss/libsys-airflow/blob/c7f21db2e62827532fa4aca9c2f41fea6f77de8f/libsys_airflow/plugins/data_exports/transmission_tasks.py#L38
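A minimal sketch of such a check, written as a plain Python function so it can be read on its own. The names `TransmissionFailed` and `fail_on_failed_files` are hypothetical and not taken from the repository; in a DAG the function body would presumably live in a downstream task that receives the failed-files list from the transmission task.

```python
class TransmissionFailed(Exception):
    """Hypothetical exception raised when one or more files failed to transmit."""


def fail_on_failed_files(failed_files):
    """Raise if the failed-files list is non-empty; otherwise do nothing.

    Any uncaught exception raised inside an Airflow task marks that task
    as failed, which in turn fails the DAG run.
    """
    failed = [str(path) for path in (failed_files or [])]
    if failed:
        raise TransmissionFailed(
            f"{len(failed)} file(s) failed transmission: {', '.join(failed)}"
        )
```

If task-level retries are configured, Airflow's `AirflowFailException` may be a better fit than a custom exception, since it fails the task without retrying it.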

shelleydoljack commented 2 months ago

The DAG should retry the failed files; if that doesn't work, it should send an email, or we should investigate how best to retry the failures.

shelleydoljack commented 2 months ago

Also, for the send_all_records DAG, we need to be able to retry sending files that failed. Successful files are not archived, so a daily run of this DAG would not retry the failed files but would instead send all the files again. I don't think we want that.

jermnelson commented 2 months ago

Hi @shelleydoljack, you can set the number of retries for a task in a couple of different ways, depending on whether you are using TaskFlow or not. If you are using the task decorator, you can pass in a retries parameter, e.g.:

@task(retries=2)

Using the PythonOperator is similar; you can pass in a retries parameter, e.g.:

transform_marc_fields = PythonOperator(
    task_id="transform_folio_remove_marc_fields",
    python_callable=remove_fields_from_marc_files,
    retries=3,
    op_kwargs={},
)

jgreben commented 2 months ago

> Also, for the send_all_records DAG, we need to be able to retry sending files that failed. Successful files are not archived, so a daily run of this DAG would not retry the failed files but would instead send all the files again. I don't think we want that.

I didn't think we would be doing a daily run of the send_all_records DAG.

jgreben commented 2 months ago

Wouldn't using the PythonOperator retries param retry sending all of the files (even the ones that were successful)? I think this ticket is about creating a downstream task that will pull the failures out of the { success: [], fails: [] } dict and retry only those?
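One way to picture that downstream step is the sketch below. It assumes the upstream task returns a dict with `success` and `fails` keys as written above (the real key names may differ), and that a `send_file` callable raises an exception when a transmission fails. `retry_failed_files`, `send_file` and `max_attempts` are hypothetical names used for illustration only.

```python
def retry_failed_files(result, send_file, max_attempts=2):
    """Re-send only the files listed under "fails", leaving successes alone.

    result: dict shaped like {"success": [...], "fails": [...]} (assumed shape).
    send_file: callable taking a file path; assumed to raise on failure.
    Returns a new dict of the same shape after up to max_attempts retry rounds.
    """
    succeeded = list(result.get("success", []))
    pending = list(result.get("fails", []))

    for _ in range(max_attempts):
        if not pending:
            break
        still_failing = []
        for path in pending:
            try:
                send_file(path)
            except Exception:
                # Keep the file for the next round (or the final report).
                still_failing.append(path)
            else:
                succeeded.append(path)
        pending = still_failing

    return {"success": succeeded, "fails": pending}
```

Files that were already sent successfully are never passed to `send_file` again, and anything still listed under `fails` afterwards could be used to fail the DAG run or trigger a notification.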

jgreben commented 2 months ago