python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Dask example from docs does not work #333

Open slotrans opened 4 years ago

slotrans commented 4 years ago

https://streamz.readthedocs.io/en/latest/dask.html

My environment: WSL / Debian on Windows 10 18362.778, Python 3.7.3, streamz 0.5.3, dask 2.15.0

Running the "Sequential Execution" example:

$ python stream_test.py
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
1
2
3
4
5
6
7
8
9
10
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available

Those warnings don't look great, but we did at least get the expected output.

Now the "Parallel Execution" example:

$ python stream_test.py
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
1
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker

Sometimes the "Restarting worker" warning doesn't appear, sometimes it appears a different number of times, but the actual output never goes beyond 1.

If I remove buffer(8) from the pipeline, it works (though still with warnings).

If I change it from buffer(8) to buffer(2), it gets as far as printing 7 then:

distributed.scheduler - ERROR - Couldn't gather keys {'inc-788f27b82d7440855cc1356becb169bc': []} state: [None] workers: []
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: [], inc-788f27b82d7440855cc1356becb169bc
NoneType: None
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'inc-788f27b82d7440855cc1356becb169bc': ()}

Making the buffer larger results in no output at all, not even 1.
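
A note on the numbers above: with 10 emitted items, buffer(8) prints 1 value, buffer(2) prints 7, and a larger buffer prints none. Those counts all fit "items emitted, minus buffer size, minus one", which is what you would see if the script reached its last line while results were still sitting in the buffer. The sketch below reproduces that pattern with a plain `asyncio.Queue`. It is only an analogy: it uses neither streamz nor dask, the names `run_pipeline` and `simulate` are invented for illustration, and whether streamz's `buffer` behaves this way here is an assumption that nothing in this thread confirms.

```python
import asyncio


async def run_pipeline(n_items, buffer_size, wait_for_drain):
    """Push n_items through a bounded buffer; return the results that finished."""
    queue = asyncio.Queue(maxsize=buffer_size)
    finished = []

    async def consumer():
        # Stand-in for the remote "inc" work done on each buffered item.
        while True:
            item = await queue.get()
            await asyncio.sleep(0.01)
            finished.append(item + 1)
            queue.task_done()

    consumer_task = asyncio.ensure_future(consumer())

    # The producer blocks whenever the buffer is full (backpressure).
    for i in range(n_items):
        await queue.put(i)

    if wait_for_drain:
        # Wait until every buffered item has been processed.
        await queue.join()

    # Stopping the consumer here mimics the script reaching its last line.
    consumer_task.cancel()
    await asyncio.gather(consumer_task, return_exceptions=True)
    return finished


def simulate(n_items, buffer_size, wait_for_drain=False):
    return asyncio.run(run_pipeline(n_items, buffer_size, wait_for_drain))


if __name__ == "__main__":
    for size in (2, 8, 256):
        print(size, len(simulate(10, size)))
    print("drained:", simulate(10, 8, wait_for_drain=True))
```

In this model the three buffer sizes give 7, 1 and 0 results, and all 10 appear once the script waits for the buffer to drain before exiting. If the same thing is happening in the real pipeline, keeping the script alive until the results have been gathered would be one thing to try.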

slotrans commented 4 years ago

A different env: OS X 10.12.6 (Sierra), Anaconda Python 3.7.6 (fresh install), streamz 0.5.3, dask 2.11.0

Sequential example works properly, no warnings.

The parallel example has failed in 3 different ways. First run:

$ python stest.py
1
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x1080f6cd0>>, <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/comm/tcp.py", line 188, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
import time
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/streamz/core.py", line 1157, in cb
    yield self._emit(x, metadata=metadata)
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 501, in callback
    result_list.append(f.result())
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/streamz/dask.py", line 129, in update
    result = yield client.gather(x, asynchronous=True)
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 1781, in _gather
    response = await future
import time
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/client.py", line 1832, in _gather_remote
    response = await retry_operation(self.scheduler.gather, keys=keys)
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
    operation=operation,
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
    return await coro()
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/comm/tcp.py", line 208, in read
    convert_stream_closed_error(self, e)
  File "/Users/nyetter/opt/anaconda3/lib/python3.7/site-packages/distributed/comm/tcp.py", line 123, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

second run

$ python stest.py
1
distributed.scheduler - ERROR - Couldn't gather keys {'inc-20924c3250f1593c8d9b9498aa3b1bc2': ['tcp://127.0.0.1:49205']} state: [None] workers: ['tcp://127.0.0.1:49205']
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:49205'], inc-20924c3250f1593c8d9b9498aa3b1bc2
NoneType: None
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'inc-20924c3250f1593c8d9b9498aa3b1bc2': ('tcp://127.0.0.1:49205',)}

third run

$ python stest.py
1

Lowering the buffer size to 2:

$ python stest.py
1
2
3
4
5
6
7
distributed.scheduler - ERROR - Couldn't gather keys {'inc-44fa67594e0bf2e8a250e62d197700ba': []} state: [None] workers: []
NoneType: None
distributed.scheduler - ERROR - Workers don't have promised key: [], inc-44fa67594e0bf2e8a250e62d197700ba
NoneType: None
distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {'inc-44fa67594e0bf2e8a250e62d197700ba': ()}
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Restarting worker

Raising the buffer size to 256 produces no output at all.

martindurant commented 4 years ago

The sequential example does not involve dask at all, yet you are seeing a bunch of warnings and errors just from having created a client instance. This suggests that something is wrong with your dask/distributed installation, or perhaps with your networking setup (Windows may need to grant permissions), and that it has nothing to do with streamz. To be sure, the example works just fine on my system (OSX).
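
When comparing a failing environment with a working one, it can help to list the versions of every package involved. The reports above give the streamz and dask versions, but not those of distributed or tornado, both of which appear in the traceback. A minimal sketch using only the standard library follows. The helper name `installed_versions` is made up for illustration, and `importlib.metadata` needs Python 3.8 or later, so on the Python 3.7 environments above the `importlib_metadata` backport would be needed instead.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each package name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    report = installed_versions(["streamz", "dask", "distributed", "tornado"])
    for name, version in report.items():
        print(f"{name}: {version or 'not installed'}")
```

Pasting this output from both machines makes it easier to see whether the two setups actually differ in anything other than the operating system.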

OpenCoderX commented 4 years ago

I've had some confusion on my end with dask as well. When I run a sequential series of connected streamz, should I see anything in the graph view? I am able to access the web UI and view memory and CPU usage, but there is no graph of the streamz. How would I add a UI? Would I be able to use the already included Tornado library to add a UI that would allow me to monitor and change variables during execution?

martindurant commented 4 years ago

The graph will show currently waiting, processing or in-memory tasks. When used with streamz, it is pretty likely that the dask part of the computation happens rather quickly - once the data is gathered by your pipeline, it will be cleared from dask and disappear from the graph view. If you look at the task stream view, you should be able to see tasks that have been done by the cluster over time.

OpenCoderX commented 4 years ago

I think the dask cluster monitor will not support what I want to do. After some more research, I think I want something like https://webkid.io/blog/react-flow-node-based-graph-library/. Do you have any thoughts on using the Tornado web server to control the streamz in real time through a web interface?