PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

[Bug] Prefect server web runner flow crashing consistently after 1100-1350 subflow runs - memory leak? #12458

Open wakatara opened 7 months ago

wakatara commented 7 months ago

Bug summary

I have a flow that calls a subflow using the new submit_to_runner feature. It works great until I get to a certain point, and then I get consistent crashes at around the 1100-1350 files processed mark (regardless of which directories or files are processed). The strange thing is that it is only the Prefect Server that says it has crashed; individual tasks running on the workers (50 at the same time) work fine. So I'm assuming it's a memory leak, since I think I've ruled out almost everything else. (The issue happens both in local dev and on my beefy server, which runs Prefect via a docker compose setup.)

Due to the way the subflow needs to work, it has to run synchronously, and I don't believe I can use the async .map mechanic because multiple parameters must be added to each call. Basically, it collects a list of files and then sends them to the web runner subflow, and the subflow calls all the tasks:

import os

from prefect import flow
from prefect.runner import submit_to_runner  # import paths as used on Prefect 2.16.x

# `t` is our module of @task-decorated helpers (import omitted here)


@flow(log_prints=True)
def sci_backend_processing(file: str):
    # Simplified: there are database gets and error checking in here, but you get the idea
    scratch = t.copy_to_scratch(file)
    description = t.describe_fits(file)
    description["scratch_filepath"] = scratch
    filepath = os.path.normpath(file).split(os.path.sep)
    identity = t.packed_provisional_to_identity(filepath[-2])
    orbit_job_id = t.object_orbit_submit(description["OBJECT"])
    orbit = t.object_orbit(orbit_job_id)
    description = t.flight_checks(description)
    description["OBJECT-ID"] = t.get_object_id("coma-connector", description["OBJECT"])
    calibration = t.calibrate_fits(scratch)
    photometry_type = "APERTURE"
    photom_job_id = t.photometry_fits_submit(scratch, identity, photometry_type)
    photometry = t.photometry_fits(photom_job_id)
    ephem_job_id = t.object_ephemerides_submit(description, orbit)
    ephemerides = t.object_ephemerides(ephem_job_id)
    orbit_coords_id = t.record_orbit_submit(description["OBJECT"], orbit)
    orbit_coords = t.record_orbit(orbit_coords_id)
    t.database_inserts(description, calibration, photometry, ephemerides, orbit_coords)

@flow(log_prints=True)
def atlas_ingest():
    atlas_path = "/data/staging/atlas_test"
    files = t.file_checker_atlas(atlas_path)
    print(f"There are {len(files)} files found to process.")
    submit_to_runner(
        sci_backend_processing, [{"file": file} for file in files]
    )
...

if __name__ == "__main__":
    atlas_ingest.serve(name="atlas_pipeline", webserver=True, limit=51)    

As mentioned, this works great and processes the science pipeline fine, but then the server process loses its mind at around the 1100-1350 file mark every time (and, well, I have ~8M files to process 8-// ).

[screenshot: crashed flow run in the Prefect UI]

As mentioned, other than this everything else works as it should, and the sci_backend processes run fine synchronously (the tasks depend on previous results in a stepwise fashion, i.e. get_jpl_orbit, get_description, etc.). The processes do have to wait significant amounts of time for results to come back in some cases (one of the APIs is throttled to only allow one call at a time from each IP), but as mentioned, this works fantastically and robustly until it gets to about the 1100-file mark. Then the server process crashes, and while the results gathered so far are already in the database, they're not complete.

(The data is separated into directories by comet/small solar system body, and even when I try to analyze only different directories I get the same result.)

I suspect a memory leak in the web runner feature (which is awesome, btw ❤️), but having refactored, corrected errors, and even swapped sqlite for postgres to rule out various parts of my setup, I'm a bit stumped as to how to diagnose the problem or what it could be. No logs get printed except for post-crash issues, so even a way to make the logs vastly more verbose would help, though, as I said, I suspect a memory leak.
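
In the meantime, one thing I'm going to try for more verbose output (assuming the standard Prefect 2.x logging settings also apply to the web runner; the same variables can go straight into the docker-compose environment) is cranking the log level up to DEBUG:

# Sketch only: raise Prefect's log verbosity while chasing the crash.
# PREFECT_LOGGING_LEVEL covers the runner/client side and
# PREFECT_LOGGING_SERVER_LEVEL the API server; both are ordinary Prefect 2.x
# settings, so the cleanest place to set them is the shell or the compose file.
import os

os.environ.setdefault("PREFECT_LOGGING_LEVEL", "DEBUG")
os.environ.setdefault("PREFECT_LOGGING_SERVER_LEVEL", "DEBUG")

# If set in-process like this, it has to happen before prefect is imported,
# otherwise the settings are read with their defaults.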

I'm also open to if there are other methods to make this work besides the web runner feature.

Reproduction

Submitting a trivial version of the above flow with simple processing (rather than just `time.sleep(x)`) should yield a similar result, at least if it is making API calls.
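
Something along these lines is what I have in mind for the MRE (untested sketch; submit_to_runner's import path is what I'm assuming on 2.16.x, and httpbin stands in for our own APIs):

# Untested sketch of a minimal reproduction: one parent flow fanning out a
# few thousand trivial subflow runs through the web runner.
import httpx
from prefect import flow
from prefect.runner import submit_to_runner  # assumed import path on 2.16.x


@flow(log_prints=True)
def trivial_subflow(n: int):
    # Stand-in for the real API calls; any small HTTP request should do.
    resp = httpx.get("https://httpbin.org/get", timeout=30)
    print(f"run {n}: status {resp.status_code}")


@flow(log_prints=True)
def stress_test():
    submit_to_runner(trivial_subflow, [{"n": n} for n in range(2000)])


if __name__ == "__main__":
    stress_test.serve(name="stress_test", webserver=True, limit=51)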

Error

No error reporting in the logs. Crash happens in UI.

Versions

2.16.x

Additional context

This is a port of an existing flow from Airflow, where it does work in DAG form, so I suspect it's a Prefect issue.

Huge props to @zzstoatzz on the #prefect-community Slack who provided some guidance on this issue and suggested filing a GH ticket.

zzstoatzz commented 7 months ago

hi @wakatara - thank you for making this issue and trying out this feature!

If you have the opportunity, an MRE that doesn't involve your domain-specific code would be helpful. If not, I can work on reproducing / debugging this when time allows.

wakatara commented 7 months ago

Hey @zzstoatzz, lemme try and put something together. The issue is that we're calling a lot of API endpoints we built (which in turn wrap NASA APIs), so I will try to come up with an MRE.

Also, please let me know how I can help diagnose this better if you don't have the time. I noticed someone on the #prefect-community Slack had used an in-code memory profiler to demonstrate a memory leak in another area, and I imagine that's what this is (it's far too consistent across worker counts, different directories, and times of day and week).
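
For the in-code profiling, I was thinking of something like the stdlib tracemalloc (sketch only, and it would only catch the runner side; if the leak is in the server container it won't show up here):

# Sketch: log the biggest allocation sites from inside the flow process using
# the stdlib tracemalloc module, no extra dependencies needed.
import tracemalloc

tracemalloc.start()


def log_top_allocations(limit: int = 10):
    snapshot = tracemalloc.take_snapshot()
    print(f"--- top {limit} allocation sites ---")
    for stat in snapshot.statistics("lineno")[:limit]:
        print(stat)

# e.g. call log_top_allocations() every few hundred submissions inside
# atlas_ingest() and watch whether anything grows without bound.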

(And once again, thanks for your help getting it this far. I'm actually super excited to just puzzle out the crash, because this thing is basically done once that is sorted...)

wakatara commented 7 months ago

@zzstoatzz I guess the other question is: is there another way to achieve this same result besides the web runners? (Which I have to admit I like, because they are easily handled via docker-compose and work intuitively, at least imho.)

Effectively, I want to scale up a single pipeline, which I'm not sure can be handled by .map() in an async fashion due to the need to pass multiple prior values (and I'm unsure of the mechanism map would use to keep those all straight in the subflow). So this may just be my unfamiliarity with Prefect conventions, but does map keep those things in order if keyed off an initial filename (or the filenames passed)?

So, in the example below, the task photometry_fits_submit is not async (though it would probably be easy to make it so), i.e.:

photom_job_id = t.photometry_fits_submit(scratch, identity, photometry_type)
# the job is processing here and can take anywhere between 15s and 90s
# completion is polled via lots of retries with retry backoff jitter
photometry = t.photometry_fits(photom_job_id)

Thinking of our out-of-band chat on the async example you showed.
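
In case it helps clarify what I mean by keeping things straight, this is roughly what I'd be reaching for with .map() (hypothetical sketch; I'm not sure the mapped calls stay aligned per file the way I need):

# Sketch of a .map()-based version. unmapped() marks arguments passed whole to
# every call; iterables of equal length are zipped element-wise, so per-file
# values should stay aligned by position.
from prefect import flow, unmapped
import tasks as t  # hypothetical module holding the @task definitions


@flow(log_prints=True)
def atlas_ingest_mapped(files: list[str]):
    scratch = t.copy_to_scratch.map(files)
    # simplified: the real code derives the identity from the parent directory name
    identities = t.packed_provisional_to_identity.map(files)
    photom_ids = t.photometry_fits_submit.map(scratch, identities, unmapped("APERTURE"))
    photometry = t.photometry_fits.map(photom_ids)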

bellmatthewf commented 6 months ago

Set the flow parameter cache_result_in_memory to False. This will fix 90% of the issue, although I do still think there is a memory leak.
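
i.e. something like this (sketch; the flag can also go on the heavier tasks):

# Sketch: stop Prefect from keeping every result in memory for the lifetime of
# the parent run; the same flag exists on @task for the heavy steps.
from prefect import flow, task


@flow(log_prints=True, cache_result_in_memory=False)
def sci_backend_processing(file: str):
    ...


@task(cache_result_in_memory=False)
def calibrate_fits(path: str):
    ...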

wakatara commented 5 months ago

@bellmatthewf Oh wow, I just saw this message of yours. I had to fall back to Airflow since even with 2.19.0 I did not have this working, but I will try again when I get some time to see if I can get it working with that. And yes, there is definitely a memory leak, but I need to spend some time figuring out how to profile it. It seems the web runner is now a lower-priority infra approach for Prefect (though I have to say that when it works, it works fantastically... 🥰).

I'm still committed to simplifying our ingest stack though. And do really really like the way Prefect works much better.

bellmatthewf commented 5 months ago

I opened a possibly related issue (#12668), including a minimal example to reproduce. That may help you!

wakatara commented 5 months ago

@bellmatthewf OMG, this looks great. Thank you. Watching both issues now.