PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Error raised when using xarray to_zarr on a dask array in Prefect. Raises "No module named 'prefect.isnan'" #15567

Open ghislainp opened 4 days ago

ghislainp commented 4 days ago

Bug summary

This is a very strange bug. I'm just calling "x.to_zarr" where x is a dask array coming from a Prefect task. The problem is that the raised error is "ModuleNotFoundError: No module named 'prefect.isnan'", thrown while to_zarr tries to serialize something in the dask array. I can't determine from the traceback (attached below) whether it is a Prefect bug, a dask bug, or an interaction between the two. Note that I'm using to_zarr successfully in other parts of the code (in the same environment). This error has been around for a while (since spring) and I was hesitating to post it here... but I don't see another place to seek advice.
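
For illustration, the call pattern looks roughly like the sketch below (hypothetical names, not the actual project code): a dask-backed xarray Dataset written with mode="a" from inside a task while a dask.distributed scheduler is active, which is what the traceback below shows.

import dask.array as da
import xarray as xr
from dask.distributed import Client
from prefect import flow, task

@task
def append_to_zarr(ds: xr.Dataset, zarrfilename: str) -> None:
    # writing a dask-backed Dataset under a distributed scheduler requires
    # pickling the task graph, which is where the error surfaces
    ds.to_zarr(zarrfilename, mode="a")

@flow
def melt_flow() -> None:
    client = Client()  # local dask.distributed scheduler, as implied by the traceback
    ds = xr.Dataset({"melt": (("x", "y"), da.random.random((100, 100), chunks=(50, 50)))})
    append_to_zarr(ds, "example.zarr")
    client.close()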

Version info (prefect version output)

Version:             3.0.4
API version:         0.8.4
Python version:      3.11.6
Git commit:          c068d7e2
Built:               Tue, Oct 1, 2024 11:54 AM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.9.2
Server:
  Database:          sqlite
  SQLite version:    3.46.1
Integrations:
  prefect-shell:     0.3.0
  prefect-gitlab:    0.3.0

Additional context

Traceback (most recent call last):
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 65, in dumps
    pickler.dump(x)
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/__init__.py", line 108, in __getattr__
    return importlib.import_module(f".{attr_name}", package=__name__)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1140, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'prefect.isnan'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 366, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 78, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 77, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1479, in dumps
    cp.dump(obj)
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
           ^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/__init__.py", line 108, in __getattr__
    return importlib.import_module(f".{attr_name}", package=__name__)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1140, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'prefect.isnan'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/flow_engine.py", line 652, in run_context
    yield self
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/flow_engine.py", line 696, in run_flow_sync
    engine.call_flow_fn()
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/flow_engine.py", line 675, in call_flow_fn
    result = call_with_parameters(self.flow.fn, self.parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/tmp/tmprwamstbsprefect/prefect-main/tasks/melt.py", line 569, in compute_melt_torinesi_flow
    zarrfilename = convert_to_zarr_melt_append_var(sensor, location, resolution=12500)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/tasks.py", line 997, in __call__
    return run_task(
           ^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/task_engine.py", line 1512, in run_task
    return run_task_sync(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/task_engine.py", line 1325, in run_task_sync
    return engine.state if return_type == "state" else engine.result()
                                                       ^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/task_engine.py", line 457, in result
    raise self._raised
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/task_engine.py", line 763, in run_context
    yield self
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/task_engine.py", line 1323, in run_task_sync
    engine.call_task_fn(txn)
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/task_engine.py", line 786, in call_task_fn
    result = call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/utilities/callables.py", line 206, in call_with_parameters
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/tmp/tmprwamstbsprefect/prefect-main/tasks/melt.py", line 539, in convert_to_zarr_melt_append_var
    ds.to_zarr(zarrfilename, mode="a")
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/xarray/core/dataset.py", line 2562, in to_zarr
    return to_zarr(  # type: ignore[call-overload,misc]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/xarray/backends/api.py", line 1785, in to_zarr
    writes = writer.sync(
             ^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/xarray/backends/common.py", line 268, in sync
    delayed_store = chunkmanager.store(
                    ^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py", line 249, in store
    return store(
           ^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/dask/array/core.py", line 1232, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/dask/base.py", line 397, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/distributed/client.py", line 3462, in get
    futures = self._graph_to_futures(
              ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/distributed/client.py", line 3355, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 392, in serialize
    raise TypeError(msg, str_x) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x7f6903834c90>\n 0. 140089007232000\n>')

zzstoatzz commented 4 days ago

hi @ghislainp - thanks for the issue!

are you able to share the code that led you to this stack trace?

in particular, this part catches my eye - it could be a problem with our dynamic importing:

File "/home/debian/prefect/.pixi/envs/default/lib/python3.11/site-packages/prefect/init.py", line 108, in getattr
return importlib.import_module(f".{attr_name}", package=name)
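
for context, here is a minimal standalone sketch (hypothetical package name, not Prefect's actual source) of the PEP 562 lazy-import pattern that line 108 uses. The point is that a plain attribute probe against the package, which pickling machinery can perform while working out where an object came from, surfaces as ModuleNotFoundError rather than AttributeError:

import importlib

# mypkg/__init__.py -- hypothetical package with a module-level __getattr__
def __getattr__(attr_name: str):
    # any attribute lookup that misses the module namespace falls through to an
    # import attempt; if no such submodule exists this raises ModuleNotFoundError,
    # which getattr(mypkg, "isnan", None) does NOT suppress (getattr's default
    # only covers AttributeError)
    return importlib.import_module(f".{attr_name}", package=__name__)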

but it would be really useful to see some version of your code, since an example like this appears to work

from dask.array.random import random

from prefect import flow, task

@task
def create_dask_array():
    x = random((1000, 1000), chunks=(100, 100))
    return x

@task
def save_to_zarr(x, path: str):
    x.to_zarr(path)
    print(f"Dask array saved to {path}")

@flow
def dask_to_zarr_flow():
    x = create_dask_array()
    save_to_zarr(x, "output.zarr")

if __name__ == "__main__":
    dask_to_zarr_flow()

ghislainp commented 6 hours ago

While cooking up a reproducible example, I found that there is no error when the flow is run from the command line, as opposed to through a deployment. Surprisingly, running from the command line I got another error in an upstream flow, one I didn't have before when running through a deployment. This new error comes from calling dask's LocalCluster within the flow to limit memory usage. I removed this call to LocalCluster and now I don't have any errors anymore, neither in the upstream flow nor in the flow that was causing the originally reported error. I don't understand why I can't create a LocalCluster from within a flow; it is a practice that had been working.
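
For reference, the removed call followed this kind of pattern (a sketch with illustrative names and limits, not the actual code):

from dask.distributed import Client, LocalCluster
from prefect import flow

@flow
def upstream_flow() -> None:
    # create a local cluster inside the flow to cap worker memory
    cluster = LocalCluster(n_workers=2, memory_limit="4GiB")  # illustrative limits
    client = Client(cluster)
    # ... dask-backed processing and netcdf writes would happen here ...
    client.close()
    cluster.close()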

I also don't understand the interaction between removing the LocalCluster in the upstream flow and the downstream flow, because they do not share any variables. The upstream one writes netcdf files that are read by the downstream one, and again I was able to run the downstream flow from the command line with success (using exactly the same files as when I had the error before).

Could it be an interaction between the Prefect flow and dask?