dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

KeyError: pickle-protocol when running df.npartitions #4134

Open jennalc opened 4 years ago

jennalc commented 4 years ago

Running the following:

import dask.dataframe as dd
cols = ['Year', 'Month', 'DayOfWeek', 'Distance',
        'DepDelay', 'CRSDepTime', 'UniqueCarrier', 'Origin', 'Dest']
df = dd.read_csv('/tmp/2000.csv', usecols=cols)
df.head()
df.npartitions

Get the following error:

2020-09-28 09:12:59,186 ERROR tornado.application - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f3573436dd0>, <Task finished coro=<BaseTCPListener._handle_stream() done, defined at /opt/sarvi/sig/devMiniconda483/anaconda/envs/notebookEnv/lib/python3.7/site-packages/distributed/comm/tcp.py:445> exception=ValueError('Your Dask versions may not be in sync. Please ensure that you have the same version of dask and distributed on your client, scheduler, and worker machines')>)
Traceback (most recent call last):
  File "/opt/sarvi/sig/devMiniconda483/anaconda/envs/notebookEnv/lib/python3.7/site-packages/distributed/comm/core.py", line 141, in handshake_configuration
    print(remote["pickle-protocol"])
KeyError: 'pickle-protocol'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/sarvi/sig/devMiniconda483/anaconda/envs/notebookEnv/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/opt/sarvi/sig/devMiniconda483/anaconda/envs/notebookEnv/lib/python3.7/site-packages/tornado/tcpserver.py", line 327, in <lambda>
    gen.convert_yielded(future), lambda f: f.result()
  File "/opt/sarvi/sig/devMiniconda483/anaconda/envs/notebookEnv/lib/python3.7/site-packages/distributed/comm/tcp.py", line 457, in _handle_stream
    await self.on_connection(comm)
  File "/opt/sarvi/sig/devMiniconda483/anaconda/envs/notebookEnv/lib/python3.7/site-packages/distributed/comm/core.py", line 236, in on_connection
    comm.local_info, comm.remote_info
  File "/opt/sarvi/sig/devMiniconda483/anaconda/envs/notebookEnv/lib/python3.7/site-packages/distributed/comm/core.py", line 149, in handshake_configuration
    "Your Dask versions may not be in sync. "
ValueError: Your Dask versions may not be in sync. Please ensure that you have the same version of dask and distributed on your client, scheduler, and worker machines

TomAugspurger commented 3 years ago

Do things work if you have matching versions of dask / distributed on all the workers & client?
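
(For reference, a minimal sketch of how to check this from the client side; the scheduler address below is a placeholder:)

from distributed import Client

client = Client('tcp://scheduler:8786')  # placeholder address
# With check=True, distributed raises if the dask/distributed versions
# differ between the client, the scheduler, and the workers.
client.get_versions(check=True)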

jennalc commented 3 years ago

@TomAugspurger - No. It only works when scheduler/worker/client are all on dask/distributed 2.22.0 or below. As soon as I move them all up to 2.23.0 or above, this error occurs.

TomAugspurger commented 3 years ago

The error message indicates that you haven't upgraded all of them (scheduler, workers, and client).

jennalc commented 3 years ago

Yes, the error message says that, but I guarantee you that they are all the same. I am working on a single host, with only 1 conda environment. So all 3 components are using the exact same packages.

jennalc commented 3 years ago

By the way, if I add a print statement in core.py to print the local and remote pickle-protocol, I see both print "4" several times, but right before this error occurs, the remote side loses the key.
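
(A safer variant of that debug print, hypothetically placed inside handshake_configuration in distributed/comm/core.py; using .get avoids raising the KeyError itself:)

def handshake_configuration(local, remote):
    # Hypothetical debug print: both sides normally report protocol 4,
    # until one remote info dict arrives without the 'pickle-protocol' key.
    print(local.get('pickle-protocol'), remote.get('pickle-protocol'))
    ...  # rest of the function unchanged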

quasiben commented 3 years ago

@jennalc can you build a reproducible example that writes the CSV and then reads it back in? I'd recommend using dask.datasets.timeseries()

In [1]: import dask

In [2]: ddf = dask.datasets.timeseries()

In [3]: ddf.head()
Out[3]:
                       id    name         x         y
timestamp
2000-01-01 00:00:00  1010  Ingrid  0.494533 -0.932426
2000-01-01 00:00:01   985   Alice -0.898573 -0.851398
2000-01-01 00:00:02   964  Ursula  0.744905  0.199218
2000-01-01 00:00:03   947   Frank  0.681676 -0.621700
2000-01-01 00:00:04  1009   Wendy -0.259697  0.160476
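
(A sketch of such a reproducer, writing the synthetic timeseries out as CSV and reading it back; the /tmp paths are arbitrary:)

import dask
import dask.dataframe as dd

ddf = dask.datasets.timeseries()
ddf.to_csv('/tmp/timeseries-*.csv')  # the '*' becomes the partition number
df = dd.read_csv('/tmp/timeseries-*.csv')
df.head()
df.npartitions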

Can I also ask you to post the output of conda list -n notebookEnv?

jennalc commented 3 years ago

I am able to reproduce with timeseries as suggested, but not with distributed LocalCluster or dask-jobqueue LocalCluster. I don't have a way to share my other cluster right now... I'll keep trying and let you know if I can get a standalone reproducible example.

jennalc commented 3 years ago

@TomAugspurger @quasiben - I was able to find the root cause of this one. In my custom cluster, I have a SchedulerPlugin with some custom handlers. The TCP communication is actually not Python; it uses MsgPack in Java to construct the messages. These messages only contained an "op" and "address", but did not have the "pickle-protocol" or "compression" keys, which are expected by the comm handler. If I have my Java code inject these keys, I no longer hit this error.

I had followed the documentation here to construct these messages: https://distributed.dask.org/en/latest/protocol.html. So maybe it needs an update to mention these required keys?
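
(For anyone wiring up a non-Python client, a minimal sketch of that handshake payload, shown here in Python with msgpack; the key names come from the traceback above, and the frame-level layout of the wire protocol is omitted:)

import msgpack

# Handshake info that distributed >= 2.23 expects from the remote side
# before any operation message; compression None means "no compression".
handshake = {'pickle-protocol': 4, 'compression': None}
payload = msgpack.packb(handshake, use_bin_type=True)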

quasiben commented 3 years ago

@jennalc, I think those messages are only needed on startup, correct?

jennalc commented 3 years ago

@quasiben - Possibly? I don't quite have it working on my end yet. Currently, with comm.remote_info looking like this:

{"op":"my-handler", "pickle-protocol":4, "compression":None, "address": "tcp:____"}

I no longer hit the KeyError for pickle-protocol, but my custom handler is also no longer being triggered, so it seems I am still misunderstanding something about the protocol.

For reference, back in distributed 2.22.0, my handler would run both with and without the pickle-protocol and compression keys sent in the TCP request.

jennalc commented 3 years ago

Ok, got it working on my end. I needed to send one message with {"pickle-protocol":4, "compression":None} and then a separate message with {"op":"my-handler"}.

quasiben commented 3 years ago

These messages were added in early August: https://github.com/dask/distributed/pull/4019

You might want to look over what happens on connection to see if you need anything else. If you're able to describe things more: are you sending messages after each transition, or only on startup?
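
(One way to see what happens on connection is to drive a comm from Python, where the handshake is performed automatically; a minimal sketch, assuming a scheduler at the placeholder address below:)

import asyncio
from distributed.comm import connect

async def main():
    # connect() performs the pickle-protocol/compression handshake
    # before returning, so afterwards only "op" messages are sent.
    comm = await connect('tcp://127.0.0.1:8786')  # placeholder address
    await comm.write({'op': 'identity', 'reply': True})
    print(await comm.read())
    await comm.close()

asyncio.run(main())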

jennalc commented 3 years ago

What I currently have working from my Java side is to send msgpack-encoded messages like this, in the two-step sequence described above:

{"pickle-protocol": 4, "compression": None}
{"op": "my-handler"}

My code closes the stream after receiving a response from this handler, but I would assume I could send additional op messages on the same connection without having to re-send the initial connection message.

There is also an additional response to the connection message, though this is less important to document since the caller can see the responses and handle them. But for reference, this response is returned after the initial connection message:

{'compression':None, 'python':(3,7,7), 'pickle-protocol':4}

quasiben commented 3 years ago

The returned response makes sense. Dask is now trying to better handle, and communicate to users, things like compression settings, the Python version, the pickle protocol, and so on. While this issue is closed, it may provide some background on what is happening here: https://github.com/dask/distributed/issues/3767