dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Should we use ZeroMQ? #776

Closed mrocklin closed 7 years ago

mrocklin commented 7 years ago

I've been wrestling with network communication recently as our bottlenecks have become increasingly communication focused. I'd like to get feedback from people like @minrk and @pitrou on whether or not it is worth exploring switching our communication stack over to ZeroMQ.

Our Communication Types

Dask.distributed engages in the following kinds of communication. Everything is point-to-point.

  1. Frequent small messages along long-running connections. We might dump in thousands of 100 byte messages in a second. Currently we batch manually. Presumably ZeroMQ would do this for us allowing us to remove some of the BatchedSend infrastructure.
  2. Peer to peer data movement. These are small-to-large messages (1kb to 10GB) along ephemeral worker-to-worker connections. Sometimes these overlap. Here we're concerned about not having this communication block or be blocked by other executing code, establishing connections quickly (or else caching connections in a way that respects open file limits), and possibly negotiating multiple messages at once on the same socket.
  3. Periodic small messages for heart beats.

Advantages of ZeroMQ

  1. We get to remove BatchedSend and timeout handling
  2. We get zero-copy for free (though this is probably not that big of a deal)
  3. We maybe get Infiniband for free?
  4. We get pub-sub, though I'm not sure how we would use this currently?

Questions

  1. How does ZeroMQ handle hard failures on the other end of the socket?
  2. We are sometimes bound by open file handles. Does one Request-Reply socket live on one TCP Socket?
  3. We currently suffer a delay of 10ms-100ms to establish TCP connections through Tornado. Is this likely to change with ZeroMQ? (Though in truth this is more likely something we're doing wrong internally)
  4. Are there opportunities for ZeroMQ to hold on to "closed" connections in the background while still respecting open file limits? (I suppose we could also do this connection pooling in application code)
  5. Assuming that we're using pyzmq's Tornado connection, is there anything we should be concerned about if the event loop is blocked for a non-trivial amount of time? Obviously we can't dump any new messages during this time, but is ZeroMQ handling things nicely in the background in a separate system thread or is it blocked?
  6. If we install pyzmq's event loop can we still overlap with other Tornado applications like Bokeh?

Implementation

What is the right way to experiment with this? Presumably we change around core.connect, core.read and core.write and we switch out all BatchedSend streams with normal streams. Is there anything else?

We might still want to keep some of our own batching because the serialization and compression parts of our communication pipeline will still benefit.

pitrou commented 7 years ago

Some notes while reviewing the ZeroMQ / pyzmq docs:

Things ZeroMQ can bring us:

A note about ZeroMQ's batching: it only waits for the local network interface (or kernel buffer) to be ready for sending, which probably doesn't mean much unless we're saturating the network capacity. Our BatchedSend does more, as it has a built-in timer to coalesce messages.
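To make the difference concrete, here is a minimal sketch of timer-based coalescing using only the standard library's asyncio. It is not the actual BatchedSend code; the class name `IntervalBatcher`, the `write` callable, and the interval value are assumptions made for illustration.

```python
import asyncio


class IntervalBatcher:
    """Coalesce small messages and flush them together on a timer.

    Hypothetical sketch, not distributed's BatchedSend.
    """

    def __init__(self, write, interval=0.002):
        self.write = write        # callable that receives a list of messages
        self.interval = interval  # seconds to wait before flushing
        self.buffer = []
        self._flush_handle = None

    def send(self, msg):
        # Never blocks: append, then arm the timer if it is not armed yet.
        self.buffer.append(msg)
        if self._flush_handle is None:
            loop = asyncio.get_running_loop()
            self._flush_handle = loop.call_later(self.interval, self._flush)

    def _flush(self):
        # Hand everything collected during the interval to `write` at once.
        self._flush_handle = None
        payload, self.buffer = self.buffer, []
        if payload:
            self.write(payload)


async def demo():
    batches = []
    batcher = IntervalBatcher(batches.append, interval=0.01)
    for i in range(1000):
        batcher.send({"op": "update", "i": i})
    await asyncio.sleep(0.1)  # give the timer time to fire
    return batches
```

In this sketch a burst of 1000 `send` calls results in a single call to `write`, which is the part that serialization and compression would benefit from regardless of what the transport does underneath.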

minrk commented 7 years ago

It's definitely worth considering, but it's not a given that it would be a win overall. I've been thinking on the IPython / Jupyter side that perhaps we shouldn't have used zmq, as many of the potential advantages don't really come up for Jupyter use cases. But it is IPython Parallel where the benefits are greatest.

We maybe get Infiniband for free?

Sort of. You don't get truly native IB, but you can use the TIPC transport. I think most use TCP-over-IB, which you would already have anyway.

We get pub-sub, though I'm not sure how we would use this currently?

Other than instrumentation, I'm unsure how PUB-SUB would work for dask. In IPython Parallel, this is used to funnel output, which I thought was a cool idea enabling things like monitoring services, but I think nobody has ended up using them. It could enable you to do things like explode the Scheduler into a constellation of loosely connected processes, so that things like bokeh / analytics / instrumentation can't slow down scheduling. This is perhaps the most successful aspect of IPython Parallel's zmq redesign: 'schedulers' are lightweight distributors of messages, and everything heavy happens in a 'Hub' process that gets passively notified of everything that passed through the schedulers.

Diagramming message flow patterns is extremely helpful for designing zmq-connected applications, as you have to think a lot about what sockets are right to use where, and there can be several valid options.

How does ZeroMQ handle hard failures on the other end of the socket?

In one sense, it doesn't. ZeroMQ hides connect/disconnect events, which can be nice, but also frustrating. The main thing it does is ensure that messages are delivered completely or not at all - a message that is incompletely delivered is considered not at all delivered. Depending on the socket type, if you try to send to a peer that is gone, the message will either wait in memory to be sent, or be discarded as undeliverable.

We are sometimes bound by open file handles. Does one Request-Reply socket live on one TCP Socket?

Each zmq socket corresponds to one or more FD. Due to zmq's internal use of file descriptors for inter-thread signaling, I would expect to hit fd limits sooner, not later, with zmq.

We currently suffer a delay of 10ms-100ms to establish TCP connections through Tornado. Is this likely to change with ZeroMQ?

I wouldn't think so, unless you use a transport other than tcp. There is one potential difference, in that the connection handshake all happens in a GIL-less IO thread in C++, so that latency may be hidden behind other work going on in Python.

Are there opportunities for ZeroMQ to hold on to "closed" connections in the background while still respecting open file limits?

I'm not quite sure what you mean, but zeromq doesn't generally keep 'closed' connections to be re-used on demand. You would have to use similar explicit connect/close calls to manage your FD usage.
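As a rough illustration of doing this in application code, the sketch below keeps a bounded number of idle connections and closes the least recently used one once the bound is exceeded. The `ConnectionPool` class and its `connect` callable are hypothetical, and the limit only covers idle connections, not ones currently in use.

```python
from collections import OrderedDict


class ConnectionPool:
    """Cache idle connections per address, closing the oldest past a limit.

    Hypothetical sketch: `connect` is any callable mapping an address to an
    object with a close() method.
    """

    def __init__(self, connect, limit=64):
        self.connect = connect
        self.limit = limit
        self._idle = OrderedDict()  # address -> connection, oldest first

    def get(self, address):
        # Reuse an idle connection if one exists, otherwise open a new one.
        conn = self._idle.pop(address, None)
        if conn is None:
            conn = self.connect(address)
        return conn

    def release(self, address, conn):
        # Return a connection to the pool instead of closing it right away.
        old = self._idle.pop(address, None)
        if old is not None and old is not conn:
            old.close()
        self._idle[address] = conn
        # Respect the file-descriptor budget by evicting the oldest entries.
        while len(self._idle) > self.limit:
            _, oldest = self._idle.popitem(last=False)
            oldest.close()
```

A caller would `get` a connection before a transfer and `release` it afterwards, so "closed" connections stay open in the background only while the budget allows.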

Assuming that we're using pyzmq's Tornado connection, is there anything we should be concerned about if the event loop is blocked for a non-trivial amount of time? Obviously we can't dump any new messages during this time, but is ZeroMQ handling things nicely in the background in a separate system thread or is it blocked?

This situation should be improved with zmq. All of the actual network IO happens in one or more GIL-less C++ threads, so the only part that's in contest with blocking Python is handing the memory off to libzmq, which is very quick.

A send from Python involves:

  1. Socket.send(msg), with either:
    • copy=False: builds a message with a pointer to Python-owned memory (no copies, but more bookkeeping)
    • copy=True: makes an in-memory copy of the message data, owned by C++
  2. call zmq_send, which is ~instant, since it is only passing a pointer around
  3. (in io thread) actual network send, which can take arbitrarily long, without any contention from Python
  4. if not copy: IO thread informs Python that it's done with memory

Most of the work with pyzmq happens in a GIL-less C++ thread. A zmq_send really just hands a pointer to C++, so it generally completes extremely quickly, in a length of time independent of the message size.

If we install pyzmq's event loop can we still overlap with other Tornado applications like Bokeh?

Yes, absolutely. The Jupyter notebook is a regular tornado webapp that uses the zmq integration for communication with kernels. Call ioloop.install() to tell tornado to use pyzmq's poller implementation.

We might still want to keep some of our own batching because the serialization and compression parts of our communication pipeline will still benefit.

I think you will want to keep some of this. One thing in particular is sending very large messages. Since zeromq delivers only whole messages, you will likely want to chunk very large messages into multiple zmq sends. I don't have a good answer for how big those chunks should be, though. This is something Jupyter / IPython parallel doesn't support, and hasn't really felt a need for.
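A chunking step could look roughly like the sketch below, which slices a buffer into memoryview frames without copying. The function names and the default chunk size are assumptions; as noted above, there is no established answer for how large the chunks should be.

```python
def chunk_frames(data, chunk_size=64 * 1024 * 1024):
    """Yield memoryview slices of `data`, each at most `chunk_size` bytes.

    Slicing a memoryview does not copy the underlying bytes. The default
    chunk size is an arbitrary placeholder, not a recommendation.
    """
    view = memoryview(data).cast("B")  # flat view of raw bytes
    for start in range(0, len(view), chunk_size):
        yield view[start:start + chunk_size]


def reassemble(frames):
    """Join received frames back into one bytes object (this does copy)."""
    return b"".join(frames)
```

Each frame would then go out as its own send, and the receiver would need to know how many frames belong to one logical message, for example from a small header frame.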

IPython / Jupyter's design makes it relatively simple to swap out transports. It's been a bit since I looked at distributed's implementation, but if you have these key abstractions:

  1. a 'message' is a sequence of bytes / buffer-providing objects
  2. a central implementation of taking a 'message' and sending it to a peer
  3. a central implementation of receiving data and returning / triggering events with a complete message

experimenting with a zmq implementation shouldn't be too disruptive.

@pitrou's points:

Tornado integration doesn't seem to provide a coroutine-friendly API

pyzmq 15 adds a Futures-based API for both tornado and asyncio coroutines.

ZeroMQ doesn't provide notification of peer disconnect, though it does seem to allow setting the TCP timeout

zeromq 4.2 does allow setting connect timeout, and you can monitor connect/disconnect events, though it's generally recommended to not use this kind of thing for anything other than debugging.

If using pyzmq's Tornado integration, it won't remove the (probably small, but still) overhead of Tornado's I/O loop, since the integration merely replaces epoll() with zmq_poll()

This is certainly true, and if anything it could increase the overhead slightly. You can use the zmq.FD interface to hook into a faster eventloop such as uv, but this is an edge-triggered FD that I find an absolute nightmare to work with.

pitrou commented 7 years ago

pyzmq 15 adds a Futures-based API for both tornado and asyncio coroutines.

Thanks. I was looking at ZMQStream and couldn't find this. In your example, is the Poller really necessary?

minrk commented 7 years ago

Peer to peer data movement.... Here we're concerned about not having this communication block or be blocked by other executing code

This is perhaps where you are likely to see the biggest benefit of pyzmq. Since IO happens in a GIL-less C++ thread, messages are delivered to memory, regardless of blocking Python code. So if you really do have cases where network transfers are waiting for GIL-holding execution, pyzmq ought to help. In zmq, a .recv() is just grabbing a pointer (or copying a buffer if copy=True), so it's a big win if you do manage to overlap IO and computation. Because of this, greedy scheduling is an important part of maximizing throughput in IPython parallel - each worker may be assigned several tasks, so that the libzmq queue can be filled up in the background while a task is processing. The output/result queue works the same way.

minrk commented 7 years ago

In your example, is the Poller really necessary?

Not at all. It's a somewhat contrived example showing that all of the blocking APIs return Futures.

mrocklin commented 7 years ago

In any case, our own protocol layer will remain. I'm not sure which proportion of the overhead it is responsible for; intuitively, I'd expect it to be dominant.

This is a fairly contrived example that is both communication and computation heavy. I've turned off compression for this example (things shouldn't compress anyway)

from tornado import gen
from tornado.ioloop import IOLoop

from distributed import Worker, Scheduler, Client, Nanny
from distributed.client import _wait

loop = IOLoop.current()

s = Scheduler(loop=loop)
s.start(0)
workers = [Worker(s.ip, s.port, loop=loop) for i in range(4)] 

import dask.array as da

@gen.coroutine
def f():
    yield [w._start(0) for w in workers]
    c = Client(s.address, loop=loop, start=False)
    yield c._start()

    x = da.random.randint(0, 255, size=(1000, 1000, 1000),
                          chunks=(5, 1000, 1000)).astype('uint8')

    x = c.persist(x) 
    yield _wait(x)

    for i in range(10):
        f = c.compute(x.mean(axis=0))
        yield f._result() 
        del f
        yield gen.sleep(0.1)

loop.run_sync(f)

         19212167 function calls (18921367 primitive calls) in 100.691 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   244382   16.164    0.000   16.164    0.000 {method 'send' of '_socket.socket' objects}
   457971   13.757    0.000   13.757    0.000 {method 'recv' of '_socket.socket' objects}
   466361   10.604    0.000   10.604    0.000 {method 'join' of 'bytes' objects}
     1465    8.971    0.006    8.971    0.006 core.py:78(<listcomp>)
   492727    7.666    0.000   19.166    0.000 iostream.py:1510(_merge_prefix)
    22177    7.435    0.000   18.929    0.001 iostream.py:360(write)
     3623    3.912    0.001    3.912    0.001 {method 'poll' of 'select.epoll' objects}
    30661    1.322    0.000   27.185    0.001 iostream.py:827(_handle_write)
   457971    1.303    0.000   15.985    0.000 iostream.py:721(_read_to_buffer)
    15338    0.966    0.000    0.966    0.000 {method 'modify' of 'select.epoll' objects}
    21309    0.881    0.000   17.160    0.001 iostream.py:585(_read_to_buffer_loop)
        1    0.855    0.855  100.697  100.697 ioloop.py:750(start)
     3730    0.846    0.000    0.846    0.000 socket.py:129(__init__)
     4447    0.804    0.000    1.053    0.000 lru.py:77(__delitem__)
48136/48080    0.760    0.000   50.168    0.001 gen.py:997(run)
     1865    0.611    0.000    0.646    0.000 {built-in method _socket.getaddrinfo}
     1861    0.584    0.000    0.584    0.000 {method 'connect' of '_socket.socket' objects}
3892106/3879969    0.567    0.000    0.577    0.000 {built-in method builtins.len}
    63619    0.523    0.000    2.431    0.000 gen.py:1058(handle_yield)
   457971    0.512    0.000   14.488    0.000 iostream.py:1010(read_from_fd)
   151489    0.510    0.000    0.760    0.000 stack_context.py:253(wrap)
37582/25759    0.467    0.000   42.000    0.002 gen.py:260(wrapper)
  1318410    0.459    0.000    0.459    0.000 {method 'append' of 'list' objects}
    52878    0.423    0.000    1.123    0.000 ioloop.py:586(add_future)
610408/610407    0.408    0.000    0.791    0.000 {built-in method builtins.isinstance}
93100/93090    0.379    0.000   95.254    0.001 stack_context.py:271(null_wrapper)
    54954    0.368    0.000    0.560    0.000 ioloop.py:932(add_callback)
     3702    0.335    0.000    0.335    0.000 socket.py:399(_real_close)
     3405    0.321    0.000    0.325    0.000 {built-in method io.open}
    21878    0.304    0.000   37.825    0.002 iostream.py:497(_handle_events)
70455/10914    0.280    0.000    0.433    0.000 heapdict.py:45(_min_heapify)
   244382    0.276    0.000   16.440    0.000 iostream.py:1023(write_to_fd)
90468/86690    0.264    0.000    0.929    0.000 concurrent.py:318(_set_done)
    69890    0.244    0.000   56.836    0.001 ioloop.py:598(_run_callback)
    92344    0.244    0.000    0.244    0.000 concurrent.py:168(__init__)
   151752    0.240    0.000    0.240    0.000 {built-in method builtins.hasattr}
    99597    0.232    0.000    0.378    0.000 abc.py:178(__instancecheck__)
     3702    0.228    0.000    0.228    0.000 {method 'unregister' of 'select.epoll' objects}
    11370    0.223    0.000   27.626    0.002 core.py:323(write)
29692/29501    0.214    0.000    3.946    0.000 gen.py:945(__init__)
    20803    0.199    0.000   10.178    0.000 iostream.py:876(_consume)
     5141    0.192    0.000    6.312    0.001 worker.py:1193(gather_dep)
      995    0.191    0.000    0.191    0.000 {built-in method posix.listdir}
     7201    0.189    0.000    0.293    0.000 __init__.py:41(packb)
     3730    0.180    0.000    0.180    0.000 {method 'register' of 'select.epoll' objects}
     3011    0.172    0.000    0.172    0.000 {method '_accept' of '_socket.socket' objects}
74861/73862    0.171    0.000    0.797    0.000 functools.py:742(wrapper)
     5685    0.170    0.000   10.028    0.002 core.py:29(dumps)
    74861    0.169    0.000    0.324    0.000 functools.py:704(dispatch)
     2876    0.166    0.000    0.325    0.000 threading.py:332(notify)
     2880    0.158    0.000    0.227    0.000 scheduler.py:1909(transition_processing_memory)
74861/73862    0.157    0.000    0.308    0.000 gen.py:1207(convert_yielded)
   645902    0.156    0.000    0.156    0.000 iostream.py:478(closed)
     2940    0.156    0.000    0.156    0.000 {method 'acquire' of '_thread.lock' objects}
    74861    0.155    0.000    0.155    0.000 weakref.py:364(__getitem__)
     2875    0.152    0.000    0.152    0.000 {method 'release' of '_thread.lock' objects}
   108151    0.148    0.000    0.148    0.000 heapdict.py:67(_swap)
    18708    0.147    0.000    0.665    0.000 ioloop.py:915(call_at)
mrocklin commented 7 years ago

When doing the experiment above with ten single-threaded workers either in the same process or in different processes we see very different communication times:

With different processes

image

Within the same process

image

We see that the costs of communication increase significantly when all of the workers are in the same process.

mrocklin commented 7 years ago

I made the above graphs by going to http://localhost:8787/status while running the following code:

In [1]: from distributed import Client, LocalCluster
   ...: cluster = LocalCluster(n_workers=10, threads_per_worker=1, nanny=False)
   ...: client = Client(cluster)
   ...: import dask.array as da
   ...: x = da.random.randint(0, 255, size=(1000, 1000, 1000),
   ...:                           chunks=(5, 1000, 1000)).astype('uint8')
   ...: x = client.persist(x)
   ...: 

In [2]: x.mean(axis=0).compute()

I changed nanny= between True and False to use processes or threads respectively.

minrk commented 7 years ago

I have extreme envy of your pipeline charts.

We see that the costs of communication increase significantly when all of the workers are in the same process.

Is this because there is a lot of GIL-holding 'work' required in the receiving? Unless I misunderstand, there isn't enough information in the chart alone to separate whether that is:

  1. receiving data from the pipe (zmq could help)
  2. reconstructing messages from the received bytes (zmq won't help)

But I guess your earlier profile suggests that it is spending a lot of time in socket.recv and b''.join(), both of which should be significantly reduced.

mrocklin commented 7 years ago

reconstructing messages from the received bytes (zmq won't help)

The reconstruction in this case is mostly free.

mrocklin commented 7 years ago

Another more complex way to approach this topic is to ask "How can we support different communication systems?"

For example we might want to try out ZeroMQ, or we might want to try out MPI (several groups have asked about this before). This would require us to encapsulate our notion of connecting, reading and writing, serving, and addresses, into something that could be pluggable.
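One way to picture such a pluggable layer is sketched below: an abstract connection interface, a registry keyed by address scheme, and a trivial in-memory backend built on asyncio queues. All names here (`Comm`, `InMemoryComm`, `backends`, `parse_address`, the `inproc` scheme) are hypothetical and are not taken from the distributed codebase.

```python
import asyncio
from abc import ABC, abstractmethod


class Comm(ABC):
    """Hypothetical transport-neutral connection: whole messages in and out."""

    @abstractmethod
    async def read(self): ...

    @abstractmethod
    async def write(self, msg): ...

    @abstractmethod
    async def close(self): ...


class InMemoryComm(Comm):
    """Toy backend that moves messages through asyncio queues."""

    def __init__(self, inbox, outbox):
        self._inbox = inbox
        self._outbox = outbox

    async def read(self):
        return await self._inbox.get()

    async def write(self, msg):
        await self._outbox.put(msg)

    async def close(self):
        pass


def inmemory_pair():
    # Two queues, one per direction, shared by the two endpoints.
    a_to_b, b_to_a = asyncio.Queue(), asyncio.Queue()
    return InMemoryComm(b_to_a, a_to_b), InMemoryComm(a_to_b, b_to_a)


# Registry of backends by scheme; a TCP or ZeroMQ backend would register here.
backends = {"inproc": inmemory_pair}


def parse_address(address):
    scheme, sep, location = address.partition("://")
    if not sep:
        raise ValueError("address must look like scheme://location")
    return scheme, location


async def demo():
    scheme, _ = parse_address("inproc://worker-1")
    client, server = backends[scheme]()
    await client.write({"op": "ping"})
    msg = await server.read()
    await server.write({"op": "pong", "reply_to": msg["op"]})
    return await client.read()
```

The point of the sketch is that the rest of the application only ever sees `read`, `write`, and `close`, so swapping transports becomes a matter of registering another backend.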

mrocklin commented 7 years ago

@minrk ok, so if we were to experiment with this it would be very convenient to have a drop-in solution for Tornado's IOStream and TCPServer. It looks like zmq's ZMQStream might satisfy the first API. Is this correct? Do you have any thoughts on how one might design a ZMQ TCPServer-like solution? Is this even a reasonable idea?

Dask/distributed schedulers/workers currently subclass from a Server class that has a handle_stream method that is given a new stream whenever someone initiates a connection. We listen on this stream for many messages and dispatch out to a variety of handlers. A stripped-down version of our current solution looks like the following, and is probably familiar to you:

class Server(TCPServer):
    @gen.coroutine
    def handle_stream(self, stream, address):
        while not stream.closed():
            msg = yield read(stream)
            assert isinstance(msg, dict)
            reply = msg.pop('reply')

            op = msg.pop('op')
            handler = self.handlers[op]
            result = handler(stream, **msg)
            if type(result) is gen.Future:
                result = yield result

            if reply:
                yield write(stream, result)

(Actual code here)

Ideally we could build something where we could drop in ZMQServer in place of TCPServer.

Additionally, handler might want to send or receive multiple messages on the stream. From some previous experimentation with ZeroMQ I recall that it was a bit odd to send multiple messages without receiving or vice-versa for some socket types. Am I remembering this correctly?

Is mimicking IOStream and TCPServer the right way to go about experimenting with ZeroMQ, or would this require a more thorough redesign of how Dask thinks about communication?

minrk commented 7 years ago

While we did base ZMQStream on IOStream early on, that may not have been a good choice. I think its use is more analogous to TCPServer, in fact, as you explicitly instantiate ZMQStream for a given zmq socket, rather than streams being created for new connections.

I think you'll still want to swap out something at the Server level, but it will probably be a bit more different for different transports.

mrocklin commented 7 years ago

I think my basic question is "Is there a way to construct ZMQ structures that precisely mimic the semantics of TCPServer and IOStream so that we can easily experiment with the ZMQ networking stack without touching much of Dask's code?"

It seems that there are two approaches to solve the performance issues above:

  1. Make Tornado better
  2. Switch to ZeroMQ

I suspect that @pitrou is trying the first option now. I was tempted to try the second option in parallel.

However, the thought of restructuring Dask's communication to fit ZMQ's model is a bit more daunting than I am likely to take on at the moment without having some advance evidence that it's a good idea. This is why I'm curious about the "drop in" temporary solution. In my ideal world we find some way to implement a ZMQServer and ZMQStream that are analogous enough to TCPServer and IOStream that we can drop them in with a ~hundred-line change to Dask's codebase.

mrocklin commented 7 years ago

This somewhat old branch of tornado reduces costs by around 30%: https://github.com/mrocklin/tornado/tree/no-copy

There are lots of things wrong with it (tests don't pass) but it's a nice demonstration that some mild changes to tornado can help here.

pitrou commented 7 years ago

On 08/01/2017 at 19:21, Matthew Rocklin wrote:

This somewhat old branch of tornado reduces costs by around 30%: https://github.com/mrocklin/tornado/tree/no-copy

Do you mean on the benchmark you posted above, or on a Tornado micro-benchmark?

mrocklin commented 7 years ago

The benchmark above. This removes a large portion of the merge_prefix costs. There is still a bytes copy in the dask protocol (which could now be removed if tornado supports memoryviews) and the socket.send/socket.recv delays (which I don't understand).
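For context on the kind of copy being discussed, the sketch below reads a message of known length straight into one preallocated buffer with `socket.recv_into`, rather than collecting chunks and joining them afterwards. It uses a plain blocking socket pair for simplicity and is not how Tornado's IOStream or the no-copy branch is implemented; `recv_exactly` and `demo` are hypothetical helpers.

```python
import socket
import threading


def recv_exactly(sock, nbytes):
    """Read exactly `nbytes` from a blocking socket into one buffer.

    recv_into writes directly into the memoryview slice, so no intermediate
    bytes objects are created and no final join is needed.
    """
    buf = bytearray(nbytes)
    view = memoryview(buf)
    received = 0
    while received < nbytes:
        n = sock.recv_into(view[received:])
        if n == 0:
            raise ConnectionError("peer closed before the full message arrived")
        received += n
    return buf


def demo(payload):
    left, right = socket.socketpair()
    # Send from a thread so a large payload cannot deadlock on a full buffer.
    sender = threading.Thread(target=left.sendall, args=(payload,))
    sender.start()
    try:
        return bytes(recv_exactly(right, len(payload)))
    finally:
        right.close()
        sender.join()
        left.close()
```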

pitrou commented 7 years ago

On 08/01/2017 at 17:22, Matthew Rocklin wrote:

I think my basic question is "Is there a way to construct ZMQ structures that precisely mimic the semantics of TCPServer and IOStream so that we can easily experiment with the ZMQ networking stack without touching much of Dask's code?"

I think this is the wrong approach. Rather, we should abstract away the I/O layer a little bit in order to hide API discrepancies between the different transport options. I have some rough idea of how to do that, and I'll take a closer look this week.

It seems that there are two approaches to solve the performance issues above:

  1. Make Tornado better
  2. Switch to ZeroMQ

I suspect that @pitrou is trying the first option now.

Well, I posted all the PRs I had for Tornado, but none of them will make a significant difference here :-) There's a buffer handling overhead in Tornado's streams that's written in pure Python and has an inevitable cost.

pitrou commented 7 years ago

On 08/01/2017 at 20:08, Matthew Rocklin wrote:

The benchmark above. This removes a large portion of the merge_prefix costs. There is still a bytes copy in the dask protocol (which could now be removed if tornado supports memoryviews) and the socket.send/socket.recv delays (which I don't understand).

cProfile isn't multithread-aware, so my intuition is that the kernel switches threads during those GIL-releasing calls (send() and recv()), and the reported time is therefore time spent in other threads.

mrocklin commented 7 years ago

Shouldn't they not be blocking regardless of releasing the GIL? I would expect them to cost at most a memcopy.

pitrou commented 7 years ago

Shouldn't they not be blocking regardless of releasing the GIL?

I'm not sure the double negation is deliberate but, no, they shouldn't be blocking as event loops use non-blocking I/O calls (if the kernel cannot satisfy the I/O immediately, it returns EAGAIN).
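The non-blocking behaviour can be seen with a small standard-library sketch. In Python, an EAGAIN / EWOULDBLOCK result from the kernel surfaces as `BlockingIOError`; the helper names `try_recv` and `demo` are made up for illustration, and the example assumes a POSIX-style socket pair.

```python
import select
import socket


def try_recv(sock, nbytes=4096):
    """Return received bytes, or None if the kernel has nothing ready."""
    try:
        return sock.recv(nbytes)
    except BlockingIOError:
        # The kernel reported EAGAIN / EWOULDBLOCK instead of blocking.
        return None


def demo():
    left, right = socket.socketpair()
    right.setblocking(False)
    try:
        before = try_recv(right)             # nothing sent yet
        left.sendall(b"hello")
        select.select([right], [], [], 1.0)  # wait until readable, as an event loop would
        after = try_recv(right)
        return before, after
    finally:
        left.close()
        right.close()
```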

mrocklin commented 7 years ago

Perhaps I should have said

Because the sockets are non-blocking I expected them to return very quickly, regardless of the fact that they release the GIL.

Looking a bit more closely at the numbers though, I now note there were 400k calls to socket.recv, each of which took 32us on average (this is on my tornado no-copy branch).

ncalls       tottime  percall    cumtime  percall    filename:lineno(function)
455341       14.84    3.259e-05  14.84    3.259e-05  ~:0(<method 'recv' of '_socket.socket' objects>)
32258        14.05    0.0004356  14.05    0.0004356  ~:0(<method 'send' of '_socket.socket' objects>)
853          6.294    0.007379   6.294    0.007379   core.py:78(<listcomp>)
23100        5.357    0.0002319  5.357    0.0002319  ~:0(<method 'join' of 'bytes' objects>)
4194         4.128    0.0009843  4.128    0.0009843  ~:0(<method 'poll' of 'select.epoll' objects>)
455341       1.295    2.844e-06  16.9     3.711e-05  iostream.py:726(_read_to_buffer)
13419        1.149    8.566e-05  1.149    8.566e-05  ~:0(<method 'modify' of 'select.epoll' objects>)
17904        1.011    5.645e-05  6.552    0.000366   iostream.py:1527(pop_prefix)
1            0.8531   0.8531     71.29    71.29      ioloop.py:754(start)
18401        0.8173   4.442e-05  17.99    0.0009774  iostream.py:590(_read_to_buffer_loop)
38654/38598  0.5969   1.547e-05  31.9     0.0008266  gen.py:1016(run)
1729         0.5086   0.0002942  0.5086   0.0002942  socket.py:129(__init__)
455341       0.4907   1.078e-06  15.42    3.387e-05  iostream.py:1017(read_from_fd)
862          0.4873   0.0005653  0.4873   0.0005653  ~:0(<method 'connect' of '_socket.socket' objects>)
79627/79617  0.431    5.414e-06  65.62    0.0008242  stack_context.py:271(null_wrapper)
862          0.4016   0.0004659  0.4146   0.000481   ~:0(<built-in method _socket.getaddrinfo>)
51045        0.3982   7.801e-06  1.666    3.264e-05  gen.py:1077(handle_yield)
28278/20596  0.3961   1.923e-05  23.84    0.001157   gen.py:279(wrapper)
123872       0.3935   3.177e-06  0.6437   5.197e-06  stack_context.py:253(wrap)
25666        0.3258   1.269e-05  14.81    0.0005769  iostream.py:835(_handle_write)

This "many fast calls" regime makes me concerned about Python overhead. However I don't have a good intuition for what kinds of overheads to expect for Python networking operations.

Anyway, if you have thoughts on how to abstract away communication libraries that would be great.

pitrou commented 7 years ago

On 08/01/2017 at 21:39, Matthew Rocklin wrote:

Perhaps I should have said

Because the sockets are non-blocking I expected them to return very quickly, regardless of the fact that they release the GIL.

The system call itself returns quickly, but another Python thread will have taken the GIL in the meantime, and the socket.send() / recv() method waits to take it again before returning. In summary:

pitrou commented 7 years ago

@minrk, I'm curious about your experience with pyzmq performance. I only get about 7000 messages / sec. using this simple benchmark, which seems much too low: https://gist.github.com/pitrou/080d3df1d77f70bbc26ff49828c4472b

pitrou commented 7 years ago

If I switch to copy=True, I get about 12500 messages / sec., which is better but not very impressive either (this also means that the benefits of pyzmq's zero-copy will be difficult to reap...).

pitrou commented 7 years ago

(switching to a DEALER / DEALER or a PUSH / PULL setup doesn't change those numbers significantly)

dalleyg commented 7 years ago

Just wanted to chime in here. We used ZMQ for the protocol backend in a distributed computing library, and its design caused some difficulty. The biggest fundamental issue is that ZMQ hides the connection state. It's possible that some of these concerns have been addressed by newer versions of ZMQ. I believe we were using a version that is now 1-2 years old.

pitrou commented 7 years ago

@dalleyg, thanks a lot for sharing your experience. I agree ZeroMQ's different model (which its authors think is far superior) is a bit annoying to fit in the connection-oriented model that distributed (as most TCP-based applications) relies on. For now I want to keep experimenting with it to determine what kind of performance it can bring us.

pitrou commented 7 years ago

(anecdotal note: the zero-copy performance issue in pyzmq is due to https://github.com/zeromq/pyzmq/pull/408)

mrocklin commented 7 years ago

I am also not sold on ZeroMQ. Mostly I'm attracted by having a separate c-level thread handle networking communication in the background as @minrk points out. ZeroMQ provides this but also presumably brings with it some other problems. It's hard to know the cost of those problems. If we can get by without ZeroMQ I would prefer that. I'm curious how far we could get by implementing our own minimal IOStream implementation in Python.

eriknw commented 7 years ago

I'm curious how far we could get by implementing our own minimal IOStream implementation in Python.

I bet pretty far if only targeting Linux for the purpose of experimentation. I would expect several stubbed toes making it production-ready. Perhaps selectors [builtin] provides a suitable entry-point.
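As a rough idea of what a selectors-based starting point might look like, the sketch below reads from a non-blocking socket until the peer closes it. It is only the read half of a stream, with no write buffering, callbacks, or error recovery, and the function name `read_until_closed` is hypothetical.

```python
import selectors
import socket


def read_until_closed(sock, timeout=1.0):
    """Collect everything the peer sends until it closes the connection."""
    sel = selectors.DefaultSelector()  # epoll on Linux, kqueue on BSD/macOS
    sock.setblocking(False)
    sel.register(sock, selectors.EVENT_READ)
    chunks = []
    try:
        while True:
            events = sel.select(timeout)
            if not events:
                raise TimeoutError("no data within timeout")
            for key, _mask in events:
                try:
                    data = key.fileobj.recv(65536)
                except BlockingIOError:
                    continue  # spurious wakeup; go back to waiting
                if not data:
                    return b"".join(chunks)  # empty read means EOF
                chunks.append(data)
    finally:
        sel.unregister(sock)
        sel.close()
```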

Abstracting the I/O layer could have several merits. Perhaps using a libuv wrapper is worth investigating too. uvloop looks promising, but is currently only Python >=3.5 and does not yet support Windows (a WIP).

I've used ZeroMQ (and like it), and I think the hesitancy to switch to it is warranted. It's convenient in some ways, but it's not a very leaky abstraction, so you end up writing several protocols on top of ZeroMQ sockets that you otherwise wouldn't write. Sometimes you want to know what's going on with the underlying socket.

minrk commented 7 years ago

zmq definitely may not end up a net win. In particular the tendency to hide connection info can get in your way for simple peer-to-peer message patterns, while it facilitates building fault-tolerant, redundant, distributed networks. The argument tends to go that if you need to keep checking connection state, either you are using zmq wrong, or you shouldn't be using zmq. That's a bit absolutist and I don't entirely agree, but with zmq 4 you can check all of that information via the zmq_monitor system if you need to.

As @pitrou points out, there is a cost (in pyzmq) of zero-copy, due to the async nature of the IO thread: zeromq has to notify Python that it's done with the memory, so Python doesn't free it too soon. The current implementation (and the one before it) has a high cost in the many-small-messages case, and zero-copy only starts to have a benefit for larger messages. In IPython parallel, I've added a configurable threshold for zero-copy with a frame size of ~64k, and force copy=True for smaller messages. I'm sure there is room for improvement in the pyzmq zero-copy garbage collection, but haven't had much time to investigate it.
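The threshold idea can be sketched as a small helper that decides, per frame, whether to copy. This is not IPython parallel's code; the names are hypothetical and the threshold simply mirrors the ~64k figure mentioned above. The resulting flag is the kind of value one would pass as the `copy` argument of pyzmq's `Socket.send`.

```python
COPY_THRESHOLD = 64 * 1024  # bytes; mirrors the ~64k figure, tune as needed


def should_copy(frame, threshold=COPY_THRESHOLD):
    """Return True when a frame is small enough that copying is cheaper.

    Small frames pay more for zero-copy bookkeeping than for a memcpy,
    so only frames at or above the threshold are sent without copying.
    """
    return memoryview(frame).nbytes < threshold


def plan_sends(frames, threshold=COPY_THRESHOLD):
    """Pair each frame with the copy flag that would be used to send it."""
    return [(frame, should_copy(frame, threshold)) for frame in frames]
```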

I think one of the big performance costs of using pyzmq with many small messages could be the use of zmq_poll in tornado's inner loop, which seems to have a cost higher than existing wrappers of epoll, etc. There is a possibility that some of this cost is in pyzmq itself, and I may be able to improve it. libuv / uvloop can offer substantial improvements for the inner loop, and we would have to work with the zeromq FD to integrate with other loops. I have experience with this in zmq.green, and really don't like it, but it is a path to better performance in certain cases.

pitrou commented 7 years ago

On 11/01/2017 at 11:38, Min RK wrote:

As @pitrou points out, there is a cost (in pyzmq) of zero-copy, due to the async nature of the IO thread: zeromq has to notify Python that it's done with the memory, so Python doesn't free it too soon.

Yes, I took a look at that, and it's not pretty :-)

minrk commented 7 years ago

@pitrou I know! You can disable it entirely, and performance significantly improves. Without manual enforcement of keeping Python refs to the memory until zmq is done with it (which you can't know), you risk segfault / message corruption, though :(

There are two sources of the cost:

  1. the cost of the free func itself, being called inside the zmq IO thread (still GIL-less, but adds code to the inner loop for each frame)
  2. the Python thread contention caused by the gc thread

which are a fixed cost per frame, and easily dwarf the cost of a memcpy for a small frame size.

Ultimately, what it boils down to is the zmq_free_fn needs to schedule a Py_DECREF on the buffer, but it needs to do so in a way that doesn't involve grabbing the GIL from the IO thread. It sounds simple, but I haven't been able to come up with a perfect solution.

mrocklin commented 7 years ago

We ended up removing the ZeroMQ comms backend in 1.17.1. This was a useful experiment in how to broaden comms to other systems outside of raw TCP IOStreams. This has since been useful in other contexts, like in-memory queues, but the ZeroMQ comms backend added complexity without improving performance. Thanks everyone for their feedback and involvement here. Closing.