dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

upload_file fails on one specific script #2928

Closed: AndreCNF closed this issue 5 years ago

AndreCNF commented 5 years ago

Trying to upload one specific Python file to the workers through upload_file throws the exception "Exception: object Future can't be used in 'await' expression". I'm running out of ideas about what the problem could be, which is why I need your help. These are my current hypotheses:

Here's the full error message:

[Screenshot: full error message]

What I'm doing in the notebook before calling upload_file:

[Screenshot: notebook cells run before calling upload_file]

The file that I'm trying to upload:

Attachment: utils.py.zip

Please help me, as I'm really feeling clueless here!

TomAugspurger commented 5 years ago

@AndreCNF generally, text tracebacks are better than screenshots.

Can you post the output of client.get_versions(check=True)?

AndreCNF commented 5 years ago

@TomAugspurger Thank you for letting me know about the preference for text. I'm not used to submitting issues yet.

Here's the text traceback that I get when I try to upload my utils.py (same as in the screenshot):

--------------------------------------------
Exception  Traceback (most recent call last)
<ipython-input-5-381a5030d80b> in <module>
      1 # Upload the utils.py file, so that the Dask cluster has access to relevant auxiliary functions
      2 # client.upload_file('GitHub/eICU-mortality-prediction/NeuralNetwork.py')
----> 3 client.upload_file('utils.py')

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/distributed/client.py in upload_file(self, filename, **kwargs)
   2892         """
   2893         result = self.sync(
-> 2894             self._upload_file, filename, raise_on_error=self.asynchronous, **kwargs
   2895         )
   2896         if isinstance(result, Exception):

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    750         else:
    751             return sync(
--> 752                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    753             )
    754 

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    325             e.wait(10)
    326     if error[0]:
--> 327         six.reraise(*error[0])
    328     else:
    329         return result[0]

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/distributed/utils.py in f()
    310             if callback_timeout is not None:
    311                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 312             result[0] = yield future
    313         except Exception as exc:
    314             error[0] = sys.exc_info()

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/distributed/client.py in _upload_file(self, filename, raise_on_error)
   2836         _, fn = os.path.split(filename)
   2837         d = await self.scheduler.broadcast(
-> 2838             msg={"op": "upload_file", "filename": fn, "data": to_serialize(data)}
   2839         )
   2840 

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    748             name, comm.name = comm.name, "ConnectionPool." + key
    749             try:
--> 750                 result = await send_recv(comm=comm, op=key, **kwargs)
    751             finally:
    752                 self.pool.reuse(self.addr, comm)

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    557     if isinstance(response, dict) and response.get("status") == "uncaught-error":
    558         if comm.deserialize:
--> 559             six.reraise(*clean_exception(**response))
    560         else:
    561             raise Exception(response["text"])

~/Library/Caches/pypoetry/virtualenvs/eicu-mortality-prediction-py3.7/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/usr/local/lib/python3.7/site-packages/distributed/core.py in handle_comm()
    414                             result = asyncio.ensure_future(result)
    415                             self._ongoing_coroutines.add(result)
--> 416                             result = await result
    417                     except (CommClosedError, CancelledError) as e:
    418                         if self.status == "running":

/usr/local/lib/python3.7/site-packages/distributed/scheduler.py in broadcast()
   2656 
   2657         results = await All(
-> 2658             [send_message(address) for address in addresses if address is not None]
   2659         )
   2660 

/usr/local/lib/python3.7/site-packages/distributed/utils.py in All()
    216     while not tasks.done():
    217         try:
--> 218             result = await tasks.next()
    219         except Exception:
    220 

/usr/local/lib/python3.7/site-packages/distributed/scheduler.py in send_message()
   2652             )
   2653             comm.name = "Scheduler Broadcast"
-> 2654             resp = await send_recv(comm, close=True, serializers=serializers, **msg)
   2655             return resp
   2656 

/usr/local/lib/python3.7/site-packages/distributed/core.py in send_recv()
    559             six.reraise(*clean_exception(**response))
    560         else:
--> 561             raise Exception(response["text"])
    562     return response
    563 

Exception: object Future can't be used in 'await' expression

And the output that I get of client.get_versions(check=True):

{'scheduler': {'host': (('python', '3.7.4.final.0'),
   ('python-bits', 64),
   ('OS', 'Darwin'),
   ('OS-release', '18.6.0'),
   ('machine', 'x86_64'),
   ('processor', 'i386'),
   ('byteorder', 'little'),
   ('LC_ALL', 'None'),
   ('LANG', 'en_US.UTF-8'),
   ('LOCALE', 'en_US.UTF-8')),
  'packages': {'required': (('dask', '2.2.0'),
    ('distributed', '2.2.0'),
    ('msgpack', '0.6.1'),
    ('cloudpickle', '1.2.1'),
    ('tornado', '6.0.3'),
    ('toolz', '0.10.0')),
   'optional': (('numpy', '1.17.0'),
    ('pandas', '0.25.0'),
    ('bokeh', '1.3.1'),
    ('lz4', None),
    ('dask_ml', None),
    ('blosc', None))}},
 'workers': {'tcp://127.0.0.1:65146': {'host': (('python', '3.7.4.final.0'),
    ('python-bits', 64),
    ('OS', 'Darwin'),
    ('OS-release', '18.6.0'),
    ('machine', 'x86_64'),
    ('processor', 'i386'),
    ('byteorder', 'little'),
    ('LC_ALL', 'None'),
    ('LANG', 'en_US.UTF-8'),
    ('LOCALE', 'en_US.UTF-8')),
   'packages': {'required': (('dask', '2.2.0'),
     ('distributed', '2.2.0'),
     ('msgpack', '0.6.1'),
     ('cloudpickle', '1.2.1'),
     ('tornado', '6.0.3'),
     ('toolz', '0.10.0')),
    'optional': (('numpy', '1.17.0'),
     ('pandas', '0.25.0'),
     ('bokeh', '1.3.1'),
     ('lz4', None),
     ('dask_ml', None),
     ('blosc', None))}},
  'tcp://127.0.0.1:65147': {'host': (('python', '3.7.4.final.0'),
    ('python-bits', 64),
    ('OS', 'Darwin'),
    ('OS-release', '18.6.0'),
    ('machine', 'x86_64'),
    ('processor', 'i386'),
    ('byteorder', 'little'),
    ('LC_ALL', 'None'),
    ('LANG', 'en_US.UTF-8'),
    ('LOCALE', 'en_US.UTF-8')),
   'packages': {'required': (('dask', '2.2.0'),
     ('distributed', '2.2.0'),
     ('msgpack', '0.6.1'),
     ('cloudpickle', '1.2.1'),
     ('tornado', '6.0.3'),
     ('toolz', '0.10.0')),
    'optional': (('numpy', '1.17.0'),
     ('pandas', '0.25.0'),
     ('bokeh', '1.3.1'),
     ('lz4', None),
     ('dask_ml', None),
     ('blosc', None))}},
  'tcp://127.0.0.1:65148': {'host': (('python', '3.7.4.final.0'),
    ('python-bits', 64),
    ('OS', 'Darwin'),
    ('OS-release', '18.6.0'),
    ('machine', 'x86_64'),
    ('processor', 'i386'),
    ('byteorder', 'little'),
    ('LC_ALL', 'None'),
    ('LANG', 'en_US.UTF-8'),
    ('LOCALE', 'en_US.UTF-8')),
   'packages': {'required': (('dask', '2.2.0'),
     ('distributed', '2.2.0'),
     ('msgpack', '0.6.1'),
     ('cloudpickle', '1.2.1'),
     ('tornado', '6.0.3'),
     ('toolz', '0.10.0')),
    'optional': (('numpy', '1.17.0'),
     ('pandas', '0.25.0'),
     ('bokeh', '1.3.1'),
     ('lz4', None),
     ('dask_ml', None),
     ('blosc', None))}},
  'tcp://127.0.0.1:65152': {'host': (('python', '3.7.4.final.0'),
    ('python-bits', 64),
    ('OS', 'Darwin'),
    ('OS-release', '18.6.0'),
    ('machine', 'x86_64'),
    ('processor', 'i386'),
    ('byteorder', 'little'),
    ('LC_ALL', 'None'),
    ('LANG', 'en_US.UTF-8'),
    ('LOCALE', 'en_US.UTF-8')),
   'packages': {'required': (('dask', '2.2.0'),
     ('distributed', '2.2.0'),
     ('msgpack', '0.6.1'),
     ('cloudpickle', '1.2.1'),
     ('tornado', '6.0.3'),
     ('toolz', '0.10.0')),
    'optional': (('numpy', '1.17.0'),
     ('pandas', '0.25.0'),
     ('bokeh', '1.3.1'),
     ('lz4', None),
     ('dask_ml', None),
     ('blosc', None))}}},
 'client': {'host': [('python', '3.7.3.final.0'),
   ('python-bits', 64),
   ('OS', 'Darwin'),
   ('OS-release', '18.6.0'),
   ('machine', 'x86_64'),
   ('processor', 'i386'),
   ('byteorder', 'little'),
   ('LC_ALL', 'None'),
   ('LANG', 'en_US.UTF-8'),
   ('LOCALE', 'en_US.UTF-8')],
  'packages': {'required': [('dask', '2.2.0'),
    ('distributed', '2.2.0'),
    ('msgpack', '0.6.1'),
    ('cloudpickle', '1.2.1'),
    ('tornado', '6.0.3'),
    ('toolz', '0.10.0')],
   'optional': [('numpy', '1.17.0'),
    ('pandas', '0.25.0'),
    ('bokeh', '1.3.1'),
    ('lz4', None),
    ('dask_ml', None),
    ('blosc', None)]}}}

TomAugspurger commented 5 years ago

Seems to be related to the size of the file.

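from dask.distributed import Client

client = Client()  # assumes a local cluster; any running cluster shows the same behavior

# 9,999 bytes: uploads fine
with open('small.py', 'w') as f:
    f.write('\n' * 9999)

client.upload_file('small.py')  # OK

# 10,000 bytes: fails with "object Future can't be used in 'await' expression"
with open('large.py', 'w') as f:
    f.write('\n' * 10000)

client.upload_file('large.py')  # error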

@AndreCNF are you interested in poking around to see where things go wrong with the larger file?

jrbourbeau commented 5 years ago

Doing a little digging into Worker.upload_file:

https://github.com/dask/distributed/blob/cf10db7b6a4fd091c2e1385162e3d36ab59c8f6e/distributed/worker.py#L900-L903

It looks like the problem may be coming from offload in distributed.comm.utils.
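The failure can be reproduced without Dask. The sketch below assumes that the worker writes payloads smaller than 10,000 bytes inline and hands larger ones to an offload helper that returns a concurrent.futures.Future from a thread pool. That assumption is consistent with the 9,999/10,000-byte boundary found above. The names upload_sketch, write_payload and offload_sketch are hypothetical and are not part of distributed's API.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the thread pool behind offload().
_executor = ThreadPoolExecutor(max_workers=1)


def write_payload(data: bytes) -> int:
    # Hypothetical stand-in for writing the uploaded file to disk.
    return len(data)


def offload_sketch(fn, *args):
    # Mirrors the reported bug: submit() returns a concurrent.futures.Future,
    # which has no __await__ method and therefore cannot be awaited.
    return _executor.submit(fn, *args)


async def upload_sketch(data: bytes) -> int:
    # Assumed size threshold: small payloads are written inline,
    # large ones are handed to the (buggy) offload helper.
    if len(data) < 10000:
        return write_payload(data)
    return await offload_sketch(write_payload, data)


async def main():
    small = await upload_sketch(b"\n" * 9999)
    large_error = None
    try:
        await upload_sketch(b"\n" * 10000)
    except TypeError as exc:
        large_error = exc
    return small, large_error


small, large_error = asyncio.run(main())
_executor.shutdown(wait=True)
print(small)        # 9999
print(large_error)  # on Python 3.7: object Future can't be used in 'await' expression
```

The inline branch never touches the executor, which is why small files upload fine and only the offloaded path fails.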

AndreCNF commented 5 years ago

@TomAugspurger I've experimented with changing the file in several ways, and the failure doesn't seem to be caused by any specific line of code. I believe it really is just a matter of file size, especially since the error message refers to the await expression, which is also where @jrbourbeau's digging pointed. I'm not very familiar with asynchronous programming in Python, but it's very odd to hit this limitation on only moderately large uploads. Do you think some particular pandas, Dask, or general Python operation might be incompatible with the await call, or could this be a bug in how Dask handles larger uploads (perhaps in the offload method)?

TomAugspurger commented 5 years ago

I don't think it's an issue with the content of the file, just the size. And it's a bug in distributed, not any other library.

(Sent by email in reply to André Cristóvão Neves Ferreira's comment of Tue, Aug 6, 2019 at 6:02 PM.)

AndreCNF commented 5 years ago

I've also found that simply removing the documentation from the script's methods makes the upload work, so it seems ever clearer that this is just a file-size problem.

mrocklin commented 5 years ago

Can you try https://github.com/dask/distributed/pull/2934 ?
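For illustration only: one way to make an offload helper awaitable is to turn it into a coroutine that goes through loop.run_in_executor, which returns an asyncio future. This is a sketch under that assumption, not necessarily the change made in the linked pull request. The names offload_fixed, upload_sketch and write_payload are hypothetical.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the thread pool behind offload().
_executor = ThreadPoolExecutor(max_workers=1)


def write_payload(data: bytes) -> int:
    # Hypothetical stand-in for the blocking file write on the worker.
    return len(data)


async def offload_fixed(fn, *args, **kwargs):
    # run_in_executor returns an asyncio future, which *can* be awaited.
    # functools.partial forwards keyword arguments, which run_in_executor
    # does not accept directly.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, functools.partial(fn, *args, **kwargs))


async def upload_sketch(data: bytes) -> int:
    # Same assumed size threshold as before; only the offload path changed.
    if len(data) < 10000:
        return write_payload(data)
    return await offload_fixed(write_payload, data)


async def main():
    return [await upload_sketch(b"\n" * n) for n in (9999, 10000)]


sizes = asyncio.run(main())
_executor.shutdown(wait=True)
print(sizes)  # [9999, 10000]
```

Making the helper itself a coroutine means every call site can keep using await unchanged.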

AndreCNF commented 5 years ago

Thank you so much @mrocklin! Everything's working now!

mrocklin commented 5 years ago

For maintainers: this was introduced when we changed yield to await in many places.

Tornado coroutines do a few things better than async/await do. In particular, you can yield concurrent.futures.Future objects and things work well. The await keyword doesn't handle this.
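Where existing code keeps returning concurrent.futures.Future objects, a call site that moved from yield to await needs an adapter. A minimal sketch using the standard library's asyncio.wrap_future, which wraps a concurrent future in an asyncio future bound to the running loop. This is one possible adapter, not a description of how distributed resolved the issue; blocking_len is a hypothetical helper.

```python
import asyncio
from concurrent.futures import Future, ThreadPoolExecutor


def blocking_len(data: bytes) -> int:
    # Hypothetical blocking work, standing in for a file write.
    return len(data)


async def await_concurrent(fut: Future):
    # A bare `await fut` would raise TypeError; wrap_future makes it awaitable.
    return await asyncio.wrap_future(fut)


async def main():
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(blocking_len, b"x" * n) for n in (10, 10000)]
        # gather preserves input order in its result list.
        return await asyncio.gather(*(await_concurrent(f) for f in futures))


results = asyncio.run(main())
print(results)  # [10, 10000]
```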