PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

`refresh_cache` does not reset `cache_expiration` #10344

Closed mgsnuno closed 1 year ago

mgsnuno commented 1 year ago

First check

Bug summary

refresh_cache does not reset cache_expiration; it only updates the value stored in the cache.

Question: how can cache_expiration be reset or changed?

Use case 1: if cache_expiration was not provided the first time around, or we change our minds about its value, we should be able to change it.

Use case 2: a task has a 24h cache_expiration, but if we want to refresh the flow during the day, the cache_expiration should be reset to the current run time, correctly reflecting that the cache will be valid for the next 24h.

Currently, once the expiration is set for a given task, it is impossible to change it, which restricts the use of caching and prevents a smooth migration from Prefect 1.
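A minimal, Prefect-independent sketch of the semantics requested above may help pin them down. It assumes a hypothetical ExpiringCache class and a manually advanced FakeClock (neither is Prefect's API or implementation): every write, including one forced by refresh=True, restarts the expiration window from the time of that write and uses whatever expiration the current call supplies.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Optional


@dataclass
class CacheEntry:
    value: Any
    expires_at: Optional[datetime]  # None means the entry never expires


class ExpiringCache:
    """Hypothetical model of a task result cache; not Prefect's implementation."""

    def __init__(self, clock: Callable[[], datetime]) -> None:
        self._clock = clock
        self._entries: Dict[str, CacheEntry] = {}

    def get_or_compute(
        self,
        key: str,
        fn: Callable[[], Any],
        expiration: Optional[timedelta] = None,
        refresh: bool = False,
    ) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        valid = entry is not None and (
            entry.expires_at is None or now < entry.expires_at
        )
        if valid and not refresh:
            return entry.value  # cache hit: the stored expiration is left alone
        # Cache miss or forced refresh: recompute and restart the expiration
        # window from *now*, using the expiration supplied by this call.
        value = fn()
        expires_at = None if expiration is None else now + expiration
        self._entries[key] = CacheEntry(value, expires_at)
        return value

    def expires_at(self, key: str) -> Optional[datetime]:
        return self._entries[key].expires_at


class FakeClock:
    """Manually advanced clock, so the example needs no time.sleep()."""

    def __init__(self, start: datetime) -> None:
        self.now = start

    def __call__(self) -> datetime:
        return self.now

    def advance(self, **kwargs: float) -> None:
        self.now += timedelta(**kwargs)


clock = FakeClock(datetime(2023, 8, 1, 8, 0))
cache = ExpiringCache(clock)
day = timedelta(hours=24)

# First cached without any expiration, later given one through a refresh.
cache.get_or_compute("report", lambda: "v1")
no_expiry = cache.expires_at("report")
cache.get_or_compute("report", lambda: "v2", expiration=day, refresh=True)
first_expiry = cache.expires_at("report")  # 2023-08-02 08:00

# Refreshing mid-day restarts the 24h window from the refresh time.
clock.advance(hours=6)
cache.get_or_compute("report", lambda: "v3", expiration=day, refresh=True)
second_expiry = cache.expires_at("report")  # 2023-08-02 14:00
```

With this policy a refresh at 14:00 keeps the result valid until 14:00 the next day, rather than until the time set by the original 08:00 run.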

Reproduction

import time

import pendulum
from prefect import flow, task
from prefect.tasks import task_input_hash

runs = 0

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=pendulum.duration(seconds=5),
    log_prints=True,
)
def task1():
    global runs
    runs += 1
    print(f"---> run #: {runs}")

@flow
def flow1():
    global runs
    task1()
    assert runs == 1

    time.sleep(4)
    task1()
    assert runs == 1  # cached

    task1.with_options(refresh_cache=True)()
    assert runs == 2  # refresh_cache

    time.sleep(4)  # still inside previous refresh_cache expiration
    task1()
    assert runs == 3  # but not cached

flow1()

Error

No response

Versions

Version:             2.11.1
API version:         0.8.4
Python version:      3.10.12
Git commit:          b42c09f3
Built:               Thu, Jul 27, 2023 2:47 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

Additional context

No response

WillRaphaelson commented 1 year ago

Thanks for the issue, @mgsnuno. I agree that the cache expiration should be reset when the cache is manually refreshed.

AmanSal1 commented 1 year ago

@WillRaphaelson Can I give it a shot?

urimandujano commented 1 year ago

Hi @mgsnuno, I've been having some trouble reproducing this issue. Running your example gives me an error that I'm interpreting as the result of a successfully reset cache:

10:52:57.993 | ERROR   | Flow run 'illustrious-muskrat' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/uriel/Projects/github.com/PrefectHQ/prefect/src/prefect/engine.py", line 829, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/Users/uriel/Projects/github.com/PrefectHQ/prefect/src/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
  File "/Users/uriel/Projects/github.com/PrefectHQ/prefect/src/prefect/_internal/concurrency/calls.py", line 315, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "repro-10344.py", line 39, in flow1
    assert runs == 3, runs  # but not cached
AssertionError: 2
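Why an AssertionError reporting 2 suggests the refresh worked can be checked by replaying flow1's timeline on a simulated clock. The sketch below is a hypothetical model (replay_flow1 is an illustrative helper, not Prefect code): it assumes task runs take no time and compares a policy where refresh_cache restarts the 5-second expiration with one where it only replaces the cached value.

```python
from typing import List, Optional, Tuple

EXPIRATION = 5.0  # seconds, as in the repro's cache_expiration


def replay_flow1(reset_on_refresh: bool) -> int:
    """Replay flow1's task1 calls on a simulated clock; return the run count.

    Hypothetical model: task runs are instantaneous, and reset_on_refresh
    chooses whether a refresh restarts the expiration window.
    """
    # (time in seconds, refresh_cache flag) for each task1 call in flow1
    calls: List[Tuple[float, bool]] = [
        (0.0, False),  # first call: cache miss
        (4.0, False),  # after time.sleep(4): expected cache hit
        (4.0, True),   # task1.with_options(refresh_cache=True)()
        (8.0, False),  # after another time.sleep(4)
    ]
    runs = 0
    expires_at: Optional[float] = None  # no cache entry yet
    for t, refresh in calls:
        cached = expires_at is not None and t < expires_at
        if cached and not refresh:
            continue  # served from cache, task body not executed
        runs += 1
        # The reported bug: a refresh updates the value but keeps the old expiry.
        keep_old_expiration = refresh and cached and not reset_on_refresh
        if not keep_old_expiration:
            expires_at = t + EXPIRATION
    return runs


print(replay_flow1(reset_on_refresh=True))   # refresh restarts the window
print(replay_flow1(reset_on_refresh=False))  # refresh keeps the old expiry
```

Under the restart policy the refresh at t=4 s keeps the entry valid until t=9 s, so the call at t=8 s is a cache hit and the count stays at 2. The reporter's final assert runs == 3 only holds when the original t=5 s expiration is kept.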

I have my own reproduction, which also seems to indicate that refresh_cache now works as expected. Passing refresh_cache=True resets the expiration counter so that it starts from the time the cache was refreshed:

import datetime
import time

import pendulum

from prefect import flow, task
from prefect.tasks import task_input_hash

runs = 0  # module-level counter updated by task2 via `global`

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=pendulum.duration(seconds=10),
    log_prints=True,
)
def task2():
    global runs
    runs += 1
    print(f"---> run #: {runs}")

@flow
def flow2():
    global runs

    # Only the first call runs; the next three are served from the cache.
    for _ in range(4):
        task2()
        assert runs == 1
        time.sleep(1)

    task2.with_options(
        refresh_cache=True
    )()  # this should cause results to be cached for the next 10 seconds
    assert runs == 2

    # Check for 9 (not 10) seconds to leave some slack for timing jitter.
    cache_valid_until = datetime.datetime.now() + datetime.timedelta(seconds=9)
    while cache_valid_until > datetime.datetime.now():
        task2()
        assert runs == 2
        time.sleep(1)

flow2()
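The same reasoning can be applied to flow2 as a sleep-free sketch. It rests on stated assumptions: replay_flow2 is a hypothetical helper, task calls are instantaneous, each time.sleep(1) advances the clock by exactly one second, and a refresh restarts the expiration. After the refresh at t=4 s the entry is valid until t=14 s, while the 9-second check loop makes its last call at t=12 s, so the 1-second margin keeps every checked call inside the window.

```python
from typing import List, Optional, Tuple

EXPIRATION = 10.0  # seconds, as in task2's cache_expiration
MARGIN = 1.0       # flow2 only checks for 9 s, leaving 1 s of slack


def replay_flow2() -> List[Tuple[float, int]]:
    """Return (time, runs) after each task2 call on a simulated clock."""
    t = 0.0
    runs = 0
    expires_at: Optional[float] = None
    history: List[Tuple[float, int]] = []

    def call(refresh: bool = False) -> None:
        nonlocal runs, expires_at
        if refresh or expires_at is None or t >= expires_at:
            runs += 1
            expires_at = t + EXPIRATION  # window restarts at this write
        history.append((t, runs))

    for _ in range(4):  # first loop: one real run, then cache hits
        call()
        t += 1.0
    call(refresh=True)  # t == 4.0, cache now valid until 14.0
    check_until = t + EXPIRATION - MARGIN
    while t < check_until:  # mirrors `cache_valid_until > now`
        call()
        t += 1.0
    return history


for t, runs in replay_flow2():
    print(f"t={t:4.1f}s runs={runs}")
```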

The versions I'm testing with are:

Version:             2.13.0+0.g66577cc64.dirty
API version:         0.8.4
Python version:      3.8.17
Git commit:          66577cc6
Built:               Thu, Sep 7, 2023 3:52 PM
OS/Arch:             darwin/arm64
Profile:             cloud-personal
Server type:         cloud

Can you please verify that this is still an issue you're experiencing on a more recent version? It may be that this has been corrected in a recent release.

mgsnuno commented 1 year ago

Indeed, my original example no longer behaves as it did, even when I revert to the version I was using. I'll report back with more details soon.

urimandujano commented 1 year ago

I'm going to close this issue, but feel free to open a new one if you notice this happening again.