fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

fugue_dask: daemonic processes are not allowed to have children, dask not working in distributed mode #250

Open Matthieu-Tinycoaching opened 3 years ago


Minimal Code To Reproduce

import pandas as pd
from fugue import transform
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine

client = Client()  # without this, dask is not in distributed mode

# fugue.dask.dataframe.default.partitions determines the default partitions for a new DaskDataFrame
engine = DaskExecutionEngine(conf={"fugue.dask.dataframe.default.partitions": 4})
assert engine.conf.get_or_throw("fugue.dask.dataframe.default.partitions", int) == 4

# output_dict, filter_rm_duplicates and similarity_score_thresh are defined elsewhere in the service (not shown)
output_df = pd.DataFrame(output_dict)

output_df = transform(
    output_df,
    filter_rm_duplicates,
    schema="*",
    params=dict(similarity_score_thresh=similarity_score_thresh),
    engine=engine,
)

Describe the bug

When I launch my FastAPI service with uvicorn and make a single request, I get the following error:

/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 43457 instead
  warnings.warn(
/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37649 instead
  warnings.warn(
/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 46423 instead
  warnings.warn(
/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 42079 instead
  warnings.warn(
Task exception was never retrieved
future: <Task finished name='Task-56' coro=<_wrap_awaitable() done, defined at /home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/asyncio/tasks.py:688> exception=AssertionError('daemonic processes are not allowed to have children')>
Traceback (most recent call last):
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/nanny.py", line 335, in start
    response = await self.instantiate()
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/nanny.py", line 418, in instantiate
    result = await self.process.start()
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/nanny.py", line 685, in start
    await self.process.start()
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/process.py", line 32, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/process.py", line 202, in _start
    process.start()
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/multiprocessing/process.py", line 118, in start
    assert not _current_process._config.get('daemon'), \
AssertionError: daemonic processes are not allowed to have children
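The traceback above ends at the guard in multiprocessing/process.py: a process started as a daemon may not start child processes of its own. A plausible reading, not confirmed in this report, is that `Client()` with default arguments builds a LocalCluster whose Nanny spawns worker processes, and that it is being called from inside a daemonic process of the service. The stdlib-only sketch below reproduces the same AssertionError without Dask or Fugue, and shows that a thread pool still works in the same place. It assumes the `fork` start method (Linux/macOS, matching the Linux paths in the log; not available on Windows), and the function names `_noop`, `_inside_daemon` and `run_demo` are hypothetical.

```python
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

# Assumption: the "fork" start method (Linux/macOS), so the functions below do
# not have to be importable by a freshly spawned interpreter.
ctx = mp.get_context("fork")


def _noop() -> int:
    # Hypothetical stand-in for real work.
    return 1


def _inside_daemon(q) -> None:
    # This function runs inside a daemonic process.
    # 1) Starting a child process trips the same guard the dask Nanny hit.
    try:
        ctx.Process(target=_noop).start()
        q.put(("process", "started"))
    except AssertionError as exc:
        q.put(("process", str(exc)))
    # 2) Threads are not child processes, so a thread pool still works here.
    with ThreadPoolExecutor(max_workers=2) as pool:
        q.put(("thread", pool.submit(_noop).result()))


def run_demo() -> dict:
    q = ctx.Queue()
    # daemon=True mirrors the daemonic process in the traceback.
    worker = ctx.Process(target=_inside_daemon, args=(q,), daemon=True)
    worker.start()
    # Collect the two (label, result) pairs reported by the daemonic process.
    results = dict(q.get(timeout=30) for _ in range(2))
    worker.join()
    return results


if __name__ == "__main__":
    print(run_demo())
```

If the daemonic-process explanation holds, Dask's `Client(processes=False)`, which runs the scheduler and workers as threads inside the current process, should avoid this guard, at the cost of all work sharing one Python process. Connecting to a separately started cluster by its scheduler address is the other common way around it.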
distributed.worker - ERROR - Could not deserialize task
Traceback (most recent call last):
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/worker.py", line 3772, in loads_function
    result = cache_loads[bytes_object]
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/site-packages/distributed/utils.py", line 1366, in __getitem__
    value = super().__getitem__(key)
  File "/home/matthieu/anaconda3/envs/fastapi-fugue-fresh/lib/python3.8/collections/__init__.py", line 1010, in __getitem__
    raise KeyError(key)
KeyError: b"\x80\x04\x95\xb5\x14\x00\x00\x00\x00\x00\x00\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94(...
x8c\r_class_method\x94\x89\x8c\x07_params\x94h\xad\x8c\x12IndexedOrderedDict\x94\x93\x94)R\x94(h4h\xbb\x8c\x0c_PandasParam\x94\x93\x94)\x81\x94}\x94(\x8c\x08required\x94\x88\x8c\x07default\x94\x8c\x07inspect\x94\x8c\x06_empty\x94\x93\x94\x8c\x04code\x94\x8c\x01p\x94\x8c\nannotation\x94\x8c\x0cpd.DataFrame\x94ub\x8c\x17similarity_score_thresh\x94h\xbb\x8c\x0b_OtherParam\x94\x93\x94)\x81\x94}\x94(h\xc9\x88h\xcah\xcdh\xce\x8c\x01x\x94h\xd0\x8c\x07[Other]\x94ubu}\x94(h\x99\x89h\x9a\x88h\x9b}\x94h\x9d]\x94ub\x8c\x03_rt\x94h\xc6)\x81\x94}\x94(h\xc9\x88h\xcaNh\xceh\xcfh\xd0h\xd1ubh\x15\x8c\x16sts_tf_semantic_search\x94\x8c\x14filter_rm_duplicates\x94\x93\x94ub\x8c\x12_output_schema_arg\x94\x8c\x01*\x94\x8c\x11_validation_rules\x94}\x94\x8c\x0e_uses_callback\x94\x89\x8c\x12_requires_callback\x94\x89\x8c\x0f_partition_spec\x94\x8c\x1bfugue.collections.partition\x94\x8c\rPartitionSpec\x94\x93\x94)\x81\x94}\x94(\x8c\x0f_num_partitions\x94\x8c\x010\x94\x8c\x05_algo\x94\x8c\x00\x94\x8c\r_partition_by\x94]\x94\x8c\x08_presort\x94h\xc3)R\x94}\x94(h\x99\x89h\x9a\x88h\x9b}\x94h\x9d]\x94ub\x8c\x0b_size_limit\x94K\x00\x8c\n_row_limit\x94K\x00ub\x8c\x0f_has_rpc_client\x94\x89\x8c\x0e_workflow_conf\x94h\xaf)R\x94(\x8c\x1afugue.workflow.concurrency\x94K\x01\x8c\x1bfugue.workflow.auto_persist\x94\x89\x8c'fugue.dask.dataframe.default.partitions\x94K\x04u}\x94(h\x99\x89h\x9a\x88h\x9b}\x94h\x9d]\x94ubh\xc1h\xaf)R\x94h\xd2G?\xe0\x00\x00\x00\x00\x00\x00s}\x94(h\x99\x89h\x9a\x88h\x9b}\x94h\x9d]\x94ub\x8c\x0b_key_schema\x94hn)R\x94}\x94(h\x99\x89h\x9a\x88h\x9b}\x94h\x9d]\x94ub\x8c\x0e_output_schema\x94hn)R\x94(\x8c\nacquisCode\x94hs(\x8c\nacquisCode\x94hv\x8c\x06string\x94\x85\x94R\x94\x88Nt\x94R\x94\x8c\x04code\x94hs(\x8c\x04code\x94hv\x8c\x06string\x94\x85\x94R\x94\x88Nt\x94R\x94\x8c\x05level\x94hs(\x8c\x05level\x94hv\x8c\x05int64\x94\x85\x94R\x94\x88Nt\x94R\x94\x8c\x05title\x94hs(\x8c\x05title\x94hv\x8c\x06string\x94\x85\x94R\x94\x88Nt\x94R\x94\x8c\x10similarity_score\x94hs(\x8c\x10similarity
_score\x94hv\x8c\x06double\x94\x85\x94R\x94\x88Nt\x94R\x94u}\x94(h\x99\x89h\x9a\x88h\x9b}\x94h\x9d]\x94ubub\x8c\rignore_errors\x94)ub\x86\x94R\x94\x85\x94R\x94hk\x8c\x15triad.utils.threading\x94\x8c\x07RunOnce\x94\x93\x94)\x81\x94}\x94(h\x15h\xa3h\xa4\x8c\x1a_TransformerRunner.on_init\x94\x93\x94h\xa9\x86\x94R\x94\x8c\t_key_func\x94h\x1b(h\x1e(K\x00K\x00K\x00K\x02K\x05K\x1fC\x16t\x00t\x01\x88\x00\x83\x01t\x01|\x00d\x01\x19\x00\x83\x01\x83\x02S\x00\x94NK\x00\x86\x94\x8c\x07to_uuid\x94\x8c\x02id\x94\x86\x94\x8c\x04args\x94\x8c\x06kwargs\x94\x86\x94h6\x8c\x08<lambda>\x94K\xbaC\x00\x94\x8c\x07on_init\x94\x85\x94)t\x94R\x94hCNNhK)R\x94\x85\x94t\x94R\x94hXjU\x01\x00\x00}\x94}\x94(hFjL\x01\x00\x00h[\x8c)DaskExecutionEngine.map.<locals>.<lambda>\x94h]}\x94heNhfNhghGhhNhihkjB\x01\x00\x00\x85\x94R\x94\x85\x94\x8c\x17_cloudpickle_submodules\x94]\x94\x8c\x0b__globals__\x94}\x94jF\x01\x00\x00\x8c\x10triad.utils.hash\x94jF\x01\x00\x00\x93\x94su\x86\x94\x86R0\x8c\n_lock_type\x94\x8c\tthreading\x94\x8c\x05RLock\x94\x93\x94ub\x85\x94R\x94hkhn)R\x94(j\x0f\x01\x00\x00j\x15\x01\x00\x00j\x16\x01\x00\x00j\x1c\x01\x00\x00j\x1d\x01\x00\x00j#\x01\x00\x00j$\x01\x00\x00j*\x01\x00\x00j+\x01\x00\x00j1\x01\x00\x00u}\x94(h\x99\x89h\x9a\x88h\x9b}\x94h\x9d]\x94ub\x85\x94R\x94hkh\xec\x85\x94R\x94hk]\x94\x85\x94R\x94hk]\x94\x85\x94R\x94t\x94j]\x01\x00\x00]\x94j_\x01\x00\x00}\x94h(\x8c 
fugue.dataframe.pandas_dataframe\x94h(\x93\x94su\x86\x94\x86R0e]\x94(\x8c\x05_meta\x94\x8c\x11pandas.core.frame\x94hc\x93\x94)\x81\x94}\x94(\x8c\x04_mgr\x94\x8c\x1epandas.core.internals.managers\x94\x8c\x0cBlockManager\x94\x93\x94\x8c\tfunctools\x94\x8c\x07partial\x94\x93\x94\x8c\x1cpandas.core.internals.blocks\x94\x8c\tnew_block\x94\x93\x94\x85\x94R\x94(j\x8d\x01\x00\x00)}\x94\x8c\x04ndim\x94K\x02sNt\x94b\x8c\x15numpy.core.multiarray\x94\x8c\x0c_reconstruct\x94\x93\x94\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94K\x00\x85\x94C\x01b\x94\x87\x94R\x94(K\x01K\x01K\x00\x86\x94j\x96\x01\x00\x00\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x89jM\x01\x00\x00t\x94bh\x10\x8c\x05slice\x94\x93\x94K\x02K\x03K\x01\x87\x94R\x94\x86\x94R\x94j\x8a\x01\x00\x00j\x8d\x01\x00\x00\x85\x94R\x94(j\x8d\x01\x00\x00)}\x94j\x91\x01\x00\x00K\x02sNt\x94bj\x95\x01\x00\x00j\x98\x01\x00\x00K\x00\x85\x94j\x9a\x01\x00\x00\x87\x94R\x94(K\x01K\x01K\x00\x86\x94j\x9f\x01\x00\x00\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03j\xa3\x01\x00\x00NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x89jM\x01\x00\x00t\x94bj\xa7\x01\x00\x00K\x04K\x05K\x01\x87\x94R\x94\x86\x94R\x94j\x8a\x01\x00\x00j\x8d\x01\x00\x00\x85\x94R\x94(j\x8d\x01\x00\x00)}\x94j\x91\x01\x00\x00K\x02sNt\x94bj\x95\x01\x00\x00j\x98\x01\x00\x00K\x00\x85\x94j\x9a\x01\x00\x00\x87\x94R\x94(K\x01K\x03K\x00\x86\x94j\x9f\x01\x00\x00\x8c\x02O8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK?t\x94b\x89]\x94t\x94bj\x95\x01\x00\x00j\x98\x01\x00\x00K\x00\x85\x94j\x9a\x01\x00\x00\x87\x94R\x94(K\x01K\x03\x85\x94j\xa2\x01\x00\x00\x89C\x18\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x94t\x94b\x86\x94R\x94\x87\x94]\x94(\x8c\x18pandas.core.indexes.base\x94\x8c\n_new_Index\x94\x93\x94j\xd6\x01\x00\x00\x8c\x05Index\x94\x93\x94}\x94(\x8c\x04data\x94j\x95\x01\x00\x00j\x98\x01\x00\x00K\x00\x85\x94j\x9a\x01\x00
\x00\x87\x94R\x94(K\x01K\x05\x85\x94j\xc7\x01\x00\x00\x89]\x94(\x8c\nacquisCode\x94\x8c\x04code\x94\x8c\x05level\x94\x8c\x05title\x94\x8c\x10similarity_score\x94et\x94b\x8c\x04name\x94Nu\x86\x94R\x94j\xd8\x01\x00\x00\x8c\x19pandas.core.indexes.range\x94\x8c\nRangeIndex\x94\x93\x94}\x94(j\xe8\x01\x00\x00N\x8c\x05start\x94K\x00\x8c\x04stop\x94K\x00\x8c\x04step\x94K\x01u\x86\x94R\x94e\x86\x94R\x94\x8c\x04_typ\x94\x8c\tdataframe\x94\x8c\t_metadata\x94]\x94\x8c\x05attrs\x94}\x94\x8c\x06_flags\x94}\x94\x8c\x17allows_duplicate_labels\x94\x88subee\x86\x94t\x94\x8c\x13__dask_blockwise__0\x94\x87\x94\x8c\x13__dask_blockwise__0\x94\x8c\x05index\x94\x8c\x13__dask_blockwise__2\x94\x8c,from_pandas-aacdbbf0ef98323b8ec2e8b978aec9b9\x94uh\x04\x8c\x13__dask_blockwise__1\x94\x85\x94\x8c6subgraph_callable-d7c374d3-ec5e-4f4e-b4c0-1114d2c15acf\x94t\x94R\x94.

Expected behavior

Without these two lines of code at the beginning:

from dask.distributed import Client
client = Client() # without this, dask is not in distributed mode

the code works without error, but I want to use multiprocessing to speed up execution.


goodwanghan commented 3 years ago

I guess you want to have another level of distribution inside filter_rm_duplicates, right?

I think this is disallowed by Dask; see this similar question:

https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
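The restriction behind the linked question comes from Python's own `multiprocessing` module rather than from Fugue: a process marked as daemonic (as `Pool` workers are) refuses to start child processes. The stdlib-only sketch below reproduces the exact message. It assumes a POSIX system and uses the `fork` start method only so that it runs as a plain script; the helper names are hypothetical.

```python
import multiprocessing as mp

# Assumption: POSIX system; "fork" keeps this runnable as a plain script.
CTX = mp.get_context("fork")


def _noop():
    """Target for the grandchild process; it never actually runs."""


def _start_grandchild(_):
    # This runs inside a Pool worker, which multiprocessing marks as daemonic.
    child = CTX.Process(target=_noop)
    try:
        child.start()
    except AssertionError as exc:
        # multiprocessing refuses to let a daemonic process start children.
        return str(exc)
    child.join()
    return "started"


def try_nested_process():
    """Ask a daemonic pool worker to start its own child process."""
    with CTX.Pool(processes=1) as pool:
        return pool.map(_start_grandchild, [0])[0]


if __name__ == "__main__":
    print(try_nested_process())
    # -> daemonic processes are not allowed to have children
```

In other words, a function that is already running inside a daemonic worker cannot open its own process pool; any parallelism has to come from the level above it.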

Matthieu-Tinycoaching commented 3 years ago

@goodwanghan I want to use multiple processors with Dask to filter my pandas DataFrames with this specific function, in order to speed up my code compared to single-core pandas code.

Isn't it possible with fugue_dask?

goodwanghan commented 3 years ago

@Matthieu-Tinycoaching would you mind sharing filter_rm_duplicates? This should be a very common case, and Fugue certainly supports it. Also, this error means that Dask, not Fugue, doesn't allow you to do this. Could you let me take a look at the code?
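Without the real filter_rm_duplicates, the general pattern can still be sketched: keep the per-partition function plain, sequential code and let the engine run partitions in parallel, which is what Fugue's transform does on a Dask engine. The sketch below imitates that split with only the standard library, because it is about the structure rather than Fugue's API. The row layout, threshold, partitioning rule, and the body of `filter_partition` are hypothetical stand-ins, not the issue author's code, and the `fork` start method assumes a POSIX system.

```python
import multiprocessing as mp
import zlib
from concurrent.futures import ProcessPoolExecutor

# Hypothetical rows; the real data had columns such as title and similarity_score.
ROWS = [
    {"title": "a", "similarity_score": 0.9},
    {"title": "a", "similarity_score": 0.8},
    {"title": "b", "similarity_score": 0.4},
    {"title": "c", "similarity_score": 0.7},
]


def filter_partition(rows, similarity_score_thresh):
    """Hypothetical per-partition filter: plain sequential code, no new processes."""
    seen = set()
    kept = []
    for row in rows:
        if row["similarity_score"] >= similarity_score_thresh and row["title"] not in seen:
            seen.add(row["title"])
            kept.append(row)  # keep the first row seen for each title
    return kept


def run_partitioned(rows, similarity_score_thresh, num_partitions=2):
    # Partition by the dedup key so all rows with the same title land in the
    # same partition; that is what makes a per-partition dedup correct.
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[zlib.crc32(row["title"].encode()) % num_partitions].append(row)

    # Parallelism lives only at this outer level (assumption: POSIX "fork").
    ctx = mp.get_context("fork")
    with ProcessPoolExecutor(max_workers=num_partitions, mp_context=ctx) as pool:
        results = pool.map(
            filter_partition, partitions, [similarity_score_thresh] * num_partitions
        )
        merged = [row for part in results for row in part]
    return sorted(merged, key=lambda r: r["title"])


if __name__ == "__main__":
    print(run_partitioned(ROWS, similarity_score_thresh=0.5))
    # -> [{'title': 'a', 'similarity_score': 0.9}, {'title': 'c', 'similarity_score': 0.7}]
```

In Fugue, the analogous choice is how the data is partitioned before the function runs (for example by the deduplication key) via the `partition` argument of `transform`; check the Fugue documentation for the exact form in your version.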

goodwanghan commented 3 years ago

@Matthieu-Tinycoaching if you don't have further questions, do you mind if I close this issue?