PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
15.31k stars 1.5k forks

Add flow-level caching #7288

Open anna-geller opened 1 year ago

anna-geller commented 1 year ago

First check

Prefect Version

2.x

Describe the current behavior

Currently, we only support cache_key_fn on a task decorator.

Some users would like to specify caching at the flow level.

Describe the proposed behavior

Add cache_key_fn to the flow decorator, the same way it is handled at the task level.

Example Use

"As an ML engineer, I want to use the cached trained model if all inputs are the same"

@flow(cache_key_fn=task_input_hash)
def train_model(filename, hyperparams):
    data = load_data(filename)
    return sklearn_model(data, hyperparams)

Additional context

Actual user request on Discourse

Alternative

We explored the possibility of leveraging idempotency keys for such a use case, but those only work for runs from deployments. Caching would be helpful even (or possibly especially) for locally triggered flow runs during e.g. ML training process experimentation.

zanieb commented 1 year ago

> We explored the possibility of leveraging idempotency keys for such a use case, but those only work for runs from deployments.

This should not be the case; idempotency keys apply to all flow runs. We can allow one to be passed to flow calls; it would make the most sense in a submit interface.

An idempotency key is similar to a cache key, but they differ in what happens on a match. If a later run matches an existing cache key, the run is still created; it just assumes the cached state. If a later run matches an existing idempotency key, a new run is not created and the old run is returned.
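The difference between the two kinds of key can be sketched with a toy in-memory orchestrator. This is only an illustration of the semantics described above, not Prefect's implementation; `ToyOrchestrator`, `Run`, and the state names are hypothetical.

```python
import itertools
from dataclasses import dataclass


@dataclass
class Run:
    id: int
    state: str      # "Completed" or "Cached" (hypothetical state names)
    result: object


class ToyOrchestrator:
    """In-memory stand-in for a server; an assumption for illustration only."""

    def __init__(self):
        self._ids = itertools.count(1)
        self.runs = []                    # every run that was actually created
        self._by_cache_key = {}           # cache key -> stored result
        self._by_idempotency_key = {}     # idempotency key -> existing Run

    def run(self, fn, *, cache_key=None, idempotency_key=None):
        # Idempotency key match: no new run is created, the old run is returned.
        if idempotency_key is not None and idempotency_key in self._by_idempotency_key:
            return self._by_idempotency_key[idempotency_key]

        # Cache key match: a new run IS created, but it assumes the cached state.
        if cache_key is not None and cache_key in self._by_cache_key:
            run = Run(next(self._ids), "Cached", self._by_cache_key[cache_key])
        else:
            run = Run(next(self._ids), "Completed", fn())
            if cache_key is not None:
                self._by_cache_key[cache_key] = run.result

        self.runs.append(run)
        if idempotency_key is not None:
            self._by_idempotency_key[idempotency_key] = run
        return run
```

With this sketch, two calls sharing a cache key produce two distinct runs (the second in the "Cached" state), while two calls sharing an idempotency key return the very same run object.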

anna-geller commented 1 year ago

based on this description, it looks like the idempotency key is a good solution for the given use case -- how could we expose idempotency keys more prominently? could we expose it as idempotency_key_fn on a flow decorator, similarly to how it's done for caching?

zanieb commented 1 year ago

> it looks like the idempotency key is a good solution for the given use case

Do they not want their run to be created still and marked as retrieved from the cache?

> how could we expose idempotency keys more prominently? could we expose it as idempotency_key_fn on a flow decorator, similarly to how it's done for caching?

I'm not sure what the best way is yet. I do not think we should accept callables though, idempotency keys should probably be created by the user and passed explicitly. They are a property of the run more than the flow (unlike cache keys).
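As a rough sketch of what "created by the user and passed explicitly" could look like, the caller might derive a deterministic string from the flow name and parameters and hand it to whatever interface accepts an idempotency key. The helper name `make_idempotency_key` is hypothetical and assumes the parameters are JSON-serializable.

```python
import hashlib
import json


def make_idempotency_key(flow_name: str, parameters: dict) -> str:
    """Derive a deterministic key from a flow name and JSON-serializable parameters.

    Hypothetical helper, not part of Prefect.
    """
    # sort_keys makes the key independent of the order in which parameters were given.
    payload = json.dumps({"flow": flow_name, "parameters": parameters}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Because the key is a property of the run, the caller stays free to mix in anything else that should distinguish runs, such as a date or an experiment name.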

anna-geller commented 1 year ago

Chatted with both Bill and Michael:

secrettoad commented 1 year ago

Bumping this just to share that it would indeed be quite helpful. It could save hundreds of thousands of calls to the server on just one task for each run of a particular flow of mine...

zanieb commented 1 year ago

@secrettoad I'm not sure I follow why task caching would not solve your problem?

secrettoad commented 1 year ago

@madkinsz One use case is a subflow with 20k mapped tasks whose results are cached in the database. Checking them requires 20k interactions with the server and database, which take nearly 10 minutes in total. If the encapsulating flow were cached, it would only require comparing one cache key and retrieving one result. I would like to scale this flow to hundreds of thousands of tasks, most of which will be aggressively cached, but if it takes an hour to check cache keys and retrieve cached results, that's not practical.
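The lookup-count argument can be illustrated with a small stand-alone sketch. Everything here is hypothetical: `CountingStore` stands in for server round trips, and `item * 2` is a placeholder for the real task body.

```python
import hashlib
import json


def key_for(*parts) -> str:
    """Stable hash of JSON-serializable parts."""
    return hashlib.sha256(json.dumps(parts, sort_keys=True).encode()).hexdigest()


class CountingStore:
    """Dict-backed result store that counts lookups (a stand-in for server calls)."""

    def __init__(self):
        self.data = {}
        self.lookups = 0

    def get(self, key):
        self.lookups += 1
        return self.data.get(key)


def run_with_task_cache(store, items):
    # One lookup per mapped task, even when every result is already cached.
    results = []
    for item in items:
        key = key_for("task", item)
        cached = store.get(key)
        if cached is None:
            cached = item * 2              # placeholder for real work
            store.data[key] = cached
        results.append(cached)
    return results


def run_with_flow_cache(store, items):
    # One lookup for the whole subflow, keyed on all of its inputs.
    key = key_for("flow", items)
    cached = store.get(key)
    if cached is None:
        cached = [item * 2 for item in items]
        store.data[key] = cached
    return cached
```

Running either function twice over the same inputs returns the same results, but the task-level variant performs one lookup per item on every run, while the flow-level variant performs one lookup per run.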

Perhaps a better solution, one that would address a more overarching issue, is the ability to distribute creation and submission of tasks (including checking cache keys and retrieving cached results) outside of the GIL. That way, more hardware can solve the problem of scaling to large numbers of cached tasks.

zanieb commented 1 year ago

@secrettoad Thanks for the additional details! Could you use idempotency keys to avoid creating the extra flow run in the first place?

secrettoad commented 1 year ago

I suppose I could, but I'd rather implement my own caching layer than that. Just sharing that it would be valuable to me 🥂
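For readers wondering what such a user-side caching layer might look like, here is a minimal sketch in plain Python. It is not what the commenter built and not a Prefect feature; `cached_by_inputs` is a hypothetical decorator that assumes the arguments are JSON-serializable (or at least have a stable `repr`).

```python
import functools
import hashlib
import json


def cached_by_inputs(store: dict):
    """Decorator that skips the call when the same arguments were seen before.

    `store` is any dict-like object; swapping in a disk- or database-backed
    mapping is left to the caller.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # Key on the function name plus its arguments; sort_keys makes
            # dict arguments order-independent, default=repr covers other types.
            raw = json.dumps(
                [fn.__qualname__, args, kwargs], sort_keys=True, default=repr
            )
            key = hashlib.sha256(raw.encode("utf-8")).hexdigest()
            if key not in store:
                store[key] = fn(*args, **kwargs)
            return store[key]
        return wrapper
    return decorator
```

Applied to a function like `train_model(filename, hyperparams)`, a second call with identical inputs returns the stored result without re-running the body. Unlike server-side caching, nothing is recorded about the skipped call.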

PiotrSiejda commented 1 year ago

Just dropping two imperfect implementations that use an idempotency key. Python API (correctly starts just one flow run and returns the results):

import time

from prefect import task, flow, get_run_logger
from prefect.deployments import Deployment, run_deployment

@flow(persist_result=True)
def subflow_sync_ok():
    get_run_logger().info('Here I am!')
    time.sleep(2)
    return 'done processing'

@task
def my_run_deploy(deployment_name):
    f1 = run_deployment(
        name=deployment_name,
        parameters={},
        flow_run_name='My_run',
        idempotency_key='My_run',
    )
    res = f1.state.result()
    get_run_logger().info(f'Results ready! {res=}')
    return res

@flow
def main_flow_sync_ok(deployment_name):
    get_run_logger().info('Starting subflow! So excited')
    rsp = my_run_deploy.submit(deployment_name)
    rsp2 = my_run_deploy.submit(deployment_name)
    get_run_logger().info(rsp.result())
    get_run_logger().info(rsp2.result())

if __name__ == '__main__':
    dep = Deployment.build_from_flow(subflow_sync_ok, name="test_subfows")
    dep_id = dep.apply()
    main_flow_sync_ok(f'{subflow_sync_ok.name}/test_subfows')

REST (correctly starts just one flow run and radar shows the subflow but I don't know how to return the results):

import time

import requests
from prefect import task, flow, get_run_logger
from prefect.context import TaskRunContext
from prefect.deployments import Deployment
from prefect.settings import PREFECT_UI_API_URL

@flow(persist_result=True)
def subflow():
    get_run_logger().info('Here I am!')
    time.sleep(2)
    return "done processing"

@task()
def run_or_re_run_rest(deployment_id, pars, id_key):
    get_run_logger().info('Starting subflow! So excited')
    rsp = requests.post(
        f'{PREFECT_UI_API_URL.value()}/deployments/{str(deployment_id)}/create_flow_run',
        json={
            "name": id_key, "idempotency_key": id_key,
            "parameters": pars,
            "parent_task_run_id": str(TaskRunContext.get().task_run.id),
            "state": {"type": "SCHEDULED"},
        }
    )
    get_run_logger().info(f'rsp {rsp.status_code=}')
    flow_run_id = rsp.json()["id"]

    # Poll until the flow run reaches a terminal state.
    while True:
        flow_run_rsp = requests.get(f'{PREFECT_UI_API_URL.value()}/flow_runs/{flow_run_id}')
        state_type = flow_run_rsp.json()['state_type']
        if state_type in ("CANCELLED", "FAILED", "CRASHED"):
            raise Exception(f'Flow run {flow_run_id} ended in state {state_type}')
        elif state_type == "COMPLETED":
            get_run_logger().info('DONE!')
            return True
        time.sleep(1)  # pause between polls instead of busy-looping against the API

@flow
def main_flow_rest(deployment_id):
    run_or_re_run_rest.submit(deployment_id, {}, 'My_run_rest')
    run_or_re_run_rest.submit(deployment_id, {}, 'My_run_rest')

if __name__ == '__main__':
    dep = Deployment.build_from_flow(subflow, name="test_subfows")
    dep_id = dep.apply()
    main_flow_rest(dep_id)

PiotrSiejda commented 1 year ago

And with asyncio:

import asyncio
import random
import time

from prefect import flow, get_run_logger
from prefect.deployments import Deployment, run_deployment

@flow(persist_result=True)
def subflow():
    get_run_logger().info('Here I am!')
    time.sleep(2)
    return f"Done processing with num {random.random()}"

@flow
async def main_flow(deployment_name):
    get_run_logger().info('Starting subflow! So excited')

    f1 = run_deployment(
        name=deployment_name,
        parameters={},
        flow_run_name='My_run',
        idempotency_key='My_run',
    )
    f2 = run_deployment(
        name=deployment_name,
        parameters={},
        flow_run_name='My_run',
        idempotency_key='My_run',
    )
    aa = await asyncio.gather(f1, f2)
    bb = await aa[0].state.result().get()
    cc = await aa[1].state.result().get()
    # Adjacent f-strings are concatenated, so include separators explicitly.
    get_run_logger().info(f'Results passed to the main flow: '
                          f'first subflow: {bb}, '
                          f'second subflow: {cc}'
                          )

if __name__ == '__main__':
    dep = Deployment.build_from_flow(subflow, name="test_subfows")
    dep_id = dep.apply()
    asyncio.run(main_flow(f'{subflow.name}/test_subfows'))

takacsg84 commented 1 year ago

Any news regarding this feature? It would be really useful to our use-cases as well.

secrettoad commented 1 year ago

FYI, following up on my previous comments: I solved this by using the full Prefect architecture and deploying agents within a Kubernetes cluster. This way, the calls to the server and database are divided among agents via queues and scale horizontally.

Beautifully done, by the way, to the prefect team. The capabilities gap between prefect and the mainstream tooling is just astounding.

elenavolkova93 commented 1 year ago

+1, would also be useful for us. We have some subflows that are long enough to be inconvenient to squish into one task, but caching intermediate task results doesn't make sense, because they are big, and the only one that we're interested in is the result of the final task of the subflow.

willthbill commented 8 months ago

What is the status of this issue?

It would be very useful for my team. I am no expert in prefect, but my thoughts on this are:

  1. Implementation-wise the analysis that prefect does of flows to construct the DAG/dependencies could be done on only the top flow. Then, if a subflow cache is used (meaning the result was previously computed) we use that and do not analyse (i.e. construct the DAG) the subflow. Otherwise, we analyse the subflow and evaluate it.
  2. Having small simple tasks is the ideal situation as I see it -- we want tasks to be the smallest unit of work. However, calling each small task from a flow may take a long time in itself even if the tasks are cached. For example if we have a for loop that starts a million small tasks with different arguments. Thus, this prevents the ideal design of small tasks in my opinion.

Lastly, am I missing something? Does prefect have functionality to handle a situation as described in (2)?

Are there any resources for learning about the inner workings of Prefect, so that I may contribute to this?

nbons commented 2 months ago

Just checking in on the status of this issue again. It would be really helpful to have this feature enabled. Can you give an update on whether this will be available in the near future?