PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Task input persisted leading to memory not being released (same for output). #10952

Closed. sibbiii closed this issue 7 months ago.

sibbiii commented 11 months ago

First check

Prefect Version

2.x

Describe the current behavior

Hi,

I really like the design of Prefect. To me, Prefect is close to perfect. There is just one issue: passing large sets of data to and from tasks quickly eats up all memory, and this memory cannot be released, or at least I could not find any option to do so. Yet being able to pass data into tasks is one of the advertised advantages of Prefect over Airflow:

from prefect import flow, task
import os
import psutil
import sys
import gc

@task(persist_result=False, cache_result_in_memory=False)  # <----- Remove this line, and the memory is released -----
def my_task(df):
    pass

@flow
def my_sub_flow_1():

    print(f"Memory before: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB")
    df = bytearray(1024*1024*1024)  # 1024MiB of memory

    my_task(df)

    print(f'{sys.getrefcount(df)} references to df')
    del df  # release memory
    gc.collect()  # garbage collection not needed, just be certain
    print(f"Memory after: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB")

my_sub_flow_1()

output is:

Memory before: 163MiB
4 references to df
Memory after: 1187MiB

When the @task decorator is removed, all memory is released by the del and everything works fine. Of course, this is a minimal example. In real life, data engineers like me want to run flows with lots of sub-flows, passing large pandas data frames around, without running out of memory.

Describe the proposed behavior

It would be a really great enhancement if we could run sub-flows like the one below with large data frames without quickly running out of memory once there are more subflows or tasks.

@flow(empty_cache_after_flowrun=True)  # <----- something like this here
def my_sub_flow_1():
    df=task_load_data1()
    df=task_modify_data(df)
    task_upload_data(df)

    prefect.release_task_cache  # <----- or something like this here

    df=task_load_data2()
    df=task_modify_data(df)
    task_upload_data(df)    

Example Use

See above. Currently, all big objects passed from one task to another need to go via disk or some other store for the memory to be releasable, which somewhat defeats the great idea of Prefect.
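For illustration, here is a minimal sketch of that disk-based workaround (the parquet path and task names are assumptions for this example, not Prefect APIs): each task writes the large frame to disk and only the small path string is passed between tasks.

import pandas as pd
from prefect import flow, task

@task
def task_load_data(path: str) -> str:
    df = pd.DataFrame({"a": range(1_000_000)})
    df.to_parquet(path)  # persist the large frame to disk (needs pyarrow or fastparquet)
    return path  # only the small path string is tracked by Prefect

@task
def task_modify_data(path: str) -> str:
    df = pd.read_parquet(path)
    df["a"] = df["a"] * 2
    df.to_parquet(path)
    return path

@flow
def my_sub_flow():
    path = task_load_data("/tmp/data.parquet")
    task_modify_data(path)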

Additional context

Debugging showed that the class UnpersistedResult(BaseResult) persists the outputs of a task 'forever' by design in self._cache. Surprisingly, inputs are also persisted somewhere. I spent quite some time debugging all the options, reading documentation and googling. In fact, the documentation is misleading to me, as it suggests that things can be non-persistent. persist_result=False and cache_result_in_memory=False are not of help either, especially not for the problem of input persistence.

I also tried to modify the source to release self._cache after it has been read, in order to contribute a fix, but sadly I did not find a non-destructive way to solve the issue that would also help others who might be affected. Actually, I am wondering whether I am the only one having this issue.

PS: The other options I found amount to not using tasks at all, so that no data is passed around. I also experimented with a wrapper class that encapsulates each object passed around and then deletes the large object from the wrapper (leaving the wrapper itself in memory), but that somehow defeats the simplicity of Prefect.

jakekaplan commented 11 months ago

Hi @sibbiii, thanks for the well written issue and investigation. I can reproduce the behavior with your MRE.

I think it is unlikely that the references are maintained as part of results as you're not returning anything from your function.

I think this likely has to do with dependency tracking or, as you said, input persistence. Prefect keeps track of inputs as a way to track/resolve dependencies between tasks. We already provide some mechanisms like quote() to opt out of dependency tracking. This is possibly a bug where a reference is maintained when it shouldn't be, or an enhancement. Either way, it seems like it makes sense to at least have an opt-out.
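For reference, a minimal sketch of the quote() opt-out (assuming Prefect 2.x; whether it also releases the memory in this particular case is exactly what needs confirming):

from prefect import flow, task
from prefect.utilities.annotations import quote

@task
def my_task(df):
    pass

@flow
def my_sub_flow_1():
    df = bytearray(1024 * 1024 * 1024)  # 1024MiB
    my_task(quote(df))  # quote() tells Prefect not to inspect df for upstream dependencies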

sibbiii commented 11 months ago

Hi,

I think it is unlikely that the references are maintained as part of results as you're not returning anything from your function.

You are right, I checked, they are not maintained in UnpersistedResult.

So I invested some time to create another minimal example of how to fill up memory by calling a task with some input data. When executing the code below, memory usage (e.g. watch it in the task manager) increases over time.

from prefect import flow, task
import gc

@task  # <----- Remove this line, and the memory is released -----
def my_task(some_large_object):
    pass

@flow
def my_sub_flow():
    my_task(bytearray(1024 * 1024 * 300))  # 300MiB

for i in range(100):
    my_sub_flow()
    gc.collect()

I have not yet found out why the memory cannot be released when the task decorator is added. Interestingly, calling a task in a loop (similar to the example below) does not fill up the memory via its inputs.

For the output of a task, filling up the memory is easy to reproduce:

from prefect import flow, task

@task  # <----- Remove this line, and the memory is released -----
def my_task():
    return bytearray(1024*1024*300)  # 300MiB of memory

@flow
def my_sub_flow():
    for i in range(100):
        my_task()

my_sub_flow()

Here a reference to the data is kept in self._cache of UnpersistedResult.

Unfortunately, not being able to release the memory for inputs and outputs results in memory issues when running flows with many tasks or with lots of data being transferred to and from tasks. I hope there are ways of keeping track of dependencies between tasks other than "storing" all input data by keeping a reference, and likewise for the outputs.
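As an illustration of that hope, here is a minimal sketch (a hypothetical tracker, not Prefect internals) of recording dependencies via weak references, so the tracker itself does not keep the data alive:

import gc
import weakref

class DependencyTracker:
    """Hypothetical: records task inputs without holding strong references."""

    def __init__(self):
        self._seen = []

    def record(self, obj):
        self._seen.append(weakref.ref(obj))  # weak reference only

    def alive_count(self):
        return sum(1 for ref in self._seen if ref() is not None)

class LargeFrame:  # stand-in for a big DataFrame (bytearray is not weak-referenceable)
    pass

tracker = DependencyTracker()
df = LargeFrame()
tracker.record(df)
print(tracker.alive_count())  # 1 -> object still alive
del df
gc.collect()
print(tracker.alive_count())  # 0 -> the tracker did not keep it alive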

zachtrong commented 11 months ago

I think these issues heavily impact long-running flows/tasks, where memory slowly creeps up until the process runs out of memory.

My use case is http calls to a website with hundreds of thousands of sitemaps.

chrisguidry commented 10 months ago

I've been investigating this memory leak, and one issue I've found with the parameter case is that even after garbage collection there is still a chain of referrers keeping the parameters alive.

I think what's happening here is that even when the flow and tasks have both finished running, there are lingering references to them in the service that streams logs back to the Prefect API, so I'm going to try to figure out how we can break those references.

One possible solution is that the Call object can probably drop its args/kwargs immediately after it has finished running, as they shouldn't be used after that. Need to experiment some more....
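A minimal sketch of that idea (this is not Prefect's actual Call class, just the shape of the fix): the call breaks its own references to args/kwargs as soon as it has run.

class Call:
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.result = None

    def run(self):
        try:
            self.result = self.fn(*self.args, **self.kwargs)
        finally:
            # break the references once the call has finished, so the
            # caller's large arguments can be garbage collected
            self.args = ()
            self.kwargs = {}
        return self.result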

chrisguidry commented 10 months ago

For a task that returns a large value, it's also the Call, but this time it's the Call.future that's holding a long-term reference to the result of the call. It's a bit trickier to know when we're done with the future of a Call since a caller may theoretically call for the result multiple times. Looking to see if there's a way we can just drop the Call object itself when we're done with it...

chrisguidry commented 10 months ago

In the case of task return values, at the end of a flow run there are three references:

1. a Call object's .future <-- this is something we may be able to address
2. an UnpersistedResult in the SequentialTaskRunner._results cache of State results
3. another copy of an UnpersistedResult referenced by the list of State results in the Future._result of the future for create_then_begin_flow_run

That first one seems like something we may be able to address, but the second and third start to interfere with how our result processing works, which might need to be opened as a separate issue. I do believe I can address the parameter issue by breaking the Call's reference to the task's args/kwargs, which should help with this issue about task inputs.

chrisguidry commented 10 months ago

Hi @sibbiii, I just merged a fix that will be in the next release, but if you could give it a try on prefect@main, that would be amazing! I was able to address the parameter issue as you described in the original writeup, but the result issue is a bit more invasive. I'll bring this back to the team to talk about how we might improve the performance/memory use around result caching between flow runs.

sibbiii commented 10 months ago

Hi @sibbiii, I just merged a fix that will be in the next release, but if you could give it a try on prefect@main, that would be amazing!

Hi @chrisguidry,

I imported the main head (2.14.4+12.g6d7acc78b9) and reran the example at the beginning of this issue. The output is still the same, that is, memory is not released if the @task decorator is in place and is released if the decorator is removed.

Sebastian

chrisguidry commented 10 months ago

Ah yes, @sibbiii, that is likely to still be the case during the flow run. My fix corrected an issue where memory wasn't freed between flow runs, but it wouldn't have addressed freeing memory during a flow run. Check out this example for what my change fixed:

import gc
import os
import sys

import psutil
from prefect import flow, task

@task(
    persist_result=False, cache_result_in_memory=False
)  # <----- Remove this line, and the memory is released -----
def my_task(df):
    pass

@flow
def my_sub_flow_1():
    print(
        f"Memory before task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )
    df = bytearray(1024 * 1024 * 1024)  # 1024MiB of memory

    my_task(df)

    print(f"{sys.getrefcount(df)} references to df")
    del df  # release memory
    gc.collect()  # garbage collection not needed, just be certain
    print(
        f"Memory after task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )

if __name__ == "__main__":
    print(
        f"Memory before flow: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )

    my_sub_flow_1()

    gc.collect()  # garbage collection not needed, just be certain
    print(
        f"Memory after flow: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )

Before the fix:

Memory before flow: 80.4453125MiB
...
Memory before task: 136.1328125MiB
09:26:15.670 | INFO    | Flow run 'spiked-ostrich' - Created task run 'my_task-0' for task 'my_task'
09:26:15.671 | INFO    | Flow run 'spiked-ostrich' - Executing 'my_task-0' immediately...
09:26:16.353 | INFO    | Task run 'my_task-0' - Finished in state Completed()
4 references to df
Memory after task: 1162.3828125MiB
09:26:16.566 | INFO    | Flow run 'spiked-ostrich' - Finished in state Completed('All states completed.')
Memory after flow: 1163.3203125MiB

After the fix:

Memory before flow: 84.30078125MiB
...
Memory before task: 99.55078125MiB
09:21:21.617 | INFO    | Flow run 'uncovered-trogon' - Created task run 'my_task-0' for task 'my_task'
09:21:21.618 | INFO    | Flow run 'uncovered-trogon' - Executing 'my_task-0' immediately...
09:21:22.285 | INFO    | Task run 'my_task-0' - Finished in state Completed()
3 references to df
Memory after task: 1165.80078125MiB <---- what you're observing
09:21:22.531 | INFO    | Flow run 'uncovered-trogon' - Finished in state Completed('All states completed.')
Memory after flow: 141.91796875MiB <---- what I was able to fix in the first pass

I'm going to re-open this for further investigation.

sibbiii commented 10 months ago

Hi @chrisguidry,

I checked: you are right. It works when the flow is finished.

By the way, now that the input issue is fixed, there is a hack to release memory using something like this:

class SelfDestruct:

    class AlreadyDestructedException(Exception):
        pass

    class NoObjectStored:
        pass  # Sentinel marking that no object is stored

    def __init__(self, obj):
        self._obj_store = obj

    def get_destruct(self, destruct=True):
        if self._obj_store is self.NoObjectStored:
            raise self.AlreadyDestructedException

        result = self._obj_store
        if destruct:
            self._obj_store = self.NoObjectStored
        return result

and then

@task()  
def my_task(df):
    return SelfDestruct(df)

@flow
def my_sub_flow():
    df = ...
    result = my_task(df).get_destruct()

which actually works fine, that is, the memory is released. It just does not look like "clean code". Still, I could add this logic to the @task and @flow decorators ...
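For instance, a hedged sketch of that idea (a hypothetical helper built on the SelfDestruct class above, not a Prefect feature): a decorator that boxes a task's return value automatically.

from functools import wraps
from prefect import task

def self_destructing_task(**task_kwargs):
    # Hypothetical: wrap a function so its return value is boxed in
    # SelfDestruct before Prefect can cache a strong reference to it.
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            return SelfDestruct(fn(*args, **kwargs))
        return task(**task_kwargs)(wrapper)
    return decorator

@self_destructing_task()
def task_load_data():
    return bytearray(1024 * 1024 * 300)  # 300MiB

# the caller unwraps (and thereby releases) the value explicitly:
# df = task_load_data().get_destruct()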

Other than this, I have had no smart idea so far for how to fix the issue.

chrisguidry commented 10 months ago

We've been discussing internally how we may need to take a fresh look at how results are handled across the board, with memory efficiency in mind. Thanks for your help diagnosing these, we'll keep working on this and keep you posted.

itay-verkh-lightricks commented 8 months ago

Hi, I am running the newest Prefect version (2.14.15) and I am still experiencing the same memory leak even after upgrading. I turned storage persistence and the in-memory cache off for the task. Here is a reproducible piece of code that causes the memory leak.

import asyncio
import gc
import os
from typing import Optional

import numpy as np
import pandas as pd
import psutil
from prefect import flow, task

@flow
async def periodic_batch_flow(until: Optional[str] = None):
    for i in range(3):
        print(f"Memory before flow: {psutil.Process(os.getpid()).memory_info()[0] / float(1024 * 1024)}MiB")
        await subflow()
        print(f"Memory after flow: {psutil.Process(os.getpid()).memory_info()[0] / float(1024 * 1024)}MiB")

@task(cache_result_in_memory=False, persist_result=False)
async def some_heavy_task(df):
    # some processing and writing to db
    await asyncio.sleep(10)
    return

@flow
async def subflow():
    print(
        f"Memory before task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )
    df = pd.DataFrame(np.random.randint(0, 100, size=(int(2.5e7), 4)), columns=list("ABCD"))
    await some_heavy_task(df)

    del df  # doesn't actually do anything
    gc.collect()  # doesn't do anything
    print(
        f"Memory after task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024*1024)}MiB"
    )
    return

And the output after one iteration of the loop:

14:29:43.012 | INFO    | prefect.engine - Created flow run 'pastel-perch' for flow 'periodic-batch-flow'
Memory before flow: 287.171875MiB
14:29:43.151 | INFO    | Flow run 'pastel-perch' - Created subflow run 'radiant-loon' for flow 'subflow'
Memory before task: 287.9375MiB
14:29:43.843 | INFO    | Flow run 'radiant-loon' - Created task run 'some_heavy_task-0' for task 'some_heavy_task'
14:29:43.844 | INFO    | Flow run 'radiant-loon' - Executing 'some_heavy_task-0' immediately...
14:29:53.909 | INFO    | Task run 'some_heavy_task-0' - Finished in state Completed()
Memory after task: 1051.625MiB
14:29:54.012 | INFO    | Flow run 'radiant-loon' - Finished in state Completed('All states completed.')
Memory after flow: 1051.890625MiB

As you can see, the memory doesn't go down even after the subflow finishes. If I continue with more iterations of the loop, the memory just grows and grows. If I run @chrisguidry's flow and task, I see the same results that he posted after the bugfix got merged; the only difference I see between his flow and mine is that mine is async.

abrookins commented 7 months ago

Just wanted to drop a note here to say we are actively investigating this. Using a slightly simplified version of the example code in Itay's comment, we've narrowed the area of interest down to our use of an anyio cancellation scope.

Here's the code I'm using with all of my debugging annotations:

import asyncio
import gc
import os
import time
from typing import Optional

import numpy as np
import objgraph
import pandas as pd
import prefect._internal.concurrency.calls
import psutil
from anyio._backends._asyncio import _task_states
from prefect import flow, task

@task(cache_result_in_memory=False, persist_result=False)
async def some_heavy_task(df):
    # some processing and writing to db
    await asyncio.sleep(10)
    return

@task(cache_result_in_memory=False, persist_result=False)
def some_heavy_task_sync(df):
    # some processing and writing to db
    time.sleep(10)
    return

@flow
async def periodic_batch_flow_tasks(until: Optional[str] = None):
    for i in range(4):
        print(
            "task refs before iteration ",
            prefect._internal.concurrency.calls._ASYNC_TASK_REFS,
        )
        print(
            f"Memory before task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024 * 1024)}MiB"
        )
        df = pd.DataFrame(
            np.random.randint(0, 100, size=(int(2.5e7), 4)), columns=list("ABCD")
        )
        some_heavy_task_sync(df)
        del df  # doesn't actually do anything
        gc.collect()  # doesn't do anything

        print(
            f"Memory after task: {psutil.Process(os.getpid()).memory_info()[0] / float(1024 * 1024)}MiB"
        )
        print(
            f"task refs after iteration {i}",
            prefect._internal.concurrency.calls._ASYNC_TASK_REFS,
        )
        print(
            f"task states after iteration {i}", {k: v for k, v in _task_states.items()}
        )

    await asyncio.sleep(5)

    print(
        "task refs at end of loop ",
        prefect._internal.concurrency.calls._ASYNC_TASK_REFS,
    )
    print("task states at end of loop ", {k: v for k, v in _task_states.items()})

    print("Printing dataframes within flow run")
    for i, obj in enumerate(objgraph.by_type("DataFrame")):
        objgraph.show_chain(
            objgraph.find_backref_chain(obj, objgraph.is_proper_module),
            filename=f"chain{i}.png",
        )

if __name__ == "__main__":
    asyncio.run(periodic_batch_flow_tasks())

Profiling with memray shows that if an async task is used, the Python process keeps references to all DataFrames that the flow creates. If a sync task is used, only one reference exists when we exit the loop.

Here are the profiling commands I'm using:

$ memray run -o output.bin memory_leak.py
$ memray flamegraph output.bin
$ open memray-flamegraph-output.html

abrookins commented 7 months ago

In #12019, I believe I've isolated two different memory leaks involved in this issue. I'll need to test more and get some confirmation from another machine setup, but this looks promising.

abrookins commented 7 months ago

Here's a memray snapshot showing heap memory staying stable with the example flow from Itay when using #12019. Previously, the heap grew on each iteration.

[memray screenshot: heap memory remains stable across flow-run iterations]