PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
17.59k stars · 1.65k forks

cache_key_fn not used in the redis integration #16078

Closed · gigaverse-oz closed this 3 days ago

gigaverse-oz commented 5 days ago

Bug summary

When using `result_storage` with `RedisDatabase`, the `cache_key_fn` is not being used.

Usage example

```python
@task(
    result_storage=persistance_block_redis,
    cache_expiration=timedelta(minutes=10),
    cache_key_fn=define_cache_key,
)
```

Redis key example for that implementation: `flow_name='test_2_flow_81' flow_run_time=10 flow_expected_output='output2'_sub_1`
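For context, a minimal sketch of what a function like `define_cache_key` might look like (the name matches the snippet above, but the key layout here is an assumption, not the reporter's actual code): Prefect calls the `cache_key_fn` with the task run context and the task's call parameters, and it should return a string identifying the cache entry.

```python
def define_cache_key(context, parameters: dict) -> str:
    # context is the Prefect TaskRunContext; parameters are the task's call kwargs.
    # Build a deterministic string key from only the parameters that should
    # invalidate the cache; sorting keeps the key stable across calls.
    return "-".join(f"{k}={parameters[k]}" for k in sorted(parameters))
```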

Version info

Version:             3.1.4
API version:         0.8.4
Python version:      3.11.6
Git commit:          78ee41cb
Built:               Wed, Nov 20, 2024 7:37 PM
OS/Arch:             linux/x86_64
Profile:             local
Server type:         server
Pydantic version:    2.10.0b2
Integrations:
  prefect-redis:     0.2.1
  prefect-kubernetes: 0.5.0

Additional context

Used local redis-stack in docker and local prefect server.

zzstoatzz commented 5 days ago

hi @gigaverse-oz - thank you for the issue!

can you provide a minimal example showing the `cache_key_fn` not being respected? this example appears to work for me

```
» prefect version
Version:             3.1.4
API version:         0.8.4
Python version:      3.12.7
Git commit:          78ee41cb
Built:               Wed, Nov 20, 2024 7:37 PM
OS/Arch:             darwin/arm64
Profile:             pong
Server type:         cloud
Pydantic version:    2.10.0
Integrations:
  prefect-redis:     0.2.1
```

```python
# /// script
# dependencies = ["prefect-redis"]
# ///
from prefect_redis import RedisDatabase

from prefect import flow, task, unmapped
from prefect.cache_policies import RUN_ID, CacheKeyFnPolicy
from prefect.context import TaskRunContext

# requires: docker run -d --rm -p 6379:6379 redis
redis_storage = RedisDatabase(host="localhost", port=6379, db=0)
redis_storage.save("my-redis-storage", overwrite=True)


def some_cache_key_fn(context: TaskRunContext, parameters: dict):
    print(f"CACHE KEY FN got {parameters.values()=}")
    return parameters["kwarg_that_matters"]


@task(
    task_run_name="Some task with {kwarg_that_matters}",
    result_storage=redis_storage,
    cache_policy=CacheKeyFnPolicy(cache_key_fn=some_cache_key_fn) + RUN_ID,
)
def some_task(kwarg_that_matters: str, kwarg_that_doesnt_matter: str):
    return f"I ran with {kwarg_that_matters} and {kwarg_that_doesnt_matter}"


@flow(log_prints=True)
def some_flow():
    some_task.map(
        kwarg_that_matters=["foo", "baz"],
        kwarg_that_doesnt_matter=unmapped("bar"),
    ).result()
    some_task.map(
        kwarg_that_matters=["foo", "baz"],
        kwarg_that_doesnt_matter=unmapped("bar"),
    ).result()


if __name__ == "__main__":
    some_flow()
```

```
» uv run repros/16078.py
Reading inline script metadata from `repros/16078.py`
16:38:06.091 | INFO | prefect.engine - Created flow run 'primitive-stoat' for flow 'some-flow'
16:38:06.785 | INFO | Task run 'Some task with foo' - CACHE KEY FN got parameters.values()=dict_values(['foo', 'bar'])
16:38:06.804 | INFO | Task run 'Some task with baz' - CACHE KEY FN got parameters.values()=dict_values(['baz', 'bar'])
16:38:06.815 | INFO | Task run 'Some task with foo' - Finished in state Completed()
16:38:06.816 | INFO | Task run 'Some task with baz' - Finished in state Completed()
16:38:06.872 | INFO | Task run 'Some task with foo' - CACHE KEY FN got parameters.values()=dict_values(['foo', 'bar'])
16:38:06.883 | INFO | Task run 'Some task with baz' - CACHE KEY FN got parameters.values()=dict_values(['baz', 'bar'])
16:38:06.889 | INFO | Task run 'Some task with baz' - Finished in state Cached(type=COMPLETED)
16:38:06.889 | INFO | Task run 'Some task with foo' - Finished in state Cached(type=COMPLETED)
16:38:07.163 | INFO | Flow run 'primitive-stoat' - Finished in state Completed()
```
gigaverse-oz commented 3 days ago

@zzstoatzz

I'm sorry, it was my mistake and it works fine. I passed the wrong object as the cache key, which confused me a lot.
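For anyone hitting similar confusion, a hypothetical illustration of this kind of pitfall (the function names and the specific wrong object are assumptions, not the reporter's actual code): the function passed as `cache_key_fn` should return a deterministic string, so returning some other object instead means equal inputs may never map to equal keys.

```python
def broken_cache_key(context, parameters: dict):
    # Pitfall (hypothetical): returning something other than the intended
    # key string, e.g. the context object, so lookups never hit the cache.
    return context


def working_cache_key(context, parameters: dict) -> str:
    # Return a deterministic string derived from the relevant parameters.
    return str(sorted(parameters.items()))
```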

zzstoatzz commented 3 days ago

all good @gigaverse-oz, thanks for the update!