PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0
17.45k stars 1.64k forks

Result storage configuration must be persisted server-side #15794

Open tonal opened 3 weeks ago

tonal commented 3 weeks ago

Bug summary

import asyncio
from pathlib import Path

from prefect import flow

@flow
async def test_flow():
  print('test')

async def tmain():
  flow = test_flow.with_options(
    result_storage=Path('./storage'),  # same error with LocalFileSystem(basepath='./storage')
  )
  await flow()

def main():
  asyncio.run(tmain())

if __name__ == '__main__':
  main()

Error output:

Traceback (most recent call last):
  File "/home/tonal/lang/projects/promsoft/tenders/report/flow_rep.py", line 230, in <module>
    main()
  File "/home/tonal/lang/projects/promsoft/tenders/report/flow_rep.py", line 226, in main
    asyncio.run(tmain())
  File "/home/tonal/.pyenv/versions/3.12.7/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/home/tonal/.pyenv/versions/3.12.7/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tonal/.pyenv/versions/3.12.7/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/tonal/lang/projects/promsoft/tenders/report/flow_rep.py", line 219, in tmain
    flow = test_flow.with_options(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tonal/lang/projects/promsoft/tenders/report/venv/lib/python3.12/site-packages/prefect/flows.py", line 476, in with_options
    new_flow = Flow(
               ^^^^^
  File "/home/tonal/lang/projects/promsoft/tenders/report/venv/lib/python3.12/site-packages/prefect/flows.py", line 349, in __init__
    raise TypeError(
TypeError: Result storage configuration must be persisted server-side. Please call `.save()` on your block before passing it in.

Version info (prefect version output)

$ prefect version
Version:             3.0.10
API version:         0.8.4
Python version:      3.12.7
Git commit:          3aa2d893
Built:               Tue, Oct 15, 2024 1:31 PM
OS/Arch:             linux/x86_64
Profile:             local
Server type:         server
Pydantic version:    2.9.2
Integrations:
  prefect-ray:       0.4.2
  prefect-aws:       0.5.1

Additional context

If I replace the Path with LocalFileSystem(basepath='./storage'), the error is the same.

desertaxle commented 3 weeks ago

Hey @tonal! As the type hints show, result_storage doesn't accept a Path; it accepts either a WritableFileSystem or a str:

result_storage: Optional[Union[ResultStorage, str]] = None,

and here's the definition of ResultStorage:

ResultStorage = Union[WritableFileSystem, str]

Prefect requires a WritableFileSystem (which is a type of Block) to be saved server-side so that results can be retrieved for your flow run after the run has completed. If you update your code to look like this, it should work:

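import asyncio

from prefect import flow
from prefect.filesystems import LocalFileSystem

# Save the block server-side before passing it to the flow.
# overwrite=True lets the script be re-run without a name conflict.
result_storage = LocalFileSystem(basepath='./storage')
result_storage.save("my-result-storage", overwrite=True)

@flow
async def test_flow():
  print('test')

async def tmain():
  flow = test_flow.with_options(
    result_storage=result_storage
  )
  await flow()

def main():
  asyncio.run(tmain())

if __name__ == '__main__':
  main()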
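
To illustrate why a Path and an unsaved LocalFileSystem both produce the same TypeError, here is a minimal pure-Python sketch of the kind of check that appears to run in Flow.__init__. It is a simplified model, not Prefect's actual source: StandInBlock, check_result_storage, and is_accepted are hypothetical names. It also assumes that a saved block is recognized by a non-empty _block_document_id attribute, and that a str is treated as a reference to an already-saved block.

```python
from pathlib import Path
from typing import Any, Optional

ERROR = (
    "Result storage configuration must be persisted server-side. "
    "Please call `.save()` on your block before passing it in."
)


class StandInBlock:
    """Hypothetical stand-in for a storage block such as LocalFileSystem."""

    def __init__(self, basepath: str) -> None:
        self.basepath = basepath
        # Assumption: an unsaved block has no server-side document ID yet.
        self._block_document_id: Optional[str] = None

    def save(self, name: str) -> None:
        # The real .save() registers the block with the API; this only fakes an ID.
        self._block_document_id = f"fake-id-for-{name}"


def check_result_storage(result_storage: Any) -> None:
    """Simplified model of the validation, under the assumptions stated above."""
    if result_storage is None or isinstance(result_storage, str):
        return  # assumed: a str refers to a block that is already saved
    if getattr(result_storage, "_block_document_id", None) is None:
        raise TypeError(ERROR)


def is_accepted(result_storage: Any) -> bool:
    """Return True if the check passes, False if it raises TypeError."""
    try:
        check_result_storage(result_storage)
    except TypeError:
        return False
    return True


unsaved = StandInBlock(basepath="./storage")
saved = StandInBlock(basepath="./storage")
saved.save("my-result-storage")

print(is_accepted(Path("./storage")))  # False: a Path has no document ID at all
print(is_accepted(unsaved))            # False: block exists but was never saved
print(is_accepted(saved))              # True: block has a document ID
```

Under this model, the object's type is never inspected directly. Anything that is not a str and has no document ID is rejected with the same message, which would explain why swapping the Path for an unsaved LocalFileSystem does not change the error.
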

All in all, this is expected behavior, and the error raised is designed to help guide users towards the expected usage. Do you have any suggestions for improving this experience?