cetra3 / tmq

Rust ZeroMQ bindings for Tokio

std::future and possible contributions #4

Closed: Kobzol closed this issue 4 years ago

Kobzol commented 5 years ago

Hi! Thanks for this effort. I think that ZeroMQ is a very useful piece of technology which sadly doesn't have a nice async story in Rust yet (as you stated in your blog post). I had been getting a little desperate about this situation and wanted to start yet another Tokio ZMQ crate, when I found tmq :)

I have been using tokio-zmq for a while. It has a nice API, but there are some problems that prevent me from using it further: it's GPL-licensed, it's not on GitHub, and I have triggered some errors in it that are still unfixed (futures-zmq works but is much slower).

This crate would be very useful for me and I'd like to contribute. Now that tokio is reaching 0.2, futures is reaching 0.3, and async/await is almost stable, I think it would be a good time to push this crate forward with the new std::future interface.

It would be nice if you could create some tasks (as GitHub issues) that are reasonably self-contained and could be grabbed by potential contributors. What do you think?

There are some obvious choices (multipart messages, which are already being worked on; more socket types), but I think that migrating to std::future and async/await is the first step, because the more stuff gets added here, the more annoying it will be to switch in the future (pun intended).

If you agree, I can start a PR attempting to migrate to std::future. async/await is still unstable, but I would suggest using it where applicable (at least in the examples); it should be stable in a few months, and I guess this library won't be fully stable with all the functionality before then.

cetra3 commented 5 years ago

I think it's a great idea to target async/await! It'd have to sit on a branch until everything's in stable and out of alpha like tokio-0.2, but nothing is stopping it from starting now.

In terms of what is needed:

Multipart is working fine on the multipart_support branch, but it uses a Vec per message, and I don't know whether that's the most efficient way. We need a few benchmarks to work this out and compare against current master.

Kobzol commented 5 years ago

I agree that the Vec of Messages should be benchmarked. FWIW, tokio-zmq uses VecDeque&lt;zmq::Message&gt; and I didn't see it appear anywhere near the top in a profiler of my distributed app that sends tens of thousands of messages per second. I'll try to sketch a PR with an update to std::future and async/await.

cetra3 commented 5 years ago

Thanks! If you don't have time to get around to it, I am planning to implement it at some point in the future, but I will wait until the ecosystem catches up (mainly actix-web and tokio).

Multipart can easily be swapped out to VecDeque if that's faster, but I don't think it would make much difference in the current implementation, considering I just push and pop at the end of the Vec when receiving.

Kobzol commented 5 years ago

> Multipart can easily swap out to VecDeque if this is faster, but I don't think it would make much difference in the current implementation, considering I just push and pop to the end of the Vec when receiving.

Yes, I meant that Vec is basically equivalent to VecDeque here, and at least in my experience VecDeque performed fine :) I suspect that VecDeque was used to allow popping quickly from both sides, but I think that Vec is fine.
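To illustrate the point being made, here is a minimal sketch (the `Multipart` and `Frame` names are hypothetical stand-ins, not the crate's actual types): for the back-only push/pop access pattern described above, a plain Vec is algorithmically equivalent to a VecDeque, since both are O(1) at the back.

```rust
use std::collections::VecDeque;

// Hypothetical frame type standing in for zmq::Message.
type Frame = Vec<u8>;

// Vec-backed multipart: push and pop at the back are both O(1),
// matching the receive pattern described in the thread.
struct Multipart {
    frames: Vec<Frame>,
}

impl Multipart {
    fn new() -> Self {
        Multipart { frames: Vec::new() }
    }
    fn push(&mut self, frame: Frame) {
        self.frames.push(frame);
    }
    fn pop(&mut self) -> Option<Frame> {
        self.frames.pop()
    }
}

fn main() {
    let mut mp = Multipart::new();
    mp.push(b"header".to_vec());
    mp.push(b"body".to_vec());
    // Popping from the back is LIFO; a VecDeque would only pay off
    // if we also needed O(1) pops from the front.
    assert_eq!(mp.pop(), Some(b"body".to_vec()));

    // VecDeque gives O(1) at *both* ends via a ring buffer; for
    // back-only access the two are interchangeable.
    let mut dq: VecDeque<Frame> = VecDeque::new();
    dq.push_back(b"header".to_vec());
    dq.push_back(b"body".to_vec());
    assert_eq!(dq.pop_front(), Some(b"header".to_vec()));
}
```

The trade-off only matters if frames must be consumed front-first while still appending at the back, which is the case tokio-zmq's VecDeque presumably covers.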

tokio is now very close to finishing a monumental update to 0.2, and I don't expect to see large changes between now and the release of that version. I'll try to work on it this weekend.

Kobzol commented 5 years ago

I wrote the minimal amount of code necessary to get a read (PULL), write (PUSH) and read/write (DEALER) socket working for some basic test cases with std::future. I have written it directly with multipart messages in mind, since otherwise the implementation would be a lot different (it's still only about 100 lines, so I hope that we can keep this core simple with the new futures).

I created some small testing infrastructure for receiving and sending messages and checking that they have correctly arrived. To my surprise all the tests are passing, even the hammer tests, so either I made a mistake somewhere or it's really trivial to implement async I/O with the new futures :) I'm not completely sure about the semantics of PollEvented::poll_read_ready and poll_write_ready though: they only have to be called if your read/write doesn't succeed and returns EAGAIN/EWOULDBLOCK, right? That would make sense, since Tokio is edge-triggered AFAIK.

Before we start adding more sockets (which should be relatively trivial), I think that some infrastructure for writing the sockets should be created first. It's painfully obvious that most of the sockets are basically a copy-pasted version of a different socket, just with a different name and sometimes a few additional methods (like subscribe). There are two things that should be automated:

1) Building the socket (i.e. generating the code for configuring a socket). Since there is no high-level Rust ZMQ API, the builders in this crate try to create a safer API for building a socket (tokio-zmq does the same). I like it, but there is a problem: changing the properties of the ZMQ socket after it has been built is currently impossible. With the new futures, the async Socket type can usually be used through a &mut reference and you don't have to move it to actually use the async functions, so it might be easier to add functions that change the ZMQ socket properties. And since you might need to change the ZMQ properties dynamically in your program, it would be nice if we offered a way to do this in the API.

2) Implementing Stream/Sink for a socket. For most sockets this should again be just a copy-paste process; I have extracted most of the code into a separate struct called EventedSocket, but there is still some boilerplate left to write for each socket.
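A minimal sketch of point 1), with entirely hypothetical names (`SocketBuilder`, `BuiltSocket`, `set_identity` are illustrative, not tmq's API): the builder configures options up front, but the built socket keeps `&mut self` setters so options can still be changed dynamically after construction.

```rust
// Hypothetical builder: options are set before connecting.
struct SocketBuilder {
    identity: Option<Vec<u8>>,
}

// Hypothetical built socket: still mutable after construction.
struct BuiltSocket {
    identity: Option<Vec<u8>>,
    endpoint: String,
}

impl SocketBuilder {
    fn new() -> Self {
        SocketBuilder { identity: None }
    }
    // Consuming builder methods chain nicely.
    fn identity(mut self, id: &[u8]) -> Self {
        self.identity = Some(id.to_vec());
        self
    }
    fn connect(self, endpoint: &str) -> BuiltSocket {
        BuiltSocket {
            identity: self.identity,
            endpoint: endpoint.to_owned(),
        }
    }
}

impl BuiltSocket {
    // Post-build mutation: feasible because with std::future the
    // socket is used through &mut self instead of being moved.
    fn set_identity(&mut self, id: &[u8]) {
        self.identity = Some(id.to_vec());
    }
}

fn main() {
    let mut s = SocketBuilder::new()
        .identity(b"a")
        .connect("tcp://127.0.0.1:5555");
    // The option can still be changed after the socket is built.
    s.set_identity(b"b");
    assert_eq!(s.identity.as_deref(), Some(&b"b"[..]));
}
```

The key design point is that the setters on the built type only need `&mut self`, which the new `&mut`-based async API makes ergonomic.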

There are two obvious candidates for generating the code: declarative macros or procedural macros. Procedural macros would look really nice (tokio-zmq also uses them), but I'm not sure if it's worth the complexity of an additional sub-crate, since the macros will probably be rather trivial.

You can find my version here: https://github.com/Kobzol/tmq/commits/future, please let me know what you think :-)

cetra3 commented 5 years ago

Both poll_read_ready and poll_write_ready need to be called to set up the notifier on the current task in case NotReady is returned. If you don't call them, your task may never wake up, as the notifier hasn't been registered. So my understanding is that you should always call those methods when performing a read/write.
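The edge-triggered pattern under discussion can be sketched with a mock in place of a real socket (the `MockSocket` type and its methods are invented for illustration; `clear_read_ready` here only records that the real tokio call would have been made): drain all immediately available data, and only when a read returns WouldBlock re-arm readiness so the reactor can wake the task on the next edge.

```rust
use std::io::{self, ErrorKind};

// Mock non-blocking source: yields queued bytes, then WouldBlock.
struct MockSocket {
    queued: Vec<u8>,
    // Stands in for tokio's PollEvented::clear_read_ready.
    readiness_cleared: bool,
}

impl MockSocket {
    fn try_recv(&mut self) -> io::Result<u8> {
        match self.queued.pop() {
            Some(b) => Ok(b),
            None => Err(ErrorKind::WouldBlock.into()),
        }
    }
    fn clear_read_ready(&mut self) {
        // In real tokio this re-registers interest with the reactor so
        // the task is woken on the next edge-triggered readiness event.
        self.readiness_cleared = true;
    }
}

// Drain everything immediately available. Only on WouldBlock do we
// clear readiness; a successful read never requires it.
fn drain(sock: &mut MockSocket) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        match sock.try_recv() {
            Ok(b) => out.push(b),
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
                sock.clear_read_ready();
                return out;
            }
            Err(_) => return out,
        }
    }
}

fn main() {
    let mut sock = MockSocket { queued: vec![3, 2, 1], readiness_cleared: false };
    let got = drain(&mut sock);
    assert_eq!(got, vec![1, 2, 3]);
    // We hit WouldBlock at the end, so readiness was cleared exactly then.
    assert!(sock.readiness_cleared);
}
```

Skipping the clear-on-WouldBlock step is exactly the failure mode described above: the readiness flag stays set locally, no interest is re-registered, and the task never receives another wakeup.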

I'd say macros are a good idea for consistency's sake as well, and they make it trivial to implement socket analogues. I've had... varying... success at using them though, and was always planning to refactor into macros at some future date.

If you have a look at the Pull/Push code in the multipart branch, it basically returns a Pub/Sub at the end when building is finished, but there are some methods that could be implemented via a macro, as they are basically the same.

Sockets like Request/Respond are a little harder to template out and have some weird ordering semantics, as you have to send/receive in lockstep, but most other ones are all the same implementations in terms of the Stream/Sink.

I feel like using declarative macros might be a good start, only moving to procedural macros if needed, as they involve a lot more boilerplate and do slow down compilation slightly.
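As a rough illustration of the declarative-macro route (the `socket_type!` macro and the `endpoint` field are hypothetical, and a real version would wrap the shared EventedSocket and emit Stream/Sink impls), a `macro_rules!` macro can stamp out the near-identical wrapper types so each socket is a one-line declaration:

```rust
// Hypothetical macro: generates a wrapper struct per socket kind,
// keeping the shared plumbing in one place.
macro_rules! socket_type {
    ($name:ident) => {
        pub struct $name {
            // Placeholder for the underlying shared socket state.
            endpoint: String,
        }

        impl $name {
            pub fn new(endpoint: &str) -> Self {
                $name { endpoint: endpoint.to_owned() }
            }
            pub fn kind(&self) -> &'static str {
                // stringify! turns the type name into its label.
                stringify!($name)
            }
            pub fn endpoint(&self) -> &str {
                &self.endpoint
            }
        }
    };
}

// Each new socket analogue is now a single line.
socket_type!(Pull);
socket_type!(Push);
socket_type!(Dealer);

fn main() {
    let p = Pull::new("inproc://demo");
    assert_eq!(p.kind(), "Pull");
    assert_eq!(p.endpoint(), "inproc://demo");
    assert_eq!(Dealer::new("tcp://127.0.0.1:5555").kind(), "Dealer");
}
```

Sockets with lockstep semantics like Request/Reply would still need hand-written logic; the macro only pays off for the copy-paste majority.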

I don't see there being an issue with providing a mutable reference to the underlying socket, probably as a trait implemented for each final struct. Maybe implementing DerefMut for the socket structs (i.e., impl DerefMut for Sub returning the socket ref) is all that's needed here. I do feel like there may be some borrow-checker problems, as tokio expects to own everything, but I'm not sure whether pinning in async/await resolves that.
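The DerefMut idea could look roughly like this (all names are stand-ins: `RawSocket` plays the role of the underlying zmq socket and `Sub` the async wrapper; this is a sketch of the pattern, not tmq's code). Deref coercion then lets callers invoke the raw socket's option setters directly on the wrapper:

```rust
use std::ops::{Deref, DerefMut};

// Stand-in for the underlying zmq socket with mutable options.
struct RawSocket {
    subscribe_prefix: Option<String>,
}

impl RawSocket {
    fn set_subscribe(&mut self, prefix: &str) {
        self.subscribe_prefix = Some(prefix.to_owned());
    }
}

// Stand-in for the async wrapper type (e.g. a Sub socket).
struct Sub {
    socket: RawSocket,
}

impl Deref for Sub {
    type Target = RawSocket;
    fn deref(&self) -> &RawSocket {
        &self.socket
    }
}

impl DerefMut for Sub {
    fn deref_mut(&mut self) -> &mut RawSocket {
        &mut self.socket
    }
}

fn main() {
    let mut sub = Sub { socket: RawSocket { subscribe_prefix: None } };
    // Deref coercion: set_subscribe is defined on RawSocket but is
    // callable directly on the wrapper after construction.
    sub.set_subscribe("topic");
    assert_eq!(sub.subscribe_prefix.as_deref(), Some("topic"));
}
```

One caveat with this pattern: Deref to a type with its own async-relevant methods can make the API surface fuzzy, so an explicit accessor method is a common alternative.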

All in all, off to a great start!

Kobzol commented 5 years ago

Sorry, I meant clear_write_ready and clear_read_ready. I'll try to prepare some builder macros and check whether Deref or other things cause problems around await.

Btw, I also replaced failure::Error with a concrete Error type. I don't like that failure::Error doesn't work with std::error::Error, which makes it annoying to compose. Also, failure::Error is a bit heavyweight, since AFAIK it generates a backtrace on every error creation. A concrete Error type should be more performant and has better error semantics (you can easily distinguish different Error variants).
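The shape of such a concrete error type might look like this (a sketch, not tmq's actual definition; the `TmqError` name and the string-based `Zmq` variant are placeholders): an enum that implements `std::error::Error`, costs nothing extra to construct, and lets callers match on variants.

```rust
use std::error::Error as StdError;
use std::fmt;
use std::io;

// Concrete error type: cheap to construct (no backtrace capture),
// composes with std::error::Error, and supports matching on variants.
#[derive(Debug)]
pub enum TmqError {
    Io(io::Error),
    // Placeholder for a zmq::Error variant in the real crate.
    Zmq(String),
}

impl fmt::Display for TmqError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TmqError::Io(e) => write!(f, "I/O error: {}", e),
            TmqError::Zmq(msg) => write!(f, "zmq error: {}", msg),
        }
    }
}

impl StdError for TmqError {
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            TmqError::Io(e) => Some(e),
            TmqError::Zmq(_) => None,
        }
    }
}

// From impls make `?` work in functions returning Result<_, TmqError>.
impl From<io::Error> for TmqError {
    fn from(e: io::Error) -> Self {
        TmqError::Io(e)
    }
}

fn main() {
    let err: TmqError = io::Error::new(io::ErrorKind::Other, "boom").into();
    assert!(matches!(err, TmqError::Io(_)));
    // Because TmqError implements std::error::Error, it composes with
    // the rest of the std error ecosystem (source chains, Box<dyn Error>).
    assert!(err.source().is_some());
}
```

Adding a `From<zmq::Error>` impl in the same style would give `?`-based propagation for both underlying error sources.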

cetra3 commented 5 years ago

Yep, I agree, removing the dependency for failure is definitely a good step.

You always need to call both functions if the error is EAGAIN, otherwise I don't think tokio will receive polls from mio/epoll/kqueue:

> When the operation is attempted and is unable to succeed due to the I/O resource not being ready, the caller must call clear_read_ready or clear_write_ready. This clears the readiness state until a new readiness event is received.

Kobzol commented 5 years ago

Yes, that's how I understood it as well. I just wanted to make sure that I don't have to call them if the read/write was actually successful (or if there was at least one successful read followed by EAGAIN).

cetra3 commented 5 years ago

If you get EAGAIN you basically need to call it, otherwise the task won't ever wake up again. If you're looping through reads for a multipart message, you won't get EAGAIN between frames (according to the zeromq docs) if the first read is successful.

cetra3 commented 4 years ago

Solved by https://github.com/cetra3/tmq/pull/5!