At this stage I haven't considered whether we port to another zmq binding, but would definitely entertain the idea if there was a compelling reason to do so!
The list of potentially compelling reasons includes libsodium-sys; these are the main reasons I wrote this crate in the first place.
For tokio integration, since all sockets are thread safe, you can do something like a dedicated thread that polls all the sockets at the same time and calls the appropriate wakers.
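To make that concrete, here is a minimal, std-only sketch of the idea (this is not code from tmq or libzmq-rs; the zmq-specific polling is reduced to a sleep placeholder): a background thread flips a readiness flag and wakes whatever task registered a waker.

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::Duration;

#[derive(Default)]
struct Shared {
    ready: bool,
    waker: Option<Waker>,
}

// Future that becomes ready once the polling thread says so.
struct RecvReady(Arc<Mutex<Shared>>);

impl Future for RecvReady {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.0.lock().unwrap();
        if shared.ready {
            Poll::Ready(())
        } else {
            // Remember the waker so the polling thread can wake this task.
            shared.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

fn spawn_poller(shared: Arc<Mutex<Shared>>) {
    thread::spawn(move || loop {
        // Placeholder: a real implementation would block here on the zmq
        // poller, waiting for events on all registered sockets.
        thread::sleep(Duration::from_millis(10));
        let mut s = shared.lock().unwrap();
        s.ready = true;
        if let Some(waker) = s.waker.take() {
            // Hand control back to the task that is awaiting RecvReady.
            waker.wake();
        }
    });
}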
One pain point I have is interop with windows. This is mainly due to compilation being a bit troublesome & using PollEvented for mio/tokio integration.
I don't know how performant a dedicated thread would be over the current implementation, it would probably need to be benchmarked.
There is some effort in a WIP PR to update to standard futures in prep for async/await on stable. I believe there are some benchmarks on there.
One pain point I have is interop with windows. This is mainly due to compilation being a bit troublesome
The crate is compatible with Windows. You don't need to install libzmq; the only dependency is cmake. It compiles from source, so this should not be an issue.
using PollEvented for mio/tokio integration.
You would not need to use mio at all, since the Poller API replaces mio::Poll and works on Windows without any problems.
I don't know how performant a dedicated thread would be over the current implementation, it would probably need to be benchmarked.
You cannot simply poll the file descriptor like you do here https://github.com/cetra3/tmq/blob/2b2951ca802d85369a37d736e82f086438ed89e1/src/socket.rs#L40
To quote from the doc:
The returned file descriptor is also used internally by the zmq_send and zmq_recv functions. As the descriptor is edge triggered, applications must update the state of ZMQ_EVENTS after each invocation of zmq_send or zmq_recv. To be more explicit: after calling zmq_send the socket may become readable (and vice versa) without triggering a read event on the file descriptor.
This is inefficient, since you have to actively call getsockopt(ZMQ_EVENTS) each time before polling, and the overhead increases with each new socket.
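For illustration, this is roughly what the approach forces you to do (a sketch against the rust-zmq zmq crate, not your actual code): even when the raw fd signals readiness, you still have to consult ZMQ_EVENTS around every receive.

fn drain_readable(socket: &zmq::Socket) -> zmq::Result<()> {
    // The raw fd only says "something happened"; the actual readiness has
    // to be re-read via getsockopt(ZMQ_EVENTS) after every send/recv.
    while socket.get_events()?.contains(zmq::POLLIN) {
        let _msg = socket.recv_msg(zmq::DONTWAIT)?;
        // ... hand the message off to the application ...
    }
    Ok(())
}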
Instead, if you use the zmq_poller API (which is what is behind Poller), you register the sockets that you want, as well as the events, and libzmq will tell you when an event occurs. This way you can effectively poll all the sockets in your application in a single thread asynchronously.
edit: ZMQ_FD => ZMQ_EVENTS
Ah yep, I'm not familiar with zmq_poller, but if that is more efficient, then that's definitely the way to go! I have a feeling that it is very cross-purposes with the tokio reactor, which may make it a challenge to integrate.
Ah yep I'm not familiar with zmq_poller but if that is more efficient, then that's definitely the way to go!
A while ago, before I knew zmq_poller was a thing, I wanted to integrate into tokio with the method that you are currently using. However, I found that the overhead of the getsockopt(ZMQ_EVENTS) call decreased performance by about 5X: https://github.com/zeromq/libzmq/issues/3487#issue-438116512
I have a feeling that it is very cross-purposes with the tokio reactor which may make it a challenge to integrate.
Yes exactly. Conceptually, we are dealing with 2 reactors at once, tokio's and zmq's. So we want to enable the ZeroMQ event loop to communicate with the tokio event loop with the least overhead possible.
edit: ZMQ_FD => ZMQ_EVENTS
In the PR there were a few comments about speeding up the getsockopt implementation. I would also be interested in why zmq_poller is that much more efficient than just using the raw fd.
The problem is not the raw fd per se, but the fact that you have to call ZMQ_EVENTS each time you want to poll, like @Kobzol observed:
The core of the issue is the inconspicuously looking socket option ZMQ_EVENTS, which must be queried after polling the socket, otherwise it may deadlock in certain situations where the high watermark buffer is full (zmq_poll does this internally, and I'm not even sure if it's intentional, since the implementation is just.. mad).
This is due to the fact that a zmq socket might contain more than one kernel socket, so libzmq cannot provide a single evented fd. Instead you manually update its state by calling ZMQ_EVENTS. This way libzmq checks if any of the kernel sockets contained within the zmq socket has received an event.
The performance overhead increases linearly with the number of sockets you want to poll at the same time, since you must call ZMQ_EVENTS on each one of them.
The zmq_poller is the opposite: you register a socket and an event (read, write, or both) and zmq will wake up the poller as soon as one of the events is received.
Conceptually, zmq_poller is like a true epoll, whereas zmq_poll is only a hack used to allow zmq sockets to integrate into a reactor that takes raw fds.
Personally I think that the fewer ZMQ crates there are, the better, since there are already at least 6 crates and this fragmentation is not good IMO. Integration with tokio should ideally just be a module in a normal ZMQ crate (possibly hidden behind a feature).
Regarding the polling, I also think that using zmq_poll directly, instead of manually calling ZMQ_FD and ZMQ_EVENTS, would be a better choice. In terms of performance, for a single PUSH/PULL socket the performance seems to be identical to a synchronous version if you read at least ~8 messages at a time, to amortize the event loop overhead, but this would also be true with a dedicated poller thread. I haven't done benchmarking of more sockets at the same time, although I'm pretty sure that zmq_poll uses the same logic that ZMQ_EVENTS does for each polled socket (reading the input mailbox events), so I'm not sure if the scaling can get worse because of this specifically.
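As a rough illustration of that amortization (a sketch against the zmq crate, not the code from the PR): after one readiness notification, drain a small batch of messages non-blockingly instead of going back to the reactor after every single message.

fn drain_batch(socket: &zmq::Socket, max: usize) -> zmq::Result<Vec<zmq::Message>> {
    let mut batch = Vec::with_capacity(max);
    for _ in 0..max {
        match socket.recv_msg(zmq::DONTWAIT) {
            Ok(msg) => batch.push(msg),
            // Nothing left right now; stop and wait for the next wakeup.
            Err(zmq::Error::EAGAIN) => break,
            Err(e) => return Err(e),
        }
    }
    Ok(batch)
}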
I'm more motivated by correctness in this case, because I'm still a bit worried that there might be some edge cases where the approach that I'm using in the PR might end up producing lost messages or deadlocks.
However, there is a problem with the older sockets, which are not thread-safe. Your crate is explicitly covering a newer subset of ZMQ; this crate so far (and the other Tokio ZMQ crates, along with rust-zmq) are more focused on the older socket types. People using an older version of the library would be forced to upgrade if the older sockets were not supported.
The sockets are so different (mainly regarding the thread-safety) that the asynchronous approach for the old and new sockets will probably be different too. I didn't try it yet, but I suppose that the older sockets can't work with the approach that you describe (a separate thread polling a set of sockets)? In this separate thread scenario, how does adding new sockets work? You need to add them to the poller instance, but the polling thread is stuck waiting on events on existing sockets. With epoll you could use e.g. eventfd to artificially wake up the polling loop, but I'm not sure if something like that can be done in ZeroMQ?
EDIT: I noticed that you mentioned the newer ZMQ poller functions, instead of zmq_poll. If that is more performant, that would be nice, but again, if the feature is too new, it would rule out old versions of the library, so there can be multiple implementations inside the crate for example.
Integration with tokio should ideally just be a module in a normal ZMQ crate (possibly hidden behind a feature).
Agreed.
People using an older version of the library would be forced to upgrade if the older sockets were not supported.
Yes precisely. Also, these newer thread-safe sockets don't support multi-part messages. They still have better performance though, even without the amortization. And I personally think that multi-part messages are a poor excuse for the lack of a proper serialization library. I think it kinda made sense for C++, but I don't see the point in Rust since we have a lot of great serialization crates. Also, writing client code that consumes multi-part messages with proper error handling is a pain.
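For example, a single serde + bincode frame can replace a multi-part message. This is only an illustrative sketch; the crate choice and wire format are entirely up to the application.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Request {
    id: u64,
    topic: String,
    payload: Vec<u8>,
}

// Pack the whole request into one frame instead of several message parts.
fn encode(req: &Request) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    Ok(bincode::serialize(req)?)
}

fn decode(frame: &[u8]) -> Result<Request, Box<dyn std::error::Error>> {
    Ok(bincode::deserialize(frame)?)
}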
I am really unsure whether I want to support these older socket types because, frankly, they are really messy. Overall I would say that the libzmq API is one big foot gun, but it's especially true for these older socket types.
In this separate thread scenario, how does adding new sockets work?
The separate polling thread would have a Server socket which is used by clients to pass messages and socket handles. This Server socket would be part of the polling loop, so it would wake up upon receiving an event. So in a sense, this polling thread is like a service with a fixed address (an inproc address), so that any socket can connect to it and send commands.
If that is more performant, that would be nice, but again, if the feature is too new, it would rule out old versions of the library, so there can be multiple implementations inside the crate for example.
I can anecdotally say that the performance is significantly better (it's one of the primary reasons zmq_poller was created in the first place, from what I understand), but I removed the benchmarks that showed this long ago, when I switched to the poller API. Also, the Poller API is compatible with the older socket types.
Purely selfish I know, but I am using pub/sub in my company's product and would like to maintain support for them.
Understandable.
I internally use these older socket types because they are required to interact with some of libzmq's APIs anyway. But I never put in the time required for them to have a good API, since they are basically separate from the thread-safe types. For instance, with multi-part messages, for proper error handling you need to return a Vec<Msg> (or a drain iterator) when send returns an error, etc.
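A rough sketch of what such an API shape could look like (the names here are hypothetical, not libzmq-rs's actual signatures): when a multi-part send fails, the unsent parts are handed back to the caller so nothing is silently dropped.

struct Msg(Vec<u8>);

enum SendError {
    // Recoverable: retry later with the returned parts.
    WouldBlock(Vec<Msg>),
    // Fatal: the socket is unusable, but the parts are still returned.
    CtxTerminated(Vec<Msg>),
}

fn send_multipart(parts: Vec<Msg>) -> Result<(), SendError> {
    // A real implementation would push each part into the socket and, on
    // failure, collect the parts that were not yet sent into the error.
    let _ = parts;
    Ok(())
}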
The bottom line is that I don't use these socket types, and therefore I would probably not be willing to put in the time.
I'm not knowledgeable enough to know how topic subscribing works with the new socket types, but if there is an alternative I'm all ears!
I am thinking this definitely warrants some investigation. Any more idiomatic/ergonomic use of zmq is always welcome.
At this stage I'm not sure what path to take to even try this out. What would you suggest would be the next step?
Basically the new socket types reimplement each of the old types, with some differences to improve performance but also some bugs / issues. This page describes these new types from a high level. For a PUB/SUB pattern, the new socket types are Radio-Dish. The doc for both Radio and Dish has some decent examples.
The principal difference with the old PUB/SUB types is that they use Groups instead of topics. Groups have a max size of 15 characters and are matched exactly, as opposed to topics, which match only the beginning of the string.
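The matching difference in a nutshell (plain Rust, no zmq calls, just to illustrate the semantics):

// Old-style topics are prefix-matched.
fn topic_matches(subscription: &str, incoming: &str) -> bool {
    incoming.starts_with(subscription) // "status" also matches "status/cpu"
}

// New-style groups are compared exactly and capped at 15 characters.
fn group_matches(joined: &str, incoming: &str) -> bool {
    debug_assert!(joined.len() <= 15);
    joined == incoming // "status" does NOT match "status/cpu"
}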
I'd say if you have more complex use cases, or general questions in mind, file an issue and I'll see how I can help you.
We could in theory implement the asynchronous operations in terms of something like impl Stream<..> for <rust-zmq-Socket> and impl Stream<..> for <libzmq-rs-Socket> to share code and infrastructure in a single crate. But since right now rust-zmq uses pretty much only the old sockets and libzmq-rs uses only the new sockets, the asynchronous implementations would possibly differ a lot anyway (thread-safety - polling in another thread can't be used, need to support multipart messages, different socket semantics, etc.).
Really the new sockets are so different that it's basically a whole new library :-) As I understand this change was motivated/inspired by nanomsg.
In my use case I only used multiparts to support routing (as I understand routing is supported as a first class citizen with the new sockets that are routable) and otherwise used a serialization framework to build on top of ZeroMQ messages. I may possibly switch to the new sockets in the future, but new tokio support for the old sockets would be nice anyway, so I hope that we can finish the PR once await becomes stable.
We could in theory implement the asynchronous operations in terms of something like impl Stream<..> for <rust-zmq-Socket> and impl Stream<..> for <libzmq-rs-Socket> to share code and infrastructure in a single crate.
I'm unsure whether the streaming semantics are appropriate. Last time I used it (several months ago), it was strictly inferior to an event loop. How does error handling work with the new Stream API?
Really the new sockets are so different that it's basically a whole new library :-) As I understand this change was motivated/inspired by nanomsg.
Yes, precisely. It's also one of the main reasons why libzmq-rs is a separate crate from rust-zmq.
In my use case I only used multiparts to support routing (as I understand routing is supported as a first class citizen with the new sockets that are routable)
Yes, routing is now ez peezy with the new Server socket type.
let msg = server.recv_msg()?;
let id = msg.routing_id().unwrap();
server.route("reply to the client", id)?;
If you want to do nested routing, then you need to keep track of previous routing ids in a stack inside the message that you are passing, so that when you want to route the message back, you can pop back the routing_id. It's pretty damn elegant imo.
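A rough sketch of that routing-id stack (the type and field names are hypothetical, not libzmq-rs API): each hop pushes its caller's routing id onto a stack carried in the application-level envelope, and routing back just pops it off.

type RoutingId = u32; // stand-in for the library's routing-id type

struct Envelope {
    route_stack: Vec<RoutingId>,
    payload: Vec<u8>,
}

impl Envelope {
    // Called by an intermediary before forwarding the request further.
    fn push_hop(&mut self, from: RoutingId) {
        self.route_stack.push(from);
    }

    // Called when replying: yields the id to route the message back to.
    fn pop_hop(&mut self) -> Option<RoutingId> {
        self.route_stack.pop()
    }
}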
I'm unsure whether the streaming semantics are appropriate. Last time I used it (several months ago), it was strictly inferior to an event loop. How does error handling work with the new Stream API?
The event loop concept is orthogonal to Stream. Stream is just a trait that represents something that produces a stream of items, in this case a socket that produces a stream of messages. For the old sockets it's a bit complicated because a message can have multiple parts; for the newer ones it should be simple. Sink is another trait that represents something that consumes messages, in this case a socket that you can use to send messages through. Error handling is relatively simple: both Stream and Sink have a generic/associated Error type, and the asynchronous receive/send on them can fail. Usually it's assumed that if they fail, the Stream/Sink can't be used anymore, but you can easily implement it in such a way that it recovers from errors.
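For example, on the consumer side it could look something like this (Msg and RecvError are placeholder types, not a concrete crate's API): the stream keeps producing after recoverable errors and is abandoned on fatal ones.

use futures::stream::{Stream, StreamExt};

struct Msg(Vec<u8>);

enum RecvError {
    Interrupted,   // recoverable: just try again
    CtxTerminated, // fatal: the socket is gone
}

async fn consume<S>(mut incoming: S)
where
    S: Stream<Item = Result<Msg, RecvError>> + Unpin,
{
    while let Some(item) = incoming.next().await {
        match item {
            Ok(_msg) => { /* handle the message */ }
            Err(RecvError::Interrupted) => continue, // keep the stream alive
            Err(RecvError::CtxTerminated) => break,  // give up on this socket
        }
    }
}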
The event loop (in the case of tokio) is a hidden abstraction that just makes (I/O) futures, streams and sinks work. By default tokio uses I/O threads that handle polling, but as you said earlier, you can also make a custom thread for polling and just call the Future wakers directly.
In any case, adding support for async/await to ZeroMQ sockets pretty much means implementing Stream/Sink. With that you gain access to a lot of extension traits that define useful methods (like send, send_all, next, etc.) and it will fit into the rest of the Future ecosystem, allowing the use of await with the sockets.

You could also implement the asynchronous functions directly, something like async fn send(&mut self, msg..) on each ZMQ socket, but I'm not sure what would be the benefit of that. Stream is basically already very similar to async fn recv(...); only Sink has additional semantics regarding buffering, but for the newer sockets it should be super simple to match these semantics. And even if you manually implemented async fn send/recv for each socket, adding Stream/Sink on top of that should be a few lines of code, and it would enable integration with everything else that already uses/expects Streams and Sinks, so I don't see a reason why not to do it. With the older sockets there are some slight issues with the semantics, but for the newer ones I think that it should be a nice fit.
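As a sketch of how close the two are, an async fn recv can be turned into a Stream with futures::stream::unfold (AsyncSocket and its recv are placeholders for whatever socket type ends up providing the async receive):

use futures::stream::{self, Stream};

struct Msg(Vec<u8>);

struct AsyncSocket;

impl AsyncSocket {
    async fn recv(&mut self) -> Option<Msg> {
        // Placeholder: a real implementation would await socket readiness.
        None
    }
}

fn into_stream(socket: AsyncSocket) -> impl Stream<Item = Msg> {
    stream::unfold(socket, |mut socket| async move {
        // Yield the next message together with the socket as the new state.
        socket.recv().await.map(|msg| (msg, socket))
    })
}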
Usually it's assumed that if they fail, the Stream/Sink can't be used anymore, but you can easily implement it in such a way that it recovers from errors.
That's mainly the problem that I had in mind. You would have to differentiate between recoverable errors (WouldBlock) and unrecoverable errors (Context terminated). How would that work, from a high level?
You could also implement the asynchronous functions directly, something like async fn send(&mut self, msg..) on each ZMQ socket, but I'm not sure what would be the benefit of that.
How would a simple request reply pattern work with a stream API?
I agree that Stream and Sink should be used wherever it makes sense, as it makes it much, much easier to compose with the greater futures community. For instance, I use tmq within an actix-web service.
If you're getting WouldBlock, this should return Async::NotReady, set up a notifier, and move on. If you get other errors, then obviously the response is a little different. Have a look at the current poller.
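Roughly, in old-futures (0.1) style it looks like this (a sketch with placeholder types, not tmq's actual poller; the readiness registration is only hinted at in a comment):

use futures::{task, Async, Poll, Stream};
use std::io;

struct Msg(Vec<u8>);

struct RawSocket;

impl RawSocket {
    fn try_recv(&mut self) -> io::Result<Msg> {
        Err(io::Error::from(io::ErrorKind::WouldBlock)) // placeholder
    }
}

struct MsgStream {
    socket: RawSocket,
}

impl Stream for MsgStream {
    type Item = Msg;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Msg>, io::Error> {
        match self.socket.try_recv() {
            Ok(msg) => Ok(Async::Ready(Some(msg))),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // In real code the readiness registration (PollEvented /
                // ZMQ_FD) ensures the task gets notified; here we just ask
                // to be polled again.
                task::current().notify();
                Ok(Async::NotReady)
            }
            // Anything else (e.g. context terminated) is a hard error.
            Err(e) => Err(e),
        }
    }
}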
Request/Reply pattern has been implemented via the Responder trait on the current master branch. The response example shows this.
Request/Reply pattern has been implemented via the Responder trait on the current master branch.
I meant a single request-reply. From what I understand, a stream yields zero or more elements. So let's say you want to do a single request-reply to a remote API, and then use this result. Would that be achievable with this API? I feel like a Future would be required in this case.
Not sure how it works in the async/await world, but in the current world you can run take on the stream if you just want one item.
take consumes the stream; that's something that you would have to do with the old futures. With the new ones you can just call next() on the Stream. It returns a Future that resolves to the next element in the stream, and it just takes &mut self, so typically you can call this method in a loop (https://github.com/Kobzol/tmq/blob/future/examples/pull.rs).
If a single socket implements both Stream and Sink, you can easily interleave send and receive to achieve a request/reply pattern (https://github.com/Kobzol/tmq/blob/future/tests/dealer.rs#L148).
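For instance, a single request/reply over something that is both a Sink and a Stream can be a small generic helper (a sketch; the concrete socket and message types are left generic on purpose):

use futures::{Sink, SinkExt, Stream, StreamExt};

async fn request_reply<S, M, E>(socket: &mut S, request: M) -> Result<Option<M>, E>
where
    S: Sink<M, Error = E> + Stream<Item = Result<M, E>> + Unpin,
{
    socket.send(request).await?; // one send...
    match socket.next().await {  // ...one reply (None if the stream ended)
        Some(reply) => reply.map(Some),
        None => Ok(None),
    }
}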
Since libzmq-rs is not maintained anymore, I think this issue can be closed.
Indeed
I'm the author of libzmq-rs.
Currently there are a ton of different low-level zmq binding crates, which leads to plenty of duplicated work. Moreover, since these are usually bindings, they tend to be pretty close to the C code, which makes them not really idiomatic. The objective of my crate was to provide an idiomatic API to properly leverage the potential of zmq.
As I would be interested in porting ZeroMQ to tokio, I'm wondering if there is any way that we could join our efforts to do so.