coiled / feedback

A place to provide Coiled feedback

"duplicate key value violates unique constraint" when using software environment #92

Closed rubenvdg closed 3 years ago

rubenvdg commented 3 years ago

When creating a software environment:

import coiled

coiled.create_software_environment(
    name="some_env",
    pip=["dask==2020.12.0", "pandas==1.2.0"],
)

and making a cluster:

cluster = coiled.Cluster(name="some_cluster", n_workers=2, software="some_env")

I catch:

------------------------------------------------------------
ValueError                 Traceback (most recent call last)
<ipython-input-6-f6172a40365e> in <module>
----> 1 cluster = coiled.Cluster(name="some_cluster", n_workers=2, software="some_env")

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/coiled/cluster.py in __init__(self, n_workers, configuration, software, worker_cpu, worker_gpu, worker_memory, worker_class, worker_options, scheduler_cpu, scheduler_memory, scheduler_class, scheduler_options, name, asynchronous, cloud, account, shutdown_on_close, backend_options, credentials)
    151         self._name = "coiled.Cluster"  # Used in Dask's Cluster._ipython_display_
    152         if not self.asynchronous:
--> 153             self.sync(self._start)
    154 
    155     @property

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/deploy/cluster.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    181             return future
    182         else:
--> 183             return sync(self.loop, func, *args, **kwargs)
    184 
    185     def _log(self, log):

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/coiled/cluster.py in _start(self)
    192                 raise ValueError(error_msg)
    193 
--> 194             self.cluster_id = await self.cloud.create_cluster(
    195                 account=self.account,
    196                 configuration=self.configuration,  # type: ignore

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/coiled/core.py in _create_cluster(self, name, configuration, software, worker_cpu, worker_gpu, worker_memory, worker_class, worker_options, scheduler_cpu, scheduler_memory, scheduler_class, scheduler_options, account, workers, log_output, backend_options)
    361         error_details = await self._websocket_stream(ws, log_output, use_spinner=False)
    362         if error_details:
--> 363             raise ValueError(f"Unable to create cluster: {error_details}")
    364 
    365         return await self._get_cluster_by_name(name=name, account=account)

ValueError: Unable to create cluster: duplicate key value violates unique constraint "cloud_softwareenvironment_account_id_name_******_uniq"
DETAIL:  Key (account_id, name)=(898, ) already exists.

I replaced the id with ***** in the above stack trace.

I have no existing clusters and no existing software environments. Creating the environment runs without problems.

I'm running the following specs:

Python 3.8.6
macOS Big Sur 11.1
coiled 0.0.32
christianacromer commented 3 years ago

Someone from engineering will get back to you shortly on this as well! Cheers, Ruben!

dantheman39 commented 3 years ago

Thanks for reporting, @rubenvdg. Checking it out now.

dantheman39 commented 3 years ago

@rubenvdg we've deployed a fix to a bug that I think was the cause of your issue. Would you mind trying again?

But FYI, when creating a software environment with pip, you're going to want to use "dask[complete]" (or "dask[distributed]") so that "distributed" is also installed. We should probably point this out in a few more places in our docs:

coiled.create_software_environment(
    name="some_env",
    pip=["dask[complete]==2020.12.0", "pandas==1.2.0"],
)
rubenvdg commented 3 years ago

@dantheman39 thanks for your swift response.

Now, if I run:

import coiled
from dask.distributed import Client

coiled.create_software_environment(
    name="some_env",
    pip=["dask[complete]==2020.12.0", "pandas==1.2.0"],
)

cluster = coiled.Cluster(name="some_cluster", software="some_env")
client = Client(cluster)

it doesn't crash, but I receive a version mismatch warning:

/Users/rvdgeer/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py:1128: VersionMismatchWarning: Mismatched versions found

+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| python  | 3.8.6.final.0 | 3.7.6.final.0 | 3.7.6.final.0 |
+---------+---------------+---------------+---------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

which is strange since the dashboard says I'm running coiled/default-py38.

Also, if I ignore the warning and read a CSV:

import dask.dataframe as dd

taxi_data = dd.read_csv(
    "s3://dask-data/nyc-taxi/2015/yellow_tripdata_2015-*.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
)

print(f"row count: {taxi_data.shape[0].compute()}")

I catch:

KilledWorker: ("('read-csv-900091d03e61599b9107c41c87a4498b', 112)", <Worker 'tls://10.2.13.122:38111', name: ruben-van-de-geer9-3475-worker-4-38d293, memory: 0, processing: 365>)
dantheman39 commented 3 years ago

About the version mismatch: that's not great. I'll ask my colleagues for some hints on what may be going on there.

FYI to see why the worker was killed, you can view the logs from the clusters table, click the three lines on the far right:

[Screenshot: the clusters table in the Coiled dashboard, with the logs icon on the far right]
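
If you'd rather pull the logs from Python, something along these lines should also work (a rough sketch using Dask's standard client log methods, assuming the workers are still around to report):

from dask.distributed import Client

client = Client(cluster)  # the Coiled cluster from above

# Recent scheduler log lines
print(client.get_scheduler_logs())

# Mapping of worker address -> recent log lines
print(client.get_worker_logs())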

Looking at the logs, the worker was killed due to an unsupported pickle protocol, which seems to be a result of the version mismatch you're describing (Python 3.8 on your local machine and 3.7 on the scheduler). Pasting them here for posterity:

distributed.nanny - INFO -         Start Nanny at: 'tls://10.2.13.122:37995'
distributed.worker - INFO -       Start worker at:    tls://10.2.13.122:38111
distributed.worker - INFO -          Listening to:    tls://10.2.13.122:38111
distributed.worker - INFO -          dashboard at:          10.2.13.122:43083
distributed.worker - INFO - Waiting to connect to: tls://ip-10-2-13-108.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                   17.18 GB
distributed.worker - INFO -       Local Directory: /dask-worker-space/dask-worker-space/worker-rc1du0jf
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-108.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x95\x96\x05\x00\x00\x00\x00\x00\x00\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94\x8c)read-csv-900091d03e61599b9107c41c87a4498b\x94\x8c\x0edask.blockwise\x94\x8c\x12PackedFunctionCall\x94\x93\x94)\x81\x94}\x94\x8c\x04func\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94sb\x8c\x02_0\x94\x86\x94sh\x04\x8c\x02_0\x94\x85\x94\x8c\x11subgraph_callable\x94t\x94R\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94\x8c\x0bfsspec.core\x94\x8c\x08OpenFile\x94\x93\x94(\x8c\x0bfsspec.spec\x94\x8c\rmake_instance\x94\x93\x94\x8c\ts3fs.core\x94\x8c\x0cS3FileSystem\x94\x93\x94)}\x94\x8c\x04anon\x94\x88s\x87\x94R\x94\x8c3dask-data/nyc-taxi/2015/yellow_tripdata_2015-04.csv\x94\x8c\x02rb\x94NNNNt\x94R\x94J\x00@B\x0fJ\x00\x90\xd0\x03C\x01\n\x94t\x94B\x17\x01\x00\x00VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RateCodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount\r\n\x94}\x94\x8c\x0bparse_dates\x94]\x94(\x8c\x14tpep_pickup_datetime\x94\x8c\x15tpep_dropoff_datetime\x94es}\x94(\x8c\x08VendorID\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x14tpep_pickup_datetime\x94h9\x8c\x02M8\x94\x89\x88\x87\x94R\x94(K\x04h=NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00}\x94(C\x02ns\x94K\x01K\x01K\x01t\x94\x86\x94t\x94b\x8c\x15tpep_dropoff_datetime\x94hB\x8c\x0fpassenger_count\x94h<\x8c\rtrip_distance\x94h9\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03h=NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x10pickup_longitude\x94hM\x8c\x0fpickup_latitude\x94hM\x8c\nRateCodeID\x94h<\x8c\x12store_and_fwd_flag\x94h9\x8c\x02O8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK?t\x94b\x8c\x11dropoff_longitude\x94hM\x8c\x10dropoff_latitude\x94hM\x8c\x0cpayment_type\x94h<\x8c\x0bfare_amount\x94hM\x8c\x05extra\x94hM\x8c\x07mta_tax\x94hM\x8c\ntip_amount\x94hM\x8c\x0ctolls_amount\x94hM\x8c\x15improvement_surcharge\x94hM\x8c\x0ctotal_amount\x94hMu]\x94(h6h?hHhIhJhOhPhQhRhXhYhZh[h\\h]h^h_h`hae\x88\x89Ne\x86\x94.'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.core - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f7afefb2490>>, <Task finished coro=<Worker.handle_scheduler() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/worker.py:976> exception=ValueError('unsupported pickle protocol: 5')>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - -------------------------------------------------
distributed.worker - WARNING - Mismatched versions found
+---------+---------------+---------------+------------------------------------+
| Package | This Worker   | scheduler     | workers                            |
+---------+---------------+---------------+------------------------------------+
| python  | 3.7.6.final.0 | 3.7.6.final.0 | {'3.8.6.final.0', '3.7.6.final.0'} |
+---------+---------------+---------------+------------------------------------+
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-108.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Stopping worker at tls://10.2.13.122:38111
distributed.worker - INFO -       Start worker at:    tls://10.2.13.122:41153
distributed.worker - INFO -          Listening to:    tls://10.2.13.122:41153
distributed.worker - INFO -          dashboard at:          10.2.13.122:39881
distributed.worker - INFO - Waiting to connect to: tls://ip-10-2-13-108.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                   17.18 GB
distributed.worker - INFO -       Local Directory: /dask-worker-space/dask-worker-space/worker-zk52l3kf
distributed.worker - INFO - -------------------------------------------------
distributed.worker - WARNING - Mismatched versions found
+---------+---------------+---------------+------------------------------------+
| Package | This Worker   | scheduler     | workers                            |
+---------+---------------+---------------+------------------------------------+
| python  | 3.7.6.final.0 | 3.7.6.final.0 | {'3.8.6.final.0', '3.7.6.final.0'} |
+---------+---------------+---------------+------------------------------------+
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-108.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x95{\x06\x00\x00\x00\x00\x00\x00\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94\x8c)read-csv-5ee8045a6925c7bf15db278d147b4b54\x94\x8c\x0edask.blockwise\x94\x8c\x12PackedFunctionCall\x94\x93\x94)\x81\x94}\x94\x8c\x04func\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94sb\x8c\x02_0\x94\x86\x94sh\x04\x8c\x02_0\x94\x85\x94\x8c\x11subgraph_callable\x94t\x94R\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94\x8c\x0bfsspec.core\x94\x8c\x08OpenFile\x94\x93\x94(\x8c\x0bfsspec.spec\x94\x8c\rmake_instance\x94\x93\x94\x8c\ts3fs.core\x94\x8c\x0cS3FileSystem\x94\x93\x94)}\x94\x8c\x04anon\x94\x88s\x87\x94R\x94\x8c-nyc-tlc/trip data/yellow_tripdata_2019-01.csv\x94\x8c\x02rb\x94NNNNt\x94R\x94J\x00\x00\x00\x0bJ\x00\x00\x00\x01C\x01\n\x94t\x94B\x02\x01\x00\x00VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge\r\n\x94}\x94(\x8c\x0bparse_dates\x94]\x94(\x8c\x14tpep_pickup_datetime\x94\x8c\x15tpep_dropoff_datetime\x94e\x8c\x05dtype\x94}\x94(\x8c\x0cpayment_type\x94\x8c\x05UInt8\x94\x8c\x08VendorID\x94h8\x8c\x0fpassenger_count\x94h8\x8c\nRatecodeID\x94h8\x8c\x12store_and_fwd_flag\x94\x8c\x08category\x94\x8c\x0cPULocationID\x94\x8c\x06UInt16\x94\x8c\x0cDOLocationID\x94h?uu}\x94(\x8c\x08VendorID\x94\x8c\x1apandas.core.arrays.integer\x94\x8c\nUInt8Dtype\x94\x93\x94)\x81\x94}\x94\x8c\x06_cache\x94}\x94(\x8c\x0bnumpy_dtype\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02u1\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x04kind\x94\x8c\x01u\x94usb\x8c\x14tpep_pickup_datetime\x94hM\x8c\x02M8\x94\x89\x88\x87\x94R\x94(K\x04\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00}\x94(C\x02ns\x94K\x01K\x01K\x01t\x94\x86\x94t\x94b\x8c\x15tpep_dropoff_datetime\x94hX\x8c\x0fpassenger_count\x94hF\x8c\rtrip_distance\x94hM\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03hYNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\nRatecodeID\x94hF\x8c\x12store_and_fwd_flag\x94h=\x8c\x0cPULocationID\x94hC\x8c\x0bUInt16Dtype\x94\x93\x94)\x81\x94}\x94hH}\x94(hJhM\x8c\x02u2\x94\x89\x88\x87\x94R\x94(K\x03hYNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bhShTusb\x8c\x0cDOLocationID\x94hk\x8c\x0cpayment_type\x94hF\x8c\x0bfare_amount\x94hd\x8c\x05extra\x94hd\x8c\x07mta_tax\x94hd\x8c\ntip_amount\x94hd\x8c\x0ctolls_amount\x94hd\x8c\x15improvement_surcharge\x94hd\x8c\x0ctotal_amount\x94hd\x8c\x14congestion_surcharge\x94hdu]\x94(hBhUh_h`hahfhghhhrhshthuhvhwhxhyhzh{e\x88\x89Ne\x86\x94.'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.core - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f99ebaefcd0>>, <Task finished coro=<Worker.handle_scheduler() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/worker.py:976> exception=ValueError('unsupported pickle protocol: 5')>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - -------------------------------------------------
distributed.worker - WARNING - Mismatched versions found
+---------+---------------+---------------+------------------------------------+
| Package | This Worker   | scheduler     | workers                            |
+---------+---------------+---------------+------------------------------------+
| python  | 3.7.6.final.0 | 3.7.6.final.0 | {'3.8.6.final.0', '3.7.6.final.0'} |
+---------+---------------+---------------+------------------------------------+
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-108.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x95{\x06\x00\x00\x00\x00\x00\x00\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94\x8c)read-csv-5ee8045a6925c7bf15db278d147b4b54\x94\x8c\x0edask.blockwise\x94\x8c\x12PackedFunctionCall\x94\x93\x94)\x81\x94}\x94\x8c\x04func\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94sb\x8c\x02_0\x94\x86\x94sh\x04\x8c\x02_0\x94\x85\x94\x8c\x11subgraph_callable\x94t\x94R\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94\x8c\x0bfsspec.core\x94\x8c\x08OpenFile\x94\x93\x94(\x8c\x0bfsspec.spec\x94\x8c\rmake_instance\x94\x93\x94\x8c\ts3fs.core\x94\x8c\x0cS3FileSystem\x94\x93\x94)}\x94\x8c\x04anon\x94\x88s\x87\x94R\x94\x8c-nyc-tlc/trip data/yellow_tripdata_2019-01.csv\x94\x8c\x02rb\x94NNNNt\x94R\x94J\x00\x00\x00\nJ\x00\x00\x00\x01C\x01\n\x94t\x94B\x02\x01\x00\x00VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge\r\n\x94}\x94(\x8c\x0bparse_dates\x94]\x94(\x8c\x14tpep_pickup_datetime\x94\x8c\x15tpep_dropoff_datetime\x94e\x8c\x05dtype\x94}\x94(\x8c\x0cpayment_type\x94\x8c\x05UInt8\x94\x8c\x08VendorID\x94h8\x8c\x0fpassenger_count\x94h8\x8c\nRatecodeID\x94h8\x8c\x12store_and_fwd_flag\x94\x8c\x08category\x94\x8c\x0cPULocationID\x94\x8c\x06UInt16\x94\x8c\x0cDOLocationID\x94h?uu}\x94(\x8c\x08VendorID\x94\x8c\x1apandas.core.arrays.integer\x94\x8c\nUInt8Dtype\x94\x93\x94)\x81\x94}\x94\x8c\x06_cache\x94}\x94(\x8c\x0bnumpy_dtype\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02u1\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x04kind\x94\x8c\x01u\x94usb\x8c\x14tpep_pickup_datetime\x94hM\x8c\x02M8\x94\x89\x88\x87\x94R\x94(K\x04\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00}\x94(C\x02ns\x94K\x01K\x01K\x01t\x94\x86\x94t\x94b\x8c\x15tpep_dropoff_datetime\x94hX\x8c\x0fpassenger_count\x94hF\x8c\rtrip_distance\x94hM\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03hYNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\nRatecodeID\x94hF\x8c\x12store_and_fwd_flag\x94h=\x8c\x0cPULocationID\x94hC\x8c\x0bUInt16Dtype\x94\x93\x94)\x81\x94}\x94hH}\x94(hJhM\x8c\x02u2\x94\x89\x88\x87\x94R\x94(K\x03hYNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bhShTusb\x8c\x0cDOLocationID\x94hk\x8c\x0cpayment_type\x94hF\x8c\x0bfare_amount\x94hd\x8c\x05extra\x94hd\x8c\x07mta_tax\x94hd\x8c\ntip_amount\x94hd\x8c\x0ctolls_amount\x94hd\x8c\x15improvement_surcharge\x94hd\x8c\x0ctotal_amount\x94hd\x8c\x14congestion_surcharge\x94hdu]\x94(hBhUh_h`hahfhghhhrhshthuhvhwhxhyhzh{e\x88\x89Ne\x86\x94.'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.core - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f99ebaefcd0>>, <Task finished coro=<Worker.handle_scheduler() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/worker.py:976> exception=ValueError('unsupported pickle protocol: 5')>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - -------------------------------------------------
distributed.worker - WARNING - Mismatched versions found
+---------+---------------+---------------+------------------------------------+
| Package | This Worker   | scheduler     | workers                            |
+---------+---------------+---------------+------------------------------------+
| python  | 3.7.6.final.0 | 3.7.6.final.0 | {'3.8.6.final.0', '3.7.6.final.0'} |
+---------+---------------+---------------+------------------------------------+
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-108.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x95{\x06\x00\x00\x00\x00\x00\x00\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94\x8c)read-csv-5ee8045a6925c7bf15db278d147b4b54\x94\x8c\x0edask.blockwise\x94\x8c\x12PackedFunctionCall\x94\x93\x94)\x81\x94}\x94\x8c\x04func\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94sb\x8c\x02_0\x94\x86\x94sh\x04\x8c\x02_0\x94\x85\x94\x8c\x11subgraph_callable\x94t\x94R\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94\x8c\x0bfsspec.core\x94\x8c\x08OpenFile\x94\x93\x94(\x8c\x0bfsspec.spec\x94\x8c\rmake_instance\x94\x93\x94\x8c\ts3fs.core\x94\x8c\x0cS3FileSystem\x94\x93\x94)}\x94\x8c\x04anon\x94\x88s\x87\x94R\x94\x8c-nyc-tlc/trip data/yellow_tripdata_2019-01.csv\x94\x8c\x02rb\x94NNNNt\x94R\x94J\x00\x00\x00\x06J\x00\x00\x00\x01C\x01\n\x94t\x94B\x02\x01\x00\x00VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge\r\n\x94}\x94(\x8c\x0bparse_dates\x94]\x94(\x8c\x14tpep_pickup_datetime\x94\x8c\x15tpep_dropoff_datetime\x94e\x8c\x05dtype\x94}\x94(\x8c\x0cpayment_type\x94\x8c\x05UInt8\x94\x8c\x08VendorID\x94h8\x8c\x0fpassenger_count\x94h8\x8c\nRatecodeID\x94h8\x8c\x12store_and_fwd_flag\x94\x8c\x08category\x94\x8c\x0cPULocationID\x94\x8c\x06UInt16\x94\x8c\x0cDOLocationID\x94h?uu}\x94(\x8c\x08VendorID\x94\x8c\x1apandas.core.arrays.integer\x94\x8c\nUInt8Dtype\x94\x93\x94)\x81\x94}\x94\x8c\x06_cache\x94}\x94(\x8c\x0bnumpy_dtype\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02u1\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x04kind\x94\x8c\x01u\x94usb\x8c\x14tpep_pickup_datetime\x94hM\x8c\x02M8\x94\x89\x88\x87\x94R\x94(K\x04\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00}\x94(C\x02ns\x94K\x01K\x01K\x01t\x94\x86\x94t\x94b\x8c\x15tpep_dropoff_datetime\x94hX\x8c\x0fpassenger_count\x94hF\x8c\rtrip_distance\x94hM\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03hYNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\nRatecodeID\x94hF\x8c\x12store_and_fwd_flag\x94h=\x8c\x0cPULocationID\x94hC\x8c\x0bUInt16Dtype\x94\x93\x94)\x81\x94}\x94hH}\x94(hJhM\x8c\x02u2\x94\x89\x88\x87\x94R\x94(K\x03hYNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bhShTusb\x8c\x0cDOLocationID\x94hk\x8c\x0cpayment_type\x94hF\x8c\x0bfare_amount\x94hd\x8c\x05extra\x94hd\x8c\x07mta_tax\x94hd\x8c\ntip_amount\x94hd\x8c\x0ctolls_amount\x94hd\x8c\x15improvement_surcharge\x94hd\x8c\x0ctotal_amount\x94hd\x8c\x14congestion_surcharge\x94hdu]\x94(hBhUh_h`hahfhghhhrhshthuhvhwhxhyhzh{e\x88\x89Ne\x86\x94.'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.core - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f99ebaefcd0>>, <Task finished coro=<Worker.handle_scheduler() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/worker.py:976> exception=ValueError('unsupported pickle protocol: 5')>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - -------------------------------------------------
distributed.worker - WARNING - Mismatched versions found
+---------+---------------+---------------+------------------------------------+
| Package | This Worker   | scheduler     | workers                            |
+---------+---------------+---------------+------------------------------------+
| python  | 3.7.6.final.0 | 3.7.6.final.0 | {'3.8.6.final.0', '3.7.6.final.0'} |
+---------+---------------+---------------+------------------------------------+
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-108.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
marcosmoyano commented 3 years ago

I was able to reproduce this just now. While we look into it, a workaround is to define your dependencies with conda rather than pip:

coiled.create_software_environment(
    name="some_env",
    conda={"channels": ["conda-forge", "defaults"], "dependencies": ["python=3.8", "dask=2020.12.0", "pandas=1.2.0"]},
)
necaris commented 3 years ago

@rubenvdg can you let us know if your original error was resolved by the fix? If so, I'd like to rename this issue to reflect the more recent discussion around the version mismatch (for which we're still working on a fix).

rubenvdg commented 3 years ago

@necaris will look into it this week; will report back.

rubenvdg commented 3 years ago

We tried this, but ran into https://github.com/coiled/feedback/issues/97.

FabioRosado commented 3 years ago

Hello @rubenvdg, thank you for raising this issue with us; we'd like to give you an update. We pushed a fix that allows creating pip-only environments with a Python version other than 3.7. I'm aware of the other issue you came across, but could you try creating your software environment with pip as Daniel suggested?

coiled.create_software_environment(
    name="some_env",
    pip=["dask[complete]==2020.12.0", "pandas==1.2.0"],
)
rubenvdg commented 3 years ago

@FabioRosado thanks for getting back to me. I can create the cluster now, although I still catch:

/Users/rvdgeer/.pyenv/versions/3.8.6/envs/coiled/lib/python3.8/site-packages/distributed/client.py:1128: VersionMismatchWarning: Mismatched versions found

+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| python  | 3.8.6.final.0 | 3.7.6.final.0 | 3.7.6.final.0 |
+---------+---------------+---------------+---------------+

but running anything will result in KilledWorker.

For example

import dask.dataframe as dd

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
    },
    storage_options={"anon": True},
    blocksize="16 MiB",
).persist()

df.groupby("passenger_count").tip_amount.mean().compute()

leads to

KilledWorker: ("('read-csv-c075100a8373a0cab23cd3671871aeb7', 451)", <Worker 'tls://10.2.13.42:39595', name: ruben-van-de-geer9-3765-worker-4-f46f8f, memory: 0, processing: 471>)

which kind of makes sense (given the version mismatch).

So probably, for now, if you want to use pip, you should just make sure the local Python version matches the cluster version.
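
For what it's worth, the mismatch also shows up if you ask Dask for the versions explicitly; a minimal sketch, assuming the client from above:

from dask.distributed import Client

client = Client(cluster)

# check=True makes Dask compare client, scheduler, and worker versions
# and warn on mismatches such as the Python 3.8 vs 3.7 one above
versions = client.get_versions(check=True)
print(versions["client"]["host"]["python"])
print(versions["scheduler"]["host"]["python"])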

rubenvdg commented 3 years ago

Great to see that it's possible to spin up clusters again, by the way (without catching a Server 500 error). Or was I just lucky 😄?

jrbourbeau commented 3 years ago

I suspect we still need to release a new version of coiled which includes the updates we made to handle matching the Python version for pip-only software environments. We'll ping here once the new release is out 👍

which kind of makes sense (given the version mismatch)

Dask should be flexible enough to handle running the client and scheduler / workers with different Python versions. I wonder if the KilledWorker is from a package that's missing from the software environment (e.g. s3fs)? Hopefully the cluster logs contain some relevant information.
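
For example, adding s3fs explicitly to the pip list would rule that out (the extra entry here is just illustrative):

import coiled

# Illustrative sketch: include s3fs so the workers can read from S3
coiled.create_software_environment(
    name="some_env",
    pip=["dask[complete]==2020.12.0", "pandas==1.2.0", "s3fs"],
)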

FabioRosado commented 3 years ago

Great to see that it's possible to spin up clusters again btw (without catching Server 500 error). Or was I just lucky 😄 ?

Glad you can spin up clusters without issues; this last release had a few fixes to address that error. 😄

rubenvdg commented 3 years ago

@jrbourbeau Thanks for the heads up! For the sake of completeness, this is the worker log:

distributed.nanny - INFO -         Start Nanny at: 'tls://10.2.11.252:42243'
distributed.worker - INFO -       Start worker at:    tls://10.2.11.252:39419
distributed.worker - INFO -          Listening to:    tls://10.2.11.252:39419
distributed.worker - INFO -          dashboard at:          10.2.11.252:46073
distributed.worker - INFO - Waiting to connect to: tls://ip-10-2-13-226.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          4
distributed.worker - INFO -                Memory:                   17.18 GB
distributed.worker - INFO -       Local Directory: /dask-worker-space/dask-worker-space/worker-p3wxpreb
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-226.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Starting Worker plugin coiled-aws-env
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x95\xb6\x05\x00\x00\x00\x00\x00\x00\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94\x8c)read-csv-c075100a8373a0cab23cd3671871aeb7\x94\x8c\x0edask.blockwise\x94\x8c\x12PackedFunctionCall\x94\x93\x94)\x81\x94}\x94\x8c\x04func\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94sb\x8c\x02_0\x94\x86\x94sh\x04\x8c\x02_0\x94\x85\x94\x8c\x11subgraph_callable\x94t\x94R\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94\x8c\x0bfsspec.core\x94\x8c\x08OpenFile\x94\x93\x94(\x8c\x0bfsspec.spec\x94\x8c\rmake_instance\x94\x93\x94\x8c\ts3fs.core\x94\x8c\x0cS3FileSystem\x94\x93\x94)}\x94\x8c\x04anon\x94\x88s\x87\x94R\x94\x8c-nyc-tlc/trip data/yellow_tripdata_2019-01.csv\x94\x8c\x02rb\x94NNNNt\x94R\x94K\x00J\x00\x00\x00\x01C\x01\n\x94t\x94B\x02\x01\x00\x00VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge\r\n\x94}\x94\x8c\x05dtype\x94}\x94(\x8c\x0cpayment_type\x94\x8c\x05UInt8\x94\x8c\x08VendorID\x94h4\x8c\x0fpassenger_count\x94h4\x8c\nRatecodeID\x94h4us}\x94(\x8c\x08VendorID\x94\x8c\x1apandas.core.arrays.integer\x94\x8c\nUInt8Dtype\x94\x93\x94)\x81\x94}\x94\x8c\x06_cache\x94}\x94(\x8c\x0bnumpy_dtype\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02u1\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x04kind\x94\x8c\x01u\x94usb\x8c\x14tpep_pickup_datetime\x94hD\x8c\x02O8\x94\x89\x88\x87\x94R\x94(K\x03hHNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK?t\x94b\x8c\x15tpep_dropoff_datetime\x94hO\x8c\x0fpassenger_count\x94h=\x8c\rtrip_distance\x94hD\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\nRatecodeID\x94h=\x8c\x12store_and_fwd_flag\x94hO\x8c\x0cPULocationID\x94hD\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03hWNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x0cDOLocationID\x94h^\x8c\x0cpayment_type\x94h=\x8c\x0bfare_amount\x94hV\x8c\x05extra\x94hV\x8c\x07mta_tax\x94hV\x8c\ntip_amount\x94hV\x8c\x0ctolls_amount\x94hV\x8c\x15improvement_surcharge\x94hV\x8c\x0ctotal_amount\x94hV\x8c\x14congestion_surcharge\x94hVu]\x94(h9hLhQhRhShYhZh[h`hahbhchdhehfhghhhie\x89\x89Ne\x86\x94.'

Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.core - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f750d96fe90>>, <Task finished coro=<Worker.handle_scheduler() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/worker.py:976> exception=ValueError('unsupported pickle protocol: 5')>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - -------------------------------------------------
distributed.worker - WARNING - Mismatched versions found
+---------+---------------+---------------+------------------------------------+
| Package | This Worker   | scheduler     | workers                            |
+---------+---------------+---------------+------------------------------------+
| python  | 3.7.6.final.0 | 3.7.6.final.0 | {'3.8.6.final.0', '3.7.6.final.0'} |
+---------+---------------+---------------+------------------------------------+
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-226.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.protocol.pickle - INFO - Failed to deserialize b'\x80\x05\x95\xb9\x05\x00\x00\x00\x00\x00\x00\x8c\x11dask.optimization\x94\x8c\x10SubgraphCallable\x94\x93\x94(}\x94\x8c)read-csv-c075100a8373a0cab23cd3671871aeb7\x94\x8c\x0edask.blockwise\x94\x8c\x12PackedFunctionCall\x94\x93\x94)\x81\x94}\x94\x8c\x04func\x94\x8c\x15dask.dataframe.io.csv\x94\x8c\x10pandas_read_text\x94\x93\x94sb\x8c\x02_0\x94\x86\x94sh\x04\x8c\x02_0\x94\x85\x94\x8c\x11subgraph_callable\x94t\x94R\x94]\x94(\x8c\x11pandas.io.parsers\x94\x8c\x08read_csv\x94\x93\x94(\x8c\x0fdask.bytes.core\x94\x8c\x14read_block_from_file\x94\x93\x94\x8c\x0bfsspec.core\x94\x8c\x08OpenFile\x94\x93\x94(\x8c\x0bfsspec.spec\x94\x8c\rmake_instance\x94\x93\x94\x8c\ts3fs.core\x94\x8c\x0cS3FileSystem\x94\x93\x94)}\x94\x8c\x04anon\x94\x88s\x87\x94R\x94\x8c-nyc-tlc/trip data/yellow_tripdata_2019-09.csv\x94\x8c\x02rb\x94NNNNt\x94R\x94J\x00\x00\x00\x07J\x00\x00\x00\x01C\x01\n\x94t\x94B\x02\x01\x00\x00VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge\r\n\x94}\x94\x8c\x05dtype\x94}\x94(\x8c\x0cpayment_type\x94\x8c\x05UInt8\x94\x8c\x08VendorID\x94h4\x8c\x0fpassenger_count\x94h4\x8c\nRatecodeID\x94h4us}\x94(\x8c\x08VendorID\x94\x8c\x1apandas.core.arrays.integer\x94\x8c\nUInt8Dtype\x94\x93\x94)\x81\x94}\x94\x8c\x06_cache\x94}\x94(\x8c\x0bnumpy_dtype\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02u1\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01|\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x04kind\x94\x8c\x01u\x94usb\x8c\x14tpep_pickup_datetime\x94hD\x8c\x02O8\x94\x89\x88\x87\x94R\x94(K\x03hHNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK?t\x94b\x8c\x15tpep_dropoff_datetime\x94hO\x8c\x0fpassenger_count\x94h=\x8c\rtrip_distance\x94hD\x8c\x02f8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\nRatecodeID\x94h=\x8c\x12store_and_fwd_flag\x94hO\x8c\x0cPULocationID\x94hD\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03hWNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94b\x8c\x0cDOLocationID\x94h^\x8c\x0cpayment_type\x94h=\x8c\x0bfare_amount\x94hV\x8c\x05extra\x94hV\x8c\x07mta_tax\x94hV\x8c\ntip_amount\x94hV\x8c\x0ctolls_amount\x94hV\x8c\x15improvement_surcharge\x94hV\x8c\x0ctotal_amount\x94hV\x8c\x14congestion_surcharge\x94hVu]\x94(h9hLhQhRhShYhZh[h`hahbhchdhehfhghhhie\x88\x89Ne\x86\x94.'

Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.core - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - ERROR - unsupported pickle protocol: 5
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - Connection to scheduler broken.  Reconnecting...
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x7f750d96fe90>>, <Task finished coro=<Worker.handle_scheduler() done, defined at /opt/conda/lib/python3.7/site-packages/distributed/worker.py:976> exception=ValueError('unsupported pickle protocol: 5')>)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/opt/conda/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/opt/conda/lib/python3.7/site-packages/distributed/worker.py", line 979, in handle_scheduler
    comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
  File "/opt/conda/lib/python3.7/site-packages/distributed/core.py", line 571, in handle_stream
    msgs = await comm.read()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/tcp.py", line 216, in read
    allow_offload=self.allow_offload,
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 80, in from_frames
    res = _from_frames()
  File "/opt/conda/lib/python3.7/site-packages/distributed/comm/utils.py", line 64, in _from_frames
    frames, deserialize=deserialize, deserializers=deserializers
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/core.py", line 151, in loads
    value = _deserialize(head, fs, deserializers=deserializers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 374, in deserialize
    return loads(header, frames)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 70, in pickle_loads
    return pickle.loads(x, buffers=buffers)
  File "/opt/conda/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ValueError: unsupported pickle protocol: 5
distributed.worker - INFO - -------------------------------------------------
distributed.worker - WARNING - Mismatched versions found
+---------+---------------+---------------+------------------------------------+
| Package | This Worker   | scheduler     | workers                            |
+---------+---------------+---------------+------------------------------------+
| python  | 3.7.6.final.0 | 3.7.6.final.0 | {'3.8.6.final.0', '3.7.6.final.0'} |
+---------+---------------+---------------+------------------------------------+
distributed.worker - INFO -         Registered to: tls://ip-10-2-13-226.us-east-2.compute.internal:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
jrbourbeau commented 3 years ago

Hrm, interesting. Thanks for posting the worker logs. Would you mind sharing the software environment you used when running that snippet?

rubenvdg commented 3 years ago

Sure:

import coiled
from dask.distributed import Client

coiled.create_software_environment(
    name="some_env",
    pip=["dask[complete]==2020.12.0", "pandas==1.2.0", "s3fs"],
)

cluster = coiled.Cluster(n_workers=10, software="some_env")
client = Client(cluster)

import dask.dataframe as dd

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
    },
    storage_options={"anon": True},
    blocksize="16 MiB",
).persist()

df.groupby("passenger_count").tip_amount.mean().compute()
rubenvdg commented 3 years ago

In my experience, conflicting Python versions do cause issues. This week we were running Dask on some Kubernetes clusters on Google Cloud, and even 3.8.0 (scheduler, workers) versus 3.8.1 (client) was enough to cause problems.
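
That would also explain the ValueError: unsupported pickle protocol: 5 entries in the logs above: pickle protocol 5 only exists in Python 3.8+, so a 3.7 worker cannot unpickle payloads serialized by a 3.8 client. A quick way to confirm whether client, scheduler, and workers agree on Python and package versions is distributed's built-in version check. A minimal sketch, reusing the software environment from the snippet above:

import coiled
from dask.distributed import Client

cluster = coiled.Cluster(n_workers=2, software="some_env")
client = Client(cluster)

# Compare Python and key package versions across client, scheduler, and workers.
# With check=True, distributed complains (raises or warns, depending on the
# release) when the components disagree.
client.get_versions(check=True)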

FabioRosado commented 3 years ago

Hello Ruben, we have fixed this issue in the latest Coiled version. For some reason, the s3fs dependency was causing conflicts during dependency solving. We now install dependencies into a Coiled environment (or your own, if you specify one), which has fixed the conflict issue.
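
For anyone on an older client who still hits the Python-version mismatch behind the unsupported pickle protocol: 5 errors above, one workaround is to pin the interpreter in the software environment so it matches the client. A minimal sketch, assuming a conda-style spec is passed via the conda= argument (the environment name and the python=3.8 pin are illustrative, not prescriptive):

import coiled

coiled.create_software_environment(
    name="some_env_py38",
    conda={
        "channels": ["conda-forge"],
        "dependencies": [
            "python=3.8",  # pin to the same minor version as the client
            "dask=2020.12.0",
            "pandas=1.2.0",
            "s3fs",
        ],
    },
)

cluster = coiled.Cluster(n_workers=10, software="some_env_py38")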

I'm going to close this issue, but please feel free to re-open or create a new issue if you encounter any further problems.