sul-dlss-labs / rialto-airflow

Airflow for harvesting data for research intelligence and open access analysis
Apache License 2.0

Zombie job during merge_publications #81

Closed · edsu closed this 1 month ago

edsu commented 1 month ago

I noticed that the merge_publications step works fine when using DEV_LIMIT=1000 on my laptop but fails in our production Airflow:

https://sul-rialto-airflow-dev.stanford.edu/dags/harvest/grid?dag_run_id=manual__2024-07-10T11%3A06%3A13.060630%2B00%3A00&tab=logs&task_id=merge_publications

The error is:

[2024-07-12, 15:47:01 UTC] {scheduler_job_runner.py:1737} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/rialto_airflow/dags/harvest.py', 'processor_subdir': '/opt/airflow/rialto_airflow/dags', 'msg': "{'DAG Id': 'harvest', 'Task Id': 'merge_publications', 'Run Id': 'manual__2024-07-10T11:06:13.060630+00:00', 'Hostname': '0b19fcec1ff1', 'External Executor Id': 'ae9582b5-0a58-43ca-8729-b7f721a3be08'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7fc84c2d5850>, 'is_failure_callback': True} (See https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#zombie-undead-tasks)

After retrying a few times I noticed that the server is running out of available memory, which I believe results in the task being killed (either by Airflow, Docker or the VM).

[Screenshot from 2024-07-12 at 11:46 AM showing the server's available memory dropping]

The dimensions and openalex CSVs are quite large:

-rw-rw-r-- 1 rialto root 8.8M Jul 10 09:51 dimensions-doi-orcid.pickle
-rw-rw-r-- 1 rialto root  11G Jul 11 16:52 dimensions-pubs.csv
-rw-rw-r-- 1 rialto root  20M Jul 11 10:50 doi-sunet.pickle
-rw-rw-r-- 1 rialto root  11M Jul 11 10:49 openalex-doi-orcid.pickle
-rw-rw-r-- 1 rialto root  12G Jul 11 18:31 openalex-pubs.csv
-rw-rw-r-- 1 rialto root 2.5G Jul 10 05:07 sulpub.csv

We currently only have 8GB of RAM available. Since we are using polars with streaming, it's not exactly clear to me how much RAM we might need. We could try doubling it and see if that helps?
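
For context, here is a minimal sketch of the kind of lazy merge I have in mind (the file names are from the listing above, but the `doi` join key, the join type, and the output path are assumptions, not the actual DAG code):

```python
import polars as pl

# Lazily scan the large CSVs; nothing is read until the plan executes.
dimensions = pl.scan_csv("dimensions-pubs.csv")
openalex = pl.scan_csv("openalex-pubs.csv")
sulpub = pl.scan_csv("sulpub.csv")

# Join on DOI (assumed key). Building the lazy plan is cheap; peak memory
# use depends on how the plan is executed.
merged = (
    dimensions
    .join(openalex, on="doi", how="full")
    .join(sulpub, on="doi", how="full")
)

# collect() materializes the entire merged frame in memory before
# write_parquet() runs, which is where an 8GB host can fall over.
merged.collect().write_parquet("publications.parquet")
```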

jacobthill commented 1 month ago

That sounds like it would be worth trying.

edsu commented 1 month ago

I put in a request to double the memory 🤞

lwrubel commented 1 month ago

We could also experiment with setting low_memory=True when calling scan_csv: https://docs.pola.rs/api/python/stable/reference/api/polars.scan_csv.html
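
A minimal sketch of what that might look like (the path is a placeholder):

```python
import polars as pl

# low_memory=True asks the CSV reader to reduce peak memory use at the
# cost of some speed; it can be combined with the existing lazy pipeline.
lf = pl.scan_csv("openalex-pubs.csv", low_memory=True)
```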

edsu commented 1 month ago

We seem to have resolved this for now by lowering the amount of data we write to the merged publications parquet file (#90).

At some point we may want to revisit why we can't use df.sink_parquet() instead of df.collect().write_parquet(), since the former should stream the data to the parquet file rather than requiring the whole result to be built in memory?
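
Roughly, the two approaches compare like this (a sketch with a placeholder lazy frame, not the actual DAG code):

```python
import polars as pl

lf = pl.scan_csv("openalex-pubs.csv")  # placeholder for the merged lazy plan

# Current approach: build the full result in memory, then write it out.
lf.collect().write_parquet("publications.parquet")

# Streaming approach: write batches directly to the parquet file without
# materializing the whole DataFrame. This only helps if the operations in
# the plan are supported by the streaming engine.
lf.sink_parquet("publications.parquet")
```

One thing worth checking is whether every step of the merge plan is streamable; depending on the polars version, a non-streamable plan can either error or fall back to the in-memory engine.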