dask / dask

Parallel computing with task scheduling
https://dask.org
BSD 3-Clause "New" or "Revised" License

Hash join transfer with error cannot pickle '_contextvars.ContextVar' object #11019

Open guozhans opened 6 months ago

guozhans commented 6 months ago

Describe the issue: Hi, I encountered this error and don't know what is happening under the hood, so I am opening this issue for better tracking.

I have some spatial datasets in Parquet format, with a 64 MB row group size, that contain nodes and coordinates. I never intend to do any repartition or shuffle operations myself, but they can happen during a merge, a set-index operation, etc. The hash join issue appeared once the dataset was larger than one row group, which is enough to create a few partitions in the dataframe, so a hash join takes place. The error always came from a hash-join-transfer-xxxxxxxxx operation.

To work around this issue, I changed the shuffle method back to "tasks".
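For reference, forcing the task-based shuffle instead of p2p can be done either globally or per operation. A minimal sketch (the config key assumes a reasonably recent Dask version):

import dask

# Force the task-based shuffle globally instead of p2p.
dask.config.set({"dataframe.shuffle.method": "tasks"})

# ...or per operation, e.g. on the merge itself:
# nodes = nodes.merge(node_coordinates, on="id", shuffle_method="tasks")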

Error messages:

Traceback (most recent call last):
  File "/opt/project/src/main/python/utils/shuffle_test.py", line 52, in <module>
    main()
  File "/opt/project/src/main/python/utils/shuffle_test.py", line 48, in main
    print(nodes["longitude"].compute())
  File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 375, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/dask/base.py", line 661, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/loky/process_executor.py", line 370, in _sendback_result
    result_queue.put(
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/queues.py", line 230, in put
    obj = dumps(obj, reducers=self._reducers)
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/reduction.py", line 215, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "/usr/local/lib/python3.10/dist-packages/loky/backend/reduction.py", line 208, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "/usr/local/lib/python3.10/dist-packages/cloudpickle/cloudpickle.py", line 1245, in dump
    return super().dump(obj)
TypeError: cannot pickle '_contextvars.ContextVar' object
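The failing pickle itself is easy to reproduce outside Dask: ContextVar objects cannot be pickled at all, so whatever ends up in the payload sent to the loky worker processes presumably carries one. A minimal sketch, independent of Dask:

import contextvars
import pickle

cv = contextvars.ContextVar("example")
try:
    pickle.dumps(cv)
except TypeError as exc:
    # TypeError: cannot pickle 'ContextVar' object
    print(exc)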

Minimal Complete Verifiable Example:

import faulthandler
import logging
from concurrent.futures import ThreadPoolExecutor

import dask
import dask.dataframe as dd

from distributed import WorkerPlugin, Worker, LocalCluster, Client, progress, wait
from loky import ProcessPoolExecutor

class TaskExecutorPool(WorkerPlugin):
    def __init__(self, logger, name):
        self.logger = logger
        self.worker = None
        self.name = name

    def setup(self, worker: Worker):
        faulthandler.enable()
        executor = ProcessPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors[self.name] = executor
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == 'error':
            ts = self.worker.tasks[key]
            exc_info = (type(ts.exception), ts.exception, ts.traceback)
            print(f"Task traceback: {ts.traceback}")
            print(f"Task exception: {exc_info}")
            self.logger.error(f"Error during computation of {key}, caused by {str(ts.exception)}.")

def main():
    cluster = LocalCluster(processes=False, silence_logs=logging.DEBUG)
    with Client(cluster) as client:
        client.register_plugin(TaskExecutorPool(logging, "process"), name="process")
        with dask.annotate(executor="process", retries=10):
            nodes = dd.read_parquet("node parquet file with multiple row group size", columns=["id", "tags"])
            node_coordinates = dd.read_parquet("node parquet file with multiple row group size"
                                               , columns=["id", "latitude", "longitude"])

            # This repartition gives node_coordinates a different partitioning than nodes,
            # so the later merge needs a hash join, which makes the error show up more often
            node_coordinates = node_coordinates.repartition(partition_size="some size")
            nodes = nodes.merge(
                node_coordinates, how="left", left_on=["id"], right_on=["id"], suffixes=(False, False), shuffle_method="p2p")
            nodes = client.persist(nodes)
            progress(nodes)
            print(nodes["latitude"].compute())

if __name__ == "__main__":
    main()

Anything else we need to know?:

Environment:

phofl commented 6 months ago

Can you post a reproducible example? E.g. some code that creates the parquet files that you read. I'm having trouble reproducing the error that you are seeing.
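For example, something along these lines (made-up columns, written with a small row-group size so the read produces several partitions) would be ideal:

import numpy as np
import pandas as pd

# Hypothetical stand-in for the node data: ids plus coordinates, written
# with a small row-group size so the file spans multiple row groups.
df = pd.DataFrame({
    "id": np.arange(1_000_000, dtype="int64"),
    "latitude": np.random.uniform(-90, 90, 1_000_000),
    "longitude": np.random.uniform(-180, 180, 1_000_000),
})
df.to_parquet("nodes.parquet", row_group_size=50_000)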

guozhans commented 6 months ago

Hi @phofl

I attached a small dataset and my pip installation at the bottom; you can try this dataset with my other script. The dataset is quite small, so it shouldn't cause an OOM for you. At least it doesn't cause an OOM here. :)

Also, you must enable the worker plugin with loky.

If you use a nanny, the script will run successfully.
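In other words, whether the error appears depends on how the workers are started. A minimal sketch of the two setups (assuming the plugin registration from the script below):

from distributed import LocalCluster

# Reproduces the error: workers run in-process (no nanny), and tasks are
# handed to the loky ProcessPoolExecutor registered by the worker plugin.
cluster = LocalCluster(n_workers=4, processes=False)

# Runs successfully: each worker is a separate process supervised by a nanny.
# cluster = LocalCluster(n_workers=4, processes=True)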

dji-osm.tar.gz

import logging
import dask
import dask.dataframe as dd
import pandas as pd
import dask.config as dc

from dask.delayed import delayed
from distributed import WorkerPlugin, Worker, LocalCluster, Client
from loky import ProcessPoolExecutor

class TaskExecutorPool(WorkerPlugin):
    def __init__(self, logger, name):
        self.logger = logger
        self.worker = None
        self.name = name

    def setup(self, worker: Worker):
        executor = ProcessPoolExecutor(max_workers=worker.state.nthreads)
        worker.executors[self.name] = executor
        self.worker = worker

    def transition(self, key, start, finish, *args, **kwargs):
        if finish == 'error':
            ts = self.worker.tasks[key]
            exc_info = (type(ts.exception), ts.exception, ts.traceback)
            print(f"Task traceback: {ts.traceback}")
            print(f"Task exception: {exc_info}")
            self.logger.error(f"Error during computation of {key}, caused by {str(ts.exception)}.")

def main():
    cluster = LocalCluster(n_workers=4, processes=False, silence_logs=logging.DEBUG)
    with Client(cluster) as client:
        client.register_plugin(TaskExecutorPool(logging, "process"), name="process")
        with dask.annotate(executor="process", retries=10):
            dc.set({"dataframe.convert-string": False})
            ways = dd.read_parquet(
                "djibouti-latest.osm/way", columns=["id", "nodes"], blocksize="8MiB")
            node_coordinates = dd.read_parquet(
                "djibouti-latest.osm/node", columns=["latitude", "longitude"], index=["id"])

            way_dfs = ways.to_delayed()
            delays = []
            for way_df in way_dfs:
                delays.append(delayed(create_df)(way_df))
            dfs = dd.compute(*delays)
            df = dd.concat([*dfs])
            df = dd.merge(df, node_coordinates, left_on=["nodeId"], right_on=["id"], right_index=True).set_index("id", shuffle_method="p2p")
            print(f"df = {df.compute()}")

def create_df(way_df):
    new_df = way_df.set_index("id").nodes.apply(
        lambda ns: pd.Series([n["nodeId"] for n in ns], dtype="int64")
    ).convert_dtypes(convert_integer=True).stack().reset_index(0, name="nodeId")
    return dd.from_pandas(new_df, npartitions=10)

if __name__ == "__main__":
    main()

My pip install: pip install dask-kubernetes==2024.3.1 dask[complete]=="2024.2.1" dask-geopandas pandas==2.1.4 pandas[performance]==2.1.4 numpy==1.22.4 jupyter-server-proxy pyarrow==15.0.2 shapely==2.0.3 pyproj==3.6.1 geopandas==0.14.3 geoparquet==0.0.3 wheel loky==3.4.1 graphviz

phofl commented 5 months ago

Thanks! Is it possible to reproduce this without the tar file?

guozhans commented 5 months ago

I don't understand; can't you use this data? Or are you checking for something? Or do you mean Dask only works on specific datasets? Perhaps you could give more context.

I didn't try to reproduce it with other data, but I have observed the same issue with similar datasets.

phofl commented 5 months ago

It's always better to have something that developers can just copy and paste. Here is some context on why downloading those files can be a little concerning:

https://github.com/dask/dask/issues/10995#issuecomment-2014736296

guozhans commented 5 months ago

Hi @phofl I see, and thanks for providing some context.

If I copy and paste the data, it somehow changes the data format. The attached file only has a few hundred lines; I hope it works for you.

lie-de-clipperton.zip

Or you can download the complete Île de Clipperton PBF data from https://download.geofabrik.de/australia-oceania.html, transform it into parquet files using osm-parquetizer, and then separate the ways and nodes.

Sam