mrocklin opened this issue 4 years ago
I wonder if a similar effort to what was done for inproc
communication ( https://github.com/dask/distributed/pull/887 ) could be applied here.
Ideally we wouldn't make a whole new Comm, but could reuse the existing TCP comm.
Sure that seems preferable.
Did a little poking around mostly out of curiosity.
The good news is Tornado supports UNIX domain sockets today 😄
The bad news is TCPServer won't support UNIX domain sockets. However HTTPServer would. I gather this has something fundamental to do with how UNIX domain sockets work (though I'm not an expert here). It might be possible to get both TCP and UNIX domain sockets to work with HTTPServer, but I haven't confirmed that.
There is some work needed to set up a UNIX domain socket for use with HTTPServer ( https://github.com/tornadoweb/tornado/blob/18b653cf93fe870cbc630fb48595e1b92fc1e06c/tornado/test/httpserver_test.py#L723-L732 ), but it doesn't appear too complex.
Here's a useful Google Group thread ( https://groups.google.com/forum/#!topic/python-tornado/UNO6k2mU4iA ) and a short example in a Gist ( https://gist.github.com/superduper/5579037 ).
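For reference, a minimal sketch of that pattern (adapted from the linked test; the socket path and handler are made up for illustration, not anything Dask uses today):

```python
import asyncio

from tornado.httpserver import HTTPServer
from tornado.netutil import bind_unix_socket
from tornado.web import Application, RequestHandler


class HelloHandler(RequestHandler):
    def get(self):
        self.write("hello over a UNIX domain socket\n")


async def main():
    server = HTTPServer(Application([(r"/", HelloHandler)]))
    # Bind to a filesystem path instead of a host/port pair.
    server.add_socket(bind_unix_socket("/tmp/uds-demo.sock"))
    await asyncio.Event().wait()  # serve forever


asyncio.run(main())
```

You can poke at it with curl --unix-socket /tmp/uds-demo.sock http://localhost/ to confirm the server answers over the socket file.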
I'm surprised that TCPServer won't support UNIX domain sockets. If you have a reference on this I'd love to see it.
In principle Tornado's TCPServer just holds on to a bunch of Python socket.socket objects set in non-blocking mode. Even if there isn't a use_unix_domain_sockets=True keyword, I suspect that we could trick Tornado's TCPServer into doing this if we really wanted to.
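An untested sketch of that trick, assuming Tornado's accept loop doesn't actually care about the address family (the echo handler and socket path are purely illustrative):

```python
import asyncio

from tornado.iostream import StreamClosedError
from tornado.netutil import bind_unix_socket
from tornado.tcpserver import TCPServer


class EchoServer(TCPServer):
    async def handle_stream(self, stream, address):
        # For AF_UNIX connections `address` is the peer's path (often just '').
        try:
            while True:
                data = await stream.read_until(b"\n")
                await stream.write(data)
        except StreamClosedError:
            pass


async def main():
    server = EchoServer()
    # No new keyword needed: hand the server an already-bound AF_UNIX socket.
    server.add_socket(bind_unix_socket("/tmp/echo-demo.sock"))
    await asyncio.Event().wait()


asyncio.run(main())
```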
I could be wrong; as I said before, "not an expert here".
FWIW I did a bunch of investigation on this for Apache Kudu localhost communication. I found that localhost UNIX socket communication was 10-20% faster than TCP, but the difference wasn't an order of magnitude. It was worth doing since it was easy to do in our code base, but the big win was the ability to pass a file descriptor across the socket and use it for shared memory.
Thanks for the heads up @toddlipcon . That's super-helpful.
but the big win was the ability to pass a file descriptor across the socket and use it for shared memory.
To be clear, is this for data already stored on disk?
Friendly nudge (regarding Matt's question above), @toddlipcon 🙂
One other alternative is to use UCX through UCX-Py. We may also be able to use it with shared memory to speed up intra-node communication -- I've been working on getting it to work with Dask.
To be clear, is this for data already stored on disk?
No, in the benchmarks I was running, it was transferring data from an in-memory buffer cache, rather than hitting spinning disk. If the data is coming off an uncached disk, I'm sure that will be the bottleneck and these CPU optimizations won't make a real difference.
How does the file descriptor come into play then, @toddlipcon?
The file descriptor trick is so that you can set up a shared memory region (e.g. via shm_open or memfd_create) and then pass that FD over to the other process, allowing both to mmap it even when the two processes may be running as different users.
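A rough Python sketch of that trick (Linux only, Python 3.9+; the function names are made up for illustration): create an anonymous shared-memory file with memfd_create, ship its descriptor over a UNIX domain socket via SCM_RIGHTS, and mmap it on both ends.

```python
import mmap
import os
import socket

NBYTES = 4096


def send_region(sock):
    fd = os.memfd_create("shm-demo")        # anonymous in-memory file
    os.ftruncate(fd, NBYTES)                # size it once
    buf = mmap.mmap(fd, NBYTES)
    buf[:5] = b"hello"
    socket.send_fds(sock, [b"x"], [fd])     # descriptor travels via SCM_RIGHTS


def recv_region(sock):
    _, fds, _, _ = socket.recv_fds(sock, 1, maxfds=1)
    buf = mmap.mmap(fds[0], NBYTES)         # same physical pages, no copy
    assert buf[:5] == b"hello"


if __name__ == "__main__":
    # Same-process demo over a socketpair; in practice the two ends would be
    # separate processes connected by a UNIX domain socket.
    a, b = socket.socketpair(socket.AF_UNIX)
    send_region(a)
    recv_region(b)
```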
Gotcha. Thanks for clarifying 🙂
This sounds similar to how multiprocessing
does this with RawArray
and such under the hood by using mmap
. Does that process sound similar to yours or were there notable differences?
I'm not familiar with that code but with a quick look at your links, it seems similar. The one thing to be aware of is that if you're sharing the fd across a security boundary, you need to be aware that a writer can ftruncate() the file and cause a SIGBUS on the reader. Of course if you're following pointers on the reader side, you also need to be pretty careful that the writer can't cause the reader to do "bad things" by modifying the data while it's reading.
memfd offers some facilities to improve this by "sealing" the FD from further modifications, but those are saddled with the problem that you can't "unseal", so they're most useful for one-shot transfers of data.
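For completeness, a small sketch of that sealing step (Linux only; the fcntl seal constants landed in Python 3.9):

```python
import fcntl
import os

# Create the region with sealing allowed, fill it, then seal it so a writer
# can no longer shrink/grow the file (avoiding SIGBUS for readers) or change
# the bytes. There is no unseal, so this suits one-shot transfers.
fd = os.memfd_create("sealed-demo", os.MFD_CLOEXEC | os.MFD_ALLOW_SEALING)
os.ftruncate(fd, 4096)
os.write(fd, b"one-shot payload")
fcntl.fcntl(
    fd,
    fcntl.F_ADD_SEALS,
    fcntl.F_SEAL_SHRINK | fcntl.F_SEAL_GROW | fcntl.F_SEAL_WRITE | fcntl.F_SEAL_SEAL,
)
```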
Ah sorry, I should have pointed out that allocations come from the shared memory Heap. The actual file descriptor points to some temporary file held internally within each of the Heap's Arenas. This file descriptor has ftruncate called on it only once. If the process runs out of space in any of the existing Arenas, a new one is allocated rather than resizing an existing one. As a result the average user engaging with this API (usually through things like RawArray) is fairly well protected from low-level details like the underlying file descriptor or shared memory allocation more generally. Anyways these are pretty low-level details in multiprocessing 😄
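For anyone following along, a tiny illustration of that user-facing API (using the fork start method so it stays Linux/macOS friendly; none of the Heap/Arena plumbing is visible):

```python
import multiprocessing as mp


def fill(arr):
    for i in range(len(arr)):
        arr[i] = i * i


if __name__ == "__main__":
    ctx = mp.get_context("fork")
    shared = ctx.RawArray("d", 8)   # allocated from multiprocessing's shared-memory heap
    p = ctx.Process(target=fill, args=(shared,))
    p.start()
    p.join()
    print(list(shared))             # [0.0, 1.0, 4.0, ..., 49.0]
```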
Perhaps more importantly, the use case for multiprocessing has all of the processes in the same security domain. In other words, one of your subprocesses (fake "threads") is not trying to maliciously crash the other one with whom it's sharing memory. In our case (Apache Kudu), one of the processes is a user-controlled client, and the other is a system-controlled daemon process, and we don't want malicious clients to be able to crash the daemon process.
Probably the easiest path for us in Dask is to use the custom Resolver solution ( https://github.com/tornadoweb/tornado/issues/2671#issuecomment-499190469 ), which would then be used here. I could be totally wrong though 😅
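Roughly, the idea from that comment looks like the sketch below (untested; the ".sock" suffix convention and the paths are made up for illustration):

```python
import socket

from tornado.netutil import Resolver
from tornado.tcpclient import TCPClient


class UnixResolver(Resolver):
    """Resolve hostnames that look like socket paths to AF_UNIX addresses."""

    def initialize(self, fallback=None):
        self.fallback = fallback or Resolver()

    async def resolve(self, host, port, family=socket.AF_UNSPEC):
        if host.endswith(".sock"):
            return [(socket.AF_UNIX, host)]
        return await self.fallback.resolve(host, port, family)


async def connect(path="/tmp/scheduler.sock"):
    client = TCPClient(resolver=UnixResolver())
    stream = await client.connect(path, 0)  # the port is ignored for AF_UNIX
    return stream
```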
Strangely a blog post on this topic hit the frontpage of Hacker News today: https://copyconstruct.medium.com/file-descriptor-transfer-over-unix-domain-sockets-dcbbf5b3b6ec
We have been investigating using asyncio-based communication directly in Dask ( https://github.com/dask/distributed/issues/4513 ). One benefit is that asyncio already has support for UNIX domain sockets, so it should hopefully be straightforward to expose that as well.
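For example (a self-contained sketch; the socket path is arbitrary), the asyncio primitives mirror their TCP counterparts:

```python
import asyncio
import contextlib
import os

PATH = "/tmp/asyncio-uds-demo.sock"


async def handle(reader, writer):
    writer.write(await reader.readline())    # echo one line back
    await writer.drain()
    writer.close()


async def main():
    with contextlib.suppress(FileNotFoundError):
        os.unlink(PATH)                      # remove a stale socket file
    server = await asyncio.start_unix_server(handle, path=PATH)
    async with server:
        reader, writer = await asyncio.open_unix_connection(PATH)
        writer.write(b"ping\n")
        await writer.drain()
        print(await reader.readline())       # b'ping\n'
        writer.close()


asyncio.run(main())
```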
UNIX domain sockets may be able to accelerate intra-node inter-process communications.
In the TCP comm it may be possible to swap out the socket used if the local and peer hostnames are identical (probably made optional via configuration and dependent on the OS); see the sketch below for a rough version of that check.
Some links from perusing the web:
cc @jacobtomlinson, this might interest you? (if you had a bunch of free time, that is)
Previous conversation in https://github.com/dask/dask/issues/3657#issuecomment-603275248
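As a rough sketch of the hostname check mentioned above (the names, socket path, and port are illustrative, not the actual distributed.comm API):

```python
import socket

LOOPBACK = {"127.0.0.1", "::1"}


def peer_is_local(peer_host):
    """Best-effort check that peer_host resolves to this machine."""
    try:
        peer_ips = {info[4][0] for info in socket.getaddrinfo(peer_host, None)}
        local_ips = {info[4][0] for info in socket.getaddrinfo(socket.gethostname(), None)}
    except socket.gaierror:
        return False
    return bool(peer_ips & (local_ips | LOOPBACK))


def open_socket(peer_host, uds_path="/tmp/dask-worker.sock"):
    # Prefer a UNIX domain socket for same-host peers, fall back to TCP otherwise.
    if peer_is_local(peer_host):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.connect(uds_path)
    else:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((peer_host, 8786))  # default scheduler port, for illustration
    return sock
```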