coiled / feedback

A place to provide Coiled feedback
14 stars 3 forks source link

KeyError: 'blosc' & distributed.protocol.core - CRITICAL - Failed to deserialize #78

Closed leifulstrup closed 3 years ago

leifulstrup commented 4 years ago

created dd.read_csv link to a ~1.8GB CSV file from S3

BTW: also saw this warning after from dask.distributed import Client client = Client(cluster) print('Dashboard:', client.dashboard_link)

/Users/leifulstrup/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/client.py:1130: VersionMismatchWarning: Mismatched versions found

+-------------+--------+-----------+---------+ | Package | client | scheduler | workers | +-------------+--------+-----------+---------+ | blosc | None | 1.9.2 | None | | dask | 2.28.0 | 2.23.0 | None | | distributed | 2.28.0 | 2.25.0 | None | | lz4 | None | 3.1.0 | None | | toolz | 0.11.1 | 0.10.0 | None | +-------------+--------+-----------+---------+ warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

ddf.head() results in:

distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/Users/leifulstrup/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/protocol/core.py", line 148, in loads
    fs = decompress(head, fs)
  File "/Users/leifulstrup/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/protocol/compression.py", line 225, in decompress
    return [
  File "/Users/leifulstrup/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/protocol/compression.py", line 226, in <listcomp>
    compressions[c]["decompress"](frame)
KeyError: 'blosc'
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-50-0c61bc4f48b3> in <module>
----> 1 highly_compensated_list = df_USA['highly_compensated_officer_1_name'].unique().compute()

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    165         dask.base.compute
    166         """
--> 167         (result,) = compute(self, traverse=False, **kwargs)
    168         return result
    169 

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    450         postcomputes.append(x.__dask_postcompute__())
    451 
--> 452     results = schedule(dsk, keys, **kwargs)
    453     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    454 

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2712                     should_rejoin = False
   2713             try:
-> 2714                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2715             finally:
   2716                 for f in futures.values():

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1985             else:
   1986                 local_worker = None
-> 1987             return self.sync(
   1988                 self._gather,
   1989                 futures,

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    831             return future
    832         else:
--> 833             return sync(
    834                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    835             )

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1879                 else:
   1880                     self._gather_future = future
-> 1881                 response = await future
   1882 
   1883             if response["status"] == "error":

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/client.py in _gather_remote(self, direct, local_worker)
   1930 
   1931             else:  # ask scheduler to gather data for us
-> 1932                 response = await retry_operation(self.scheduler.gather, keys=keys)
   1933 
   1934         return response

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/utils_comm.py in retry_operation(coro, operation, *args, **kwargs)
    383         dask.config.get("distributed.comm.retry.delay.max"), default="s"
    384     )
--> 385     return await retry(
    386         partial(coro, *args, **kwargs),
    387         count=retry_count,

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/utils_comm.py in retry(coro, count, delay_min, delay_max, jitter_fraction, retry_on_exceptions, operation)
    368                 delay *= 1 + random.random() * jitter_fraction
    369             await asyncio.sleep(delay)
--> 370     return await coro()
    371 
    372 

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    881             name, comm.name = comm.name, "ConnectionPool." + key
    882             try:
--> 883                 result = await send_recv(comm=comm, op=key, **kwargs)
    884             finally:
    885                 self.pool.reuse(self.addr, comm)

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    664         await comm.write(msg, serializers=serializers, on_error="raise")
    665         if reply:
--> 666             response = await comm.read(deserializers=deserializers)
    667         else:
    668             response = None

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/comm/tcp.py in read(self, deserializers)
    202         else:
    203             try:
--> 204                 msg = await from_frames(
    205                     frames,
    206                     deserialize=self.deserialize,

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/comm/utils.py in from_frames(frames, deserialize, deserializers, allow_offload)
     85         res = await offload(_from_frames)
     86     else:
---> 87         res = _from_frames()
     88 
     89     return res

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/comm/utils.py in _from_frames()
     63     def _from_frames():
     64         try:
---> 65             return protocol.loads(
     66                 frames, deserialize=deserialize, deserializers=deserializers
     67             )

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/protocol/core.py in loads(frames, deserialize, deserializers)
    146             if deserialize or key in bytestrings:
    147                 if "compression" in head:
--> 148                     fs = decompress(head, fs)
    149                 if not any(hasattr(f, "__cuda_array_interface__") for f in fs):
    150                     fs = merge_frames(head, fs)

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/protocol/compression.py in decompress(header, frames)
    223 def decompress(header, frames):
    224     """ Decompress frames according to information in the header """
--> 225     return [
    226         compressions[c]["decompress"](frame)
    227         for c, frame in zip(header["compression"], frames)

~/opt/anaconda3/envs/coiled_env/lib/python3.8/site-packages/distributed/protocol/compression.py in <listcomp>(.0)
    224     """ Decompress frames according to information in the header """
    225     return [
--> 226         compressions[c]["decompress"](frame)
    227         for c, frame in zip(header["compression"], frames)
    228     ]

KeyError: 'blosc'
leifulstrup commented 4 years ago

df_test_lines = pd.read_csv(s3_location, nrows=10) df_test_lines.head()

Works fine but the ddf.head() that points to the same s3 CSV file throws the error above

necaris commented 4 years ago

@leifulstrup thank you for the report! It looks from your initial VersionMismatchWarning that you don't have blosc and lz4 installed locally; can you install those into your environment and try again? We're working on a better error message!

leifulstrup commented 4 years ago

@necaris that worked. I had to restart the jupyter notebook on my local machine. After seeing your note, I realize it says it is missing blosc and lz4. I created a new conda env to test coiled. Also, I tried accessing the file using https instead of s3 and it failed due to lack of requests package.

One thing that was not clear to me before your note was that the warning was alerting me to a mismatch between what was on my local machine (client) and the server (scheduler in your columns). That makes sense now but the architecture of how the local is interacting with the cluster was unclear. I have only set up clusters on my local network of machines and launched the scheduler and workers on one of those machines.

coiled seems like a huge advanced in abstracting the computing in the cloud. Doing the same on storage would be great too.

Looking forward to experimenting with this more.

I am testing this with data stored in S3. Are there ways to estimate what types of charges I may incur using S3 as my data source and the way that dask reads data? I am concerned about racking up a large S3 data out bill?

Kudos to you and the team working on this.

necaris commented 4 years ago

@necaris that worked. I had to restart the jupyter notebook on my local machine. After seeing your note, I realize it says it is missing blosc and lz4. I created a new conda env to test coiled. Also, I tried accessing the file using https instead of s3 and it failed due to lack of requests package.

One thing that was not clear to me before your note was that the warning was alerting me to a mismatch between what was on my local machine (client) and the server (scheduler in your columns). That makes sense now but the architecture of how the local is interacting with the cluster was unclear. I have only set up clusters on my local network of machines and launched the scheduler and workers on one of those machines.

Thank you for the feedback -- we'll have to spend some more time on the documentation there! In the meantime, if you'd like to be sure you have the same environment locally that's running on the cluster, please check out the detailed documentation for software environments at https://docs.coiled.io/user_guide/software_environment_local.html :smile:

Looking forward to experimenting with this more.

Looking forward to more feedback!

I am testing this with data stored in S3. Are there ways to estimate what types of charges I may incur using S3 as my data source and the way that dask reads data? I am concerned about racking up a large S3 data out bill?

Are these S3 buckets that you own? I am not an S3 expert but my understanding is that unless they are buckets you own and you have them set so that you will pay for transfers, then the costs are on the requester (i.e. us). As Coiled is currently free for beta users, you should have nothing to worry about!

However, this would be a great thing to add to our complete cost estimation feature post-beta, so thank you for that reminder!

mrocklin commented 4 years ago

AWS does not charge for data access if you are within AWS, so this is likely free for you.

On Tue, Sep 29, 2020 at 1:41 PM Rami Chowdhury notifications@github.com wrote:

@necaris https://github.com/necaris that worked. I had to restart the jupyter notebook on my local machine. After seeing your note, I realize it says it is missing blosc and lz4. I created a new conda env to test coiled. Also, I tried accessing the file using https instead of s3 and it failed due to lack of requests package.

One thing that was not clear to me before your note was that the warning was alerting me to a mismatch between what was on my local machine (client) and the server (scheduler in your columns). That makes sense now but the architecture of how the local is interacting with the cluster was unclear. I have only set up clusters on my local network of machines and launched the scheduler and workers on one of those machines.

Thank you for the feedback -- we'll have to spend some more time on the documentation there! In the meantime, if you'd like to be sure you have the same environment locally that's running on the cluster, please check out the detailed documentation for software environments at https://docs.coiled.io/user_guide/software_environment_local.html 😄

Looking forward to experimenting with this more.

Looking forward to more feedback!

I am testing this with data stored in S3. Are there ways to estimate what types of charges I may incur using S3 as my data source and the way that dask reads data? I am concerned about racking up a large S3 data out bill?

Are these S3 buckets that you own? I am not an S3 expert but my understanding is that unless they are buckets you own and you have them set so that you will pay for transfers, then the costs are on the requester (i.e. us). As Coiled is currently free for beta users, you should have nothing to worry about!

However, this would be a great thing to add to our complete cost estimation feature post-beta, so thank you for that reminder!

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/coiled/coiled-issues/issues/78#issuecomment-700974353, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTBOIDMZ2BYZ7O3WEN3SIJA7JANCNFSM4R6HPTZQ .

leifulstrup commented 4 years ago

@mrocklin thank you. The coiled servers are running on AWS? Good to know. I uploaded the CSV files to S3 to test coiled. Very impressed. I'll do so more testing over the next week and add a Medium post featuring it. I have been promoting Dask in these:

https://medium.com/@lulstrup/4-usaspending-gov-analytics-loading-data-into-dask-and-pandas-3e83af441901

mrocklin commented 4 years ago

Correct. Currently we're launching machines on AWS, and try to make interacting with data there a pleasant experience.

I'm glad to hear about the post. Let us know if we can help or amplify.

On Tue, Sep 29, 2020 at 1:50 PM Leif Ulstrup notifications@github.com wrote:

@mrocklin https://github.com/mrocklin thank you. The coiled servers are running on AWS? Good to know. I uploaded the CSV files to S3 to test coiled. Very impressed. I'll do so more testing over the next week and add a Medium post featuring it. I have been promoting Dask in these:

https://medium.com/@lulstrup/4-usaspending-gov-analytics-loading-data-into-dask-and-pandas-3e83af441901

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/coiled/coiled-issues/issues/78#issuecomment-700978542, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTDHZ25HX7KBWX4ZBL3SIJCATANCNFSM4R6HPTZQ .

FabioRosado commented 3 years ago

Hello, I'm going through the open issues on this repository, and I'm closing some of them. It seems that this issue might be solved. We have a Why do I get Version Mismatch warning on our FAQ perhaps that clarifies why this is happening? 🤔

I'm closing this issue, but please feel free to re-open or create a new issue if you encounter any problems (or if this issue is still happening to you) 😄