I am running a task on a task worker and invoking it from a separate flow worker using task.delay(). Even though I have turned storage on, the task is never served from the cache. I have tried different cache settings to no avail.
Here is the task worker:
from prefect.task_worker import serve
from prefect import task
from prefect.cache_policies import INPUTS


@task(cache_policy=INPUTS)
def simple(msg: str) -> str:
    print(f'simple {msg}')
    return msg


if __name__ == "__main__":
    serve(simple)
And here is the flow worker. It invokes the task twice with the same input, so the second call should be served from the cache, but the task is executed both times.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from task_worker import simple


@flow(log_prints=True, task_runner=ThreadPoolTaskRunner)
def flow_with_task_worker():
    # First call: expected to execute the task on the task worker.
    f = simple.delay('foo')
    f.wait()
    print(f'simple {f.result()}')
    # Second call with identical input: expected to hit the cache.
    f = simple.delay('foo')
    f.wait()
    print(f'simple {f.result()}')


if __name__ == "__main__":
    flow_with_task_worker.serve(name="flow_with_task_worker",
                                tags=["test"],
                                parameters={})
The environment settings I am using on both the task and flow workers:
Version: 3.0.1
API version: 0.8.4
Python version: 3.12.6
Git commit: c6b2ffe1
Built: Fri, Sep 6, 2024 10:05 AM
OS/Arch: linux/x86_64
Profile: ephemeral
Server type: ephemeral
Pydantic version: 2.9.1
Server:
Database: postgresql
(Python version on the client side is 3.10.14)