PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Add result option to not persist data in memory and fall back on result interface read/write #3373

Closed · marvin-robot closed this issue 2 years ago

marvin-robot commented 4 years ago

Opened from the Prefect Public Slack Community

adriaan: Hi, I was wondering whether Prefect always keeps its task result values in memory for the whole flow run, or whether they are removed from memory when writing to a LocalResult target is enabled. As an example, what happens in the following task?

    jsonresult = task(
        get_json_fun,
        result=LocalResult(serializer=JSONSerializer()),
        target='/path/result.json',
        checkpoint=True,
    )(p)

Is jsonresult still held in memory in addition to being written to result.json, or not?

I'm worried that my machine will run out of memory when my flow gets bigger, with many task results floating around.

josh: Hi <@U015Q3FN17A> how are you running your flow? If the process your flow is running in is still alive (e.g. running in IPython or a notebook or something), then I believe the result should still exist in memory as well as being written to a local file. If you are running this flow through a Prefect agent, then the result should only be persisted to a local file location and should not be stored in memory.

adriaan: Hi, thanks for answering. I run the flow locally, without the server part.

Is there no way to have it persist only as a local file during the flow run this way? What about flows that produce many sizeable result objects? The memory of your machine can run out pretty fast, I would think.

josh: This is more of a Python question than a Prefect one, because it depends on what you are doing. If you have a notebook that does something like this:

final1 = flow.run()
final2 = flow.run()
final3 = flow.run()

Then as long as that process is alive all three of those states w/ task results will exist in memory.

If you run your flow as a script like:

python my_flow_file.py

Then the results that were in memory will no longer exist in memory because the process ended.

In both cases the results will still be written to a local file using the LocalResult.
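
A minimal sketch of that difference, assuming Prefect 1.x, the jsonresult task and flow from the first snippet, and that the flow was built and run in the same Python session (the exact read semantics may vary by version):

    from prefect.engine.results import LocalResult
    from prefect.engine.serializers import JSONSerializer

    # While this process is alive, the returned state keeps each task result in memory.
    state = flow.run()
    in_memory_value = state.result[jsonresult].result

    # The same value was also checkpointed to disk and can be re-read through the
    # Result interface, even from a fresh process.
    on_disk_value = LocalResult(serializer=JSONSerializer()).read('/path/result.json').value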

adriaan: I understand this. My problem is what happens if you run out of memory during flow.run().

Say for example you are processing many (e.g. millions of) txt files:

- task1 produces a list of paths to these files
- task2 maps over the output of task1 and calculates some intermediate result for each file (e.g. feature extraction)
- task3 does something else again with the output of task2
- etc...

I can imagine that your machine will run out of memory during step 2. I understand you can write the intermediate JSON results to disk yourself as part of task2 and read them back in during task3. But I was hoping I could keep the functions in my tasks pure and abstract disk I/O away as part of the Prefect flow.

I hope I could make myself clear enough :slightly_smiling_face:
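
For concreteness, a minimal Prefect 1.x sketch of the pipeline shape described above (task names and bodies are placeholders, not code from this thread):

    from prefect import Flow, task

    @task
    def list_files():
        # Placeholder: return the paths of the files to process.
        return ["a.txt", "b.txt"]

    @task
    def extract_features(path):
        # Placeholder for a memory-heavy intermediate result per file.
        return {"path": path, "n_chars": len(path)}

    @task
    def combine(features):
        # Placeholder for a downstream step over all intermediate results.
        return len(features)

    with Flow("feature-pipeline") as flow:
        paths = list_files()
        features = extract_features.map(paths)
        combine(features)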

josh: Ah I see what you’re saying, thanks for clarifying! Yeah, running out of memory during a run is definitely possible (especially when running on a single machine) and has the potential to cause weird issues due to resource constraints. If you’re performing any memory-intensive tasks like this and you’re worried about hitting limits, I would recommend looking into Prefect’s native dask integration (https://docs.prefect.io/core/advanced_tutorials/dask-cluster.html), which will let you farm out your task execution to workers on a Dask cluster. This allows you to spin up / work with Dask clusters that can be distributed across many machines with various amounts of memory.
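
A minimal sketch of what that looks like, assuming a Prefect 1.x release where DaskExecutor is importable from prefect.executors (older releases expose it as prefect.engine.executors.DaskExecutor) and a reachable scheduler address (the address below is illustrative):

    from prefect.executors import DaskExecutor

    # Point the flow at an existing Dask scheduler; omit `address` to have
    # Prefect spin up a temporary local cluster instead.
    executor = DaskExecutor(address="tcp://dask-scheduler:8786")
    state = flow.run(executor=executor)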

adriaan: Hey, yes, I understand the benefits of parallelisation with Dask. But I'm not sure it completely covers what I was trying to get at. My example was probably not clear enough. Consider the following example:

- task1 processes the input and generates a fairly large output object (e.g. a pandas dataframe or a big numpy matrix) that is hundreds of MB
- task2 does some manipulation of the task1 result and produces another sizeable object
- task3 does some manipulation of the task2 result and produces another sizeable object
- etc

Each result fits easily in memory, but if you have many such tasks in your flow you will run out of memory fast. In this case it would be favorable to serialize the intermediate task results to disk at each step and release them from memory while the flow is running. I don't think Dask can solve this issue.

josh: If we had a way to opt into something where a user could say “Store this task result but don’t keep it in memory, then have downstream tasks load from the result location”, I think this would resolve exactly what you’re after, wouldn’t it? :thinking_face:

adriaan: Yes, I think it would. :slightly_smiling_face:

adriaan: I guess it would make sense as an option on the result class or result handlers.

josh: Would you mind opening a feature request on the repo for this? I don’t think there is a pure-Prefect way of doing this (I could be missing something) but I think it could be implemented! As a workaround you could also skip Prefect’s result interface and go with something more manual in your tasks for the time being :slightly_smiling_face:

from prefect import task, Flow

@task
def a():
  d = get_data(...)              # produce the large object
  location = write_to_file(d)    # manually persist it and return only the path
  return location

@task
def b(location):
  d = load_from_file(location)   # downstream task re-reads the data from disk
  ...

with Flow(...) as f:
  location = a()
  b(location)

^ this only passes around the location of the data

adriaan: Yes, you are right, I could do this. But I was hoping to keep my functions free of I/O and let Prefect handle this automatically. If multiple tasks depend on one another or have multiple inputs, it can get unwieldy fast and your code gets cluttered with many read and write functions. (Or I am just being lazy :stuck_out_tongue: )

I'll look into opening a feature request.

adriaan: (btw is there a way to reference this slack thread in a github issue?)

josh: Ah yeah let me open an issue directly from this actually

josh: <@ULVA73B9P> open “Add result option to not persist data in memory and fall back on result interface read/write”

Original thread can be found here.

adder commented 4 years ago

Hi, to summarize: my concern is that a Prefect flow with many (sequential) tasks producing many sizeable Results (e.g. big pandas data frames, numpy arrays, etc.) could fill up the available memory quickly, because all results stay in memory as long as the flow's Python process is active.

A potential solution is to have an option to let the result of the task be written to a specified task target with a specified serializer (as is already possible today) and then also remove the result from memory.
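
A purely hypothetical sketch, in Prefect 1.x terms, of what such an opt-in might look like; the keep_in_memory flag does not exist in Prefect and is only meant to illustrate the request:

    from prefect import task
    from prefect.engine.results import LocalResult
    from prefect.engine.serializers import JSONSerializer

    # Hypothetical: keep_in_memory is NOT a real Prefect option. It illustrates the
    # proposal that a checkpointed result be dropped from memory and re-read by
    # downstream tasks via the Result interface.
    @task(
        result=LocalResult(serializer=JSONSerializer()),
        target="intermediate/{task_name}.json",
        checkpoint=True,
        keep_in_memory=False,  # hypothetical flag
    )
    def extract_features(paths):
        ...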

yin-aven commented 2 years ago

We ran into this problem with big pandas dataframes. To speed up processing, we use the Dask executor and mapping. As each mapped task returns a dataframe, memory runs out very quickly.

I hope there is a solution for this, or Prefect is stuck with "toy" examples. :)

Julian-Brendel commented 2 years ago

@cicdw was this completed in Prefect 2? I'm also interested in optionally relying on persisted state instead of memory.

cicdw commented 2 years ago

Hi @Julian-Brendel great timing - we are going to be working on 2.0's result persistence interface starting in the next week or two! Ensuring Python garbage collects an object is not always possible so I don't want to overpromise what we'll be able to achieve, but I'll be sure to call out this issue in our design docs.