PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Serializer memory build up when using result persistence #7624

Closed lennertvandevelde closed 1 year ago

lennertvandevelde commented 1 year ago

Bug summary

I have configured result persistence on a task that fetches millions of records from an API, to prevent a large buildup of memory as the documentation suggests. The results are in fact persisted, but the objects created by the serializer for each result remain in memory, so the buildup is merely shifted rather than avoided.

Reproduction

from prefect import task, flow, get_run_logger
from prefect.filesystems import LocalFileSystem

import tracemalloc

@task(cache_result_in_memory=False, persist_result=True, result_storage=LocalFileSystem(basepath='/home/lennert/prefect_test/storage'))
def task_example(parameter):
    longlist = ["LOTSOFDATA"]*1000000
    return longlist

@flow(name="prefect_flow_test")
def main_flow():
    """
    Here you write your main flow code and call your tasks and/or subflows.
    """
    def snapshot():
        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')
        logger.warning("-----------------")
        for stat in top_stats[:2]:
            print(stat)
            logger.warning(stat)
            logger.warning(stat.traceback.format())

    tracemalloc.start(10)
    logger = get_run_logger()

    for i in range(100):
        task_example(i)
        snapshot()

if __name__ == "__main__":
    main_flow()

Error

Logs (outputs of tracemalloc):

16:17:13.132 | WARNING | Flow run 'malachite-fulmar' - -----------------
16:17:13.133 | WARNING | Flow run 'malachite-fulmar' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=8095 KiB, count=4, average=2024 KiB
16:17:13.135 | WARNING | Flow run 'malachite-fulmar' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
[...]
16:17:16.047 | INFO    | Task run 'task_example-0c90c599-1' - Finished in state Completed()
16:17:16.189 | WARNING | Flow run 'malachite-fulmar' - -----------------
16:17:16.191 | WARNING | Flow run 'malachite-fulmar' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=15.8 MiB, count=6, average=2698 KiB
16:17:16.192 | WARNING | Flow run 'malachite-fulmar' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
[...]
16:17:19.400 | INFO    | Task run 'task_example-0c90c599-2' - Finished in state Completed()

After some task runs: 

16:18:26.512 | WARNING | Flow run 'malachite-fulmar' - -----------------
16:18:26.514 | WARNING | Flow run 'malachite-fulmar' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=182 MiB, count=68, average=2738 KiB
16:18:26.516 | WARNING | Flow run 'malachite-fulmar' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
[...]
16:18:29.761 | INFO    | Task run 'task_example-0c90c599-23' - Finished in state Completed()
16:18:29.911 | WARNING | Flow run 'malachite-fulmar' - -----------------
16:18:29.912 | WARNING | Flow run 'malachite-fulmar' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=190 MiB, count=71, average=2736 KiB
16:18:29.914 | WARNING | Flow run 'malachite-fulmar' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
[...]
16:18:33.117 | INFO    | Task run 'task_example-0c90c599-24' - Finished in state Completed()

Versions

Version:             2.6.8
API version:         0.8.3
Python version:      3.8.9
Git commit:          68044e28
Built:               Thu, Nov 17, 2022 3:19 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Additional context

The object buildup comes from:

class PickleSerializer(Serializer):
    def loads(self, blob: bytes) -> Any:
        pickler = from_qualified_name(self.picklelib)
->      return pickler.loads(base64.decodebytes(blob))

The memory that tracemalloc attributes to pickler.loads(base64.decodebytes(blob)) increases steadily with each task run, so I assume memory is not released after loading the persisted results.

I don't know if this is expected behaviour, but if so it does not correspond with the statement made in the documentation here.
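For context on reading these numbers: tracemalloc groups live memory by the line that allocated it, not by whatever still holds a reference. Growth attributed to the loads line therefore suggests that the deserialized objects are still referenced somewhere, not necessarily that the serializer itself leaks. Below is a minimal stdlib-only sketch of that effect, independent of Prefect. The names traced_size and held are hypothetical, and held merely stands in for whatever keeps the results alive.

```python
import base64
import pickle
import tracemalloc


def loads(blob: bytes):
    # Same shape as the serializer line shown in the traceback above.
    return pickle.loads(base64.decodebytes(blob))


def traced_size(retain: bool, rounds: int = 5) -> int:
    """Return the bytes still traced after `rounds` deserializations."""
    # Built before tracing starts, so the blob itself is not counted.
    blob = base64.encodebytes(pickle.dumps([str(i) for i in range(50_000)]))
    held = []  # hypothetical stand-in for a cache that retains results
    obj = None
    tracemalloc.start()
    for _ in range(rounds):
        obj = loads(blob)
        if retain:
            held.append(obj)
    del obj  # drop the loop's own reference to the last result
    current, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return current


if __name__ == "__main__":
    print("retained:", traced_size(retain=True))
    print("released:", traced_size(retain=False))
```

When the results are retained, the traced size grows with the number of rounds; when they are dropped, it stays small even though the same loads line ran equally often.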

zanieb commented 1 year ago

Thanks for the issue! I don't think this is the behavior I intended here, though I'm not sure yet where we need to release an object.

This may be because you are using task calls instead of submit. When the result of the task is requested, we must retrieve it from storage. In your example, each of those calls is being resolved to a result. I imagine due to the fast for-loop you have there, the results are not being garbage collected by the time you snapshot.
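One way to check the garbage-collection hypothesis outside of Prefect is to hold only a weak reference to a result, drop the strong reference, and see whether the object survives gc.collect(). The sketch below assumes CPython, where an object without reference cycles is freed as soon as its last strong reference disappears. Payload, is_alive_after_gc and cache are hypothetical names used only for illustration.

```python
import gc
import weakref


class Payload(list):
    """Plain lists cannot be weakly referenced; a trivial subclass can."""


def is_alive_after_gc(keep_strong_ref: bool) -> bool:
    cache = []  # hypothetical stand-in for anything that retains results
    obj = Payload(["LOTSOFDATA"] * 1000)
    probe = weakref.ref(obj)  # does not keep the object alive
    if keep_strong_ref:
        cache.append(obj)
    del obj       # drop the caller's own reference
    gc.collect()  # only needed for reference cycles
    return probe() is not None


if __name__ == "__main__":
    print(is_alive_after_gc(keep_strong_ref=True))
    print(is_alive_after_gc(keep_strong_ref=False))
```

If an object is still alive after its caller-side reference is dropped and a collection has run, something else holds a strong reference to it, and neither sleeping nor calling gc.collect() will release it.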

lennertvandevelde commented 1 year ago

Thank you for the fast response!

I've edited the test file to include submits, a sleep and a manual garbage collect.

from time import sleep
from prefect import task, flow, get_run_logger
from prefect.filesystems import LocalFileSystem
import gc

import tracemalloc

@task(cache_result_in_memory=False, persist_result=True, result_storage=LocalFileSystem(basepath='/home/lennert/prefect_test/storage'))
def task_example(parameter):
    longlist = ["LOTSOFDATA"]*1000000
    return longlist

@flow(name="prefect_flow_test", cache_result_in_memory=False, persist_result=True, result_storage=LocalFileSystem(basepath='/home/lennert/prefect_test/storage'))
def main_flow():
    """
    Here you write your main flow code and call your tasks and/or subflows.
    """
    def snapshot():
        snapshot = tracemalloc.take_snapshot()

        top_stats = snapshot.statistics('lineno')
        logger.warning("-----------------")
        for stat in top_stats[:2]:
            logger.warning(stat)
            logger.warning(stat.traceback.format())

    tracemalloc.start(10)
    logger = get_run_logger()

    for i in range(5):
        sleep(10)
        state = task_example.submit(i)
        result = state.result()
        logger.info(len(result))
        snapshot()
        gc.collect()

if __name__ == "__main__":
    main_flow()

But the buildup of the serializer remains:

17:07:17.614 | INFO    | prefect.engine - Created flow run 'bright-sawfish' for flow 'prefect_flow_test'
17:07:28.415 | INFO    | Flow run 'bright-sawfish' - Created task run 'task_example-0c90c599-0' for task 'task_example'
17:07:28.538 | INFO    | Flow run 'bright-sawfish' - Submitted task run 'task_example-0c90c599-0' for execution.
17:07:31.659 | INFO    | Task run 'task_example-0c90c599-0' - Finished in state Completed()
17:07:31.732 | INFO    | Flow run 'bright-sawfish' - 1000000
17:07:31.752 | WARNING | Flow run 'bright-sawfish' - -----------------
17:07:31.754 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=8095 KiB, count=3, average=2698 KiB
17:07:31.756 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:07:31.758 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/client/orion.py:1599: size=7528 B, count=35, average=215 B
17:07:31.761 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/client/orion.py", line 1599', '    return TaskRun.parse_obj(response.json())']
17:07:42.758 | INFO    | Flow run 'bright-sawfish' - Created task run 'task_example-0c90c599-1' for task 'task_example'
17:07:42.760 | INFO    | Flow run 'bright-sawfish' - Submitted task run 'task_example-0c90c599-1' for execution.
17:07:46.173 | INFO    | Task run 'task_example-0c90c599-1' - Finished in state Completed()
17:07:46.248 | INFO    | Flow run 'bright-sawfish' - 1000000
17:07:46.275 | WARNING | Flow run 'bright-sawfish' - -----------------
17:07:46.276 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=15.8 MiB, count=6, average=2698 KiB
17:07:46.278 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:07:46.279 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py:137: size=183 KiB, count=1891, average=99 B
17:07:46.281 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py", line 137', '    lines = fp.readlines()']
17:07:56.764 | INFO    | Flow run 'bright-sawfish' - Created task run 'task_example-0c90c599-2' for task 'task_example'
17:07:56.766 | INFO    | Flow run 'bright-sawfish' - Submitted task run 'task_example-0c90c599-2' for execution.
17:08:00.137 | INFO    | Task run 'task_example-0c90c599-2' - Finished in state Completed()
17:08:00.212 | INFO    | Flow run 'bright-sawfish' - 1000000
17:08:00.237 | WARNING | Flow run 'bright-sawfish' - -----------------
17:08:00.239 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=23.7 MiB, count=9, average=2698 KiB
17:08:00.240 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:08:00.241 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py:137: size=197 KiB, count=2039, average=99 B
17:08:00.243 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py", line 137', '    lines = fp.readlines()']
17:08:10.790 | INFO    | Flow run 'bright-sawfish' - Created task run 'task_example-0c90c599-3' for task 'task_example'
17:08:10.792 | INFO    | Flow run 'bright-sawfish' - Submitted task run 'task_example-0c90c599-3' for execution.
17:08:14.200 | INFO    | Task run 'task_example-0c90c599-3' - Finished in state Completed()
17:08:14.274 | INFO    | Flow run 'bright-sawfish' - 1000000
17:08:14.299 | WARNING | Flow run 'bright-sawfish' - -----------------
17:08:14.301 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=31.6 MiB, count=12, average=2698 KiB
17:08:14.302 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:08:14.304 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py:137: size=197 KiB, count=2039, average=99 B
17:08:14.305 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py", line 137', '    lines = fp.readlines()']
17:08:24.861 | INFO    | Flow run 'bright-sawfish' - Created task run 'task_example-0c90c599-4' for task 'task_example'
17:08:24.863 | INFO    | Flow run 'bright-sawfish' - Submitted task run 'task_example-0c90c599-4' for execution.
17:08:28.283 | INFO    | Task run 'task_example-0c90c599-4' - Finished in state Completed()
17:08:28.365 | INFO    | Flow run 'bright-sawfish' - 1000000
17:08:28.391 | WARNING | Flow run 'bright-sawfish' - -----------------
17:08:28.393 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=39.5 MiB, count=15, average=2698 KiB
17:08:28.395 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:08:28.396 | WARNING | Flow run 'bright-sawfish' - /home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py:137: size=197 KiB, count=2039, average=99 B
17:08:28.398 | WARNING | Flow run 'bright-sawfish' - ['  File "/home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py", line 137', '    lines = fp.readlines()']
17:08:29.189 | INFO    | Flow run 'bright-sawfish' - Finished in state Completed('All states completed.')

lennertvandevelde commented 1 year ago

I believe the build up is caused by the caching of the object as described [here](https://docs.prefect.io/concepts/results/#caching-of-results-in-memory:~:text=The%20get()%20method%20on%20result%20references%20retrieves%20the%20data%20from%20storage%2C%20deserializes%20it%2C%20and%20returns%20the%20original%20object.%20The%20get()%20operation%20will%20cache%20the%20resolved%20object%20to%20reduce%20the%20overhead%20of%20subsequent%20calls.).

After commenting out line 415 in results.py:

    @sync_compatible
    @inject_client
    async def get(self, client: "OrionClient") -> R:
        """
        Retrieve the data and deserialize it into the original object.
        """
        if self.has_cached_object():
            return self._cache

        blob = await self._read_blob(client=client)
        obj = blob.serializer.loads(blob.data)
->        # self._cache_object(obj)

        return obj

The build up does not happen:

17:19:11.064 | INFO    | prefect.engine - Created flow run 'glossy-beluga' for flow 'prefect_flow_test'
17:19:22.688 | INFO    | Flow run 'glossy-beluga' - Created task run 'task_example-0c90c599-0' for task 'task_example'
17:19:22.800 | INFO    | Flow run 'glossy-beluga' - Submitted task run 'task_example-0c90c599-0' for execution.
17:19:25.917 | INFO    | Task run 'task_example-0c90c599-0' - Finished in state Completed()
17:19:25.998 | INFO    | Flow run 'glossy-beluga' - 1000000
17:19:26.018 | WARNING | Flow run 'glossy-beluga' - -----------------
17:19:26.020 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=8095 KiB, count=3, average=2698 KiB
17:19:26.022 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:19:26.023 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/client/orion.py:1599: size=7528 B, count=35, average=215 B
17:19:26.027 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/client/orion.py", line 1599', '    return TaskRun.parse_obj(response.json())']
17:19:37.103 | INFO    | Flow run 'glossy-beluga' - Created task run 'task_example-0c90c599-1' for task 'task_example'
17:19:37.105 | INFO    | Flow run 'glossy-beluga' - Submitted task run 'task_example-0c90c599-1' for execution.
17:19:40.563 | INFO    | Task run 'task_example-0c90c599-1' - Finished in state Completed()
17:19:40.640 | INFO    | Flow run 'glossy-beluga' - 1000000
17:19:40.666 | WARNING | Flow run 'glossy-beluga' - -----------------
17:19:40.667 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=8095 KiB, count=3, average=2698 KiB
17:19:40.668 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:19:40.669 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py:137: size=183 KiB, count=1891, average=99 B
17:19:40.671 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py", line 137', '    lines = fp.readlines()']
17:19:51.521 | INFO    | Flow run 'glossy-beluga' - Created task run 'task_example-0c90c599-2' for task 'task_example'
17:19:51.524 | INFO    | Flow run 'glossy-beluga' - Submitted task run 'task_example-0c90c599-2' for execution.
17:19:54.896 | INFO    | Task run 'task_example-0c90c599-2' - Finished in state Completed()
17:19:54.975 | INFO    | Flow run 'glossy-beluga' - 1000000
17:19:55.002 | WARNING | Flow run 'glossy-beluga' - -----------------
17:19:55.003 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=8095 KiB, count=3, average=2698 KiB
17:19:55.005 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:19:55.006 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py:137: size=197 KiB, count=2039, average=99 B
17:19:55.007 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py", line 137', '    lines = fp.readlines()']
17:20:06.192 | INFO    | Flow run 'glossy-beluga' - Created task run 'task_example-0c90c599-3' for task 'task_example'
17:20:06.194 | INFO    | Flow run 'glossy-beluga' - Submitted task run 'task_example-0c90c599-3' for execution.
17:20:09.589 | INFO    | Task run 'task_example-0c90c599-3' - Finished in state Completed()
17:20:09.664 | INFO    | Flow run 'glossy-beluga' - 1000000
17:20:09.691 | WARNING | Flow run 'glossy-beluga' - -----------------
17:20:09.693 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=8095 KiB, count=3, average=2698 KiB
17:20:09.694 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:20:09.695 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py:137: size=197 KiB, count=2039, average=99 B
17:20:09.696 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py", line 137', '    lines = fp.readlines()']
17:20:20.107 | INFO    | Flow run 'glossy-beluga' - Created task run 'task_example-0c90c599-4' for task 'task_example'
17:20:20.109 | INFO    | Flow run 'glossy-beluga' - Submitted task run 'task_example-0c90c599-4' for execution.
17:20:23.564 | INFO    | Task run 'task_example-0c90c599-4' - Finished in state Completed()
17:20:23.643 | INFO    | Flow run 'glossy-beluga' - 1000000
17:20:23.672 | WARNING | Flow run 'glossy-beluga' - -----------------
17:20:23.674 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py:147: size=8095 KiB, count=3, average=2698 KiB
17:20:23.675 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/envs/prefect-env/lib/python3.8/site-packages/prefect/serializers.py", line 147', '    return pickler.loads(base64.decodebytes(blob))']
17:20:23.677 | WARNING | Flow run 'glossy-beluga' - /home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py:137: size=197 KiB, count=2039, average=99 B
17:20:23.679 | WARNING | Flow run 'glossy-beluga' - ['  File "/home/lennert/.pyenv/versions/3.8.9/lib/python3.8/linecache.py", line 137', '    lines = fp.readlines()']
17:20:23.798 | INFO    | Flow run 'glossy-beluga' - Finished in state Completed('All states completed.')

Why is the object cached again on retrieval if the objective of result persistence is to limit caching and, with it, memory usage?

zanieb commented 1 year ago

If you do not retrieve the result at result = state.result(), the increase does not occur. Result persistence has objectives other than reducing memory usage; in fact, that is a secondary goal. I'll see if we can avoid caching the object there. It's a bit tricky because the setting applies at result creation time, not at retrieval.
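To illustrate the creation-time versus retrieval-time point, here is a minimal sketch of a result reference that records the caching decision when it is created and honors it later in get(). This is not Prefect's implementation and not the content of the linked pull request; ResultRef, cache_object and load_count are hypothetical names, and a pickled bytes blob stands in for result storage.

```python
import pickle
from typing import Any


class ResultRef:
    """Hypothetical stand-in for a persisted result reference."""

    def __init__(self, blob: bytes, cache_object: bool = True) -> None:
        self._blob = blob                  # stands in for result storage
        self._should_cache = cache_object  # decided at creation time
        self._cache: Any = None
        self._has_cache = False
        self.load_count = 0                # deserializations performed

    def has_cached_object(self) -> bool:
        return self._has_cache

    def get(self) -> Any:
        if self._has_cache:
            return self._cache
        self.load_count += 1
        obj = pickle.loads(self._blob)
        # Retrieval consults the flag that was stored at creation time.
        if self._should_cache:
            self._cache = obj
            self._has_cache = True
        return obj


if __name__ == "__main__":
    blob = pickle.dumps(["LOTSOFDATA"] * 10)
    ref = ResultRef(blob, cache_object=False)
    ref.get()
    print(ref.has_cached_object(), ref.load_count)
```

The trade-off in this sketch is that an uncached reference deserializes the data again on every get(), exchanging repeated load time for lower resident memory.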

zanieb commented 1 year ago

Here's a sketch: https://github.com/PrefectHQ/prefect/pull/7625

lennertvandevelde commented 1 year ago

The draft looks like a good solution to me. I understand that result persistence serves more purposes than limiting memory usage, but setting cache_result_in_memory=False implies that limiting memory is (one of) the goal(s) of persistence, as I think is now the case in #7625.