dask / dask-expr

BSD 3-Clause "New" or "Revised" License

Workflow breaks due to non-deterministic pickling #879

Open jrbourbeau opened 4 months ago

jrbourbeau commented 4 months ago

While writing out a DataFrame to parquet from a Prefect task (not sure if the Prefect part is actually important or not), I got the following error:

Traceback (most recent call last):
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1186, in normalize_callable
    return function_cache[func]
           ~~~~~~~~~~~~~~^^^^^^
KeyError: <function repartition_table.<locals>.name at 0x15ed85ee0>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/engine.py", line 840, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 315, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/projects/coiled/etl-tpch-backup/pipeline/resize.py", line 65, in resize_parquet
    files = repartition_table(files_, table=table)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/tasks.py", line 549, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/engine.py", line 1155, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/engine.py", line 1323, in get_task_call_return_value
    return await future._result()
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/engine.py", line 1744, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 315, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/projects/coiled/etl-tpch-backup/pipeline/resize.py", line 41, in repartition_table
    df.to_parquet(outdir, compression="snappy", name_function=name)
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/_collection.py", line 2887, in to_parquet
    return to_parquet(self, path, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/io/parquet.py", line 380, in to_parquet
    ToParquet(
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/_core.py", line 43, in __new__
    _name = inst._name
            ^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/functools.py", line 1001, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/_core.py", line 415, in _name
    funcname(type(self)).lower() + "-" + _tokenize_deterministic(*self.operands)
                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask_expr/_util.py", line 100, in _tokenize_deterministic
    return tokenize(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1023, in tokenize
    token = _normalize_seq_func(args)
            ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1117, in _normalize_seq_func
    item = normalize_token(item)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/utils.py", line 773, in __call__
    return meth(arg, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1165, in normalize_object
    return normalize_callable(o)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1188, in normalize_callable
    result = _normalize_callable(func)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/james/mambaforge/envs/etl-tpch2/lib/python3.11/site-packages/dask/base.py", line 1229, in _normalize_callable
    raise RuntimeError(
RuntimeError: Function <function repartition_table.<locals>.name at 0x15ed85ee0> may not be deterministically hashed by cloudpickle. See: https://github.com/cloudpipe/cloudpickle/issues/385 for more information.

A few things come to mind:

  1. Should this be the default behavior? I'm not sure how common it is for this RuntimeError to be raised.
  2. Can I disable this and say I'm okay with non-deterministic tokenization?
  3. This error message isn't very approachable for naive users; it'd be nice to reword it with that in mind. In particular, "What am I supposed to do about this?" is probably a question users will have.

This might actually be a dask/dask issue, but I'm opening it here since I'm currently using dask-expr.
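The traceback ends with `normalize_callable` rejecting `repartition_table.<locals>.name`, a function defined inside another function. The stdlib `pickle` module stores a module-level function by reference (its module and qualified name), which yields the same bytes every time. A nested function is not importable, so it has to be serialized by value instead, and the linked cloudpickle issue explains why that output may not be stable. The minimal sketch below uses only the stdlib pickler to show which kind of function can be pickled by reference; `part_name`, `make_local_name_function`, and `pickles_by_reference` are hypothetical names, not part of dask or of the pipeline in the traceback.

```python
import pickle


def part_name(i: int) -> str:
    # Module-level function: importable as <module>.part_name, so the stdlib
    # pickler stores it by reference (module name + qualified name).
    return f"part-{i}.parquet"


def make_local_name_function(prefix: str):
    # Nested function, like ``repartition_table.<locals>.name`` in the
    # traceback: it is not importable, so it cannot be pickled by reference.
    def name(i: int) -> str:
        return f"{prefix}-{i}.parquet"

    return name


def pickles_by_reference(func) -> bool:
    """Return True if the stdlib pickler can serialize ``func`` by reference."""
    try:
        pickle.dumps(func)
    except (pickle.PicklingError, AttributeError):
        # Local objects and lambdas are rejected by the stdlib pickler.
        return False
    return True


print(pickles_by_reference(part_name))                             # True
print(pickles_by_reference(make_local_name_function("lineitem")))  # False
```

Assuming the tokenizer behaves along these lines, one possible workaround is to define the `name_function` passed to `to_parquet` at module level in an importable module rather than inside `repartition_table` (cloudpickle serializes functions defined in `__main__` by value as well). This is an untested suggestion, not a confirmed fix; the maintainers' replies in this thread point to dask/dask#10883 instead.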

cc @phofl @crusaderky

jrbourbeau commented 4 months ago

> Can I disable this and say I'm okay with non-deterministic tokenization?

Hopefully with tokenize.ensure-deterministic=False

jrbourbeau commented 4 months ago

> Hopefully with tokenize.ensure-deterministic=False

This didn't seem to do the trick.

fjetter commented 4 months ago

I believe this should go away with the latest main. Tokenization is a weird thing, and a lot of work went into it recently. Most notably, I suspect https://github.com/dask/dask/pull/10883 will make this work.

The config option has no effect here, since dask-expr hard-codes it: we rely on tokenization actually being deterministic.

jrbourbeau commented 4 months ago

Ah, thanks. I'll try again with the main branch of dask/dask.

> The config option has no effect here, since dask-expr hard-codes it: we rely on tokenization actually being deterministic.

Just to clarify: things should still work when using the main dask version, even though dask-expr has hard-coded the config option?

phofl commented 4 months ago

> Just to clarify: things should still work when using the main dask version, even though dask-expr has hard-coded the config option?

dask/dask will respect the config option; dask-expr does not. Was this your question?
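Why dask-expr cannot honor `tokenize.ensure-deterministic=False` follows from how it names expressions: as the traceback shows, `_name` is built as `funcname(type(self)).lower() + "-" + _tokenize_deterministic(*self.operands)`, so two identical expressions must get identical names. The toy model below is a hypothetical sketch, not dask's tokenizer: with the flag off, an object that can't be hashed stably gets a random token (roughly how I understand dask/dask to behave), which avoids the error but gives the "same" expression a new name on every construction. `toy_tokenize`, `toy_expr_name`, and `make_name_function` are illustrative names; it uses only `pickle`, `hashlib`, and `uuid` from the stdlib.

```python
import hashlib
import pickle
import uuid


def toy_tokenize(obj, ensure_deterministic: bool = False) -> str:
    """Hypothetical, simplified stand-in for a tokenizer like dask.base.tokenize.

    Hashes the stdlib-pickled bytes of ``obj``. If ``obj`` can't be pickled
    (for example, it contains a nested function), either raise or fall back
    to a random token, depending on ``ensure_deterministic``.
    """
    try:
        payload = pickle.dumps(obj, protocol=4)
    except (pickle.PicklingError, AttributeError, TypeError):
        if ensure_deterministic:
            raise RuntimeError(
                f"{obj!r} cannot be deterministically tokenized"
            ) from None
        # Random fallback: unique, but different on every call.
        return uuid.uuid4().hex
    return hashlib.sha256(payload).hexdigest()


def toy_expr_name(op_type: str, *operands, ensure_deterministic: bool) -> str:
    # Same shape as the dask-expr name in the traceback:
    # funcname(type(self)).lower() + "-" + _tokenize_deterministic(*self.operands)
    return op_type.lower() + "-" + toy_tokenize(
        operands, ensure_deterministic=ensure_deterministic
    )


def make_name_function():
    # Nested function, analogous to ``repartition_table.<locals>.name``.
    def name(i: int) -> str:
        return f"part-{i}.parquet"

    return name


fn = make_name_function()
# Plain operands: the same inputs always produce the same name.
print(toy_expr_name("ToParquet", "out/", 3, ensure_deterministic=True)
      == toy_expr_name("ToParquet", "out/", 3, ensure_deterministic=True))  # True
# Flag off: no error, but the "same" expression gets a new name each time.
print(toy_expr_name("ToParquet", "out/", fn, ensure_deterministic=False)
      == toy_expr_name("ToParquet", "out/", fn, ensure_deterministic=False))  # False
```

In this model, a random token is harmless for a one-off task key but breaks anything that relies on equal expressions having equal names, which is the guarantee dask-expr depends on.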

crusaderky commented 4 months ago

We just removed a hack in dask-expr (#822); things will break if you don't upgrade to the latest dask/dask (dask/dask#10883) too.

fjetter commented 4 months ago

> things will break if you don't upgrade to the latest dask/da

We should start being mindful about compatibility and pinning here if we want to keep this as a separate repo/package.

phofl commented 4 months ago

I’ve already started pinning the releases to a specific dask/dask version.
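The last few comments are about keeping dask-expr and dask/dask in lockstep. The pin itself would normally live in the package metadata as an exact `==` requirement on dask. As a complementary, hypothetical sketch (not what dask-expr does), a runtime guard can turn a version mismatch into an actionable error instead of a failure deep inside tokenization, which also speaks to point 3 of the original report. `leading_release` and `require_exact` are illustrative names, and the version parsing is deliberately simplified rather than a full PEP 440 implementation.

```python
from importlib.metadata import PackageNotFoundError, version


def leading_release(v: str) -> tuple[int, ...]:
    """Leading numeric release components, e.g. "2024.2.1" -> (2024, 2, 1).

    Simplified: drops a local suffix after "+" and stops at the first
    non-numeric component (such as "dev0"). Not a full PEP 440 parser.
    """
    release = []
    for piece in v.split("+", 1)[0].split("."):
        if not piece.isdigit():
            break
        release.append(int(piece))
    return tuple(release)


def require_exact(dist: str, pinned: str) -> str:
    """Raise ImportError unless ``dist`` is installed at release ``pinned``."""
    try:
        installed = version(dist)
    except PackageNotFoundError:
        raise ImportError(
            f"{dist} is not installed; this package needs {dist}=={pinned}."
        ) from None
    if leading_release(installed) != leading_release(pinned):
        # Say what was found, what is expected, and how to fix it.
        raise ImportError(
            f"Found {dist} {installed}, but this package was released against "
            f"{dist}=={pinned}. Install matching versions, e.g. "
            f"`pip install {dist}=={pinned}`."
        )
    return installed


print(leading_release("2024.2.1"))  # (2024, 2, 1)
```

For example, a package could call `require_exact("dask", PINNED_DASK)` at import time, where the hypothetical `PINNED_DASK` is whatever dask release it was tested against.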