Closed skrap closed 4 years ago
More options:
SinkItem
to some collection-of-messages type. tokio-zmq uses a VecDeque<S: zmq::Sendable>
, which seems usable, though I'm not sure what the advantages of that over a Vec<>
are.zmq::Message
in an outer type (tmq::Message
could be used for this) which includes a has_more: bool
, and possibly other flags as well. Switch to using that as the SinkItem
type.I think that is a good idea!
From the look of the library, the send_multipart
doesn't use the Sendable
trait for the type bounds, I'm wondering if that's because HKT or similar prevents it. I'd say at a minimum we would use the same types, i.e, IntoIterator<Item = Into<Message>>
for sending. For receiving, I'm not too sure, but I would say maybe a Vec<Message>
would be a good start. Not sure why a VecDeque
is used, since I'm assuming the number of multipart messages is normally pretty small? Would need to benchmark!
For subscribe, I'd imagine something like this could work:
let request = subscribe(&Context::new())
.connect("tcp://127.0.0.1:7899")
.expect("Couldn't connect")
.subscribe_multipart("")
.for_each(|val| {
info!("Got {} messages", val.len())
Ok(())
})
.map_err(|e| {
error!("Error Subscribing: {}", e);
});
The subscribe_multipart("")
would need to return a new struct type SubMultipart
, which would implement the Stream
trait but the Item
would be equal to Vec<zmq::Message>
. This may mean duplicate structs for each socket, but multipart style.
The Poller
trait will need two new methods:
fn send_multipart_message<I: IntoIterator<Item = M>, M: Into<zmq::Message>>(&self, msg: I) -> Poll<(), Error>;
fn recv_multipart_message(&self, msg: &mut Vec<zmq::Message>) -> Poll<(), Error>;
Not sure if sockets mix and match multipart messages with single ones? I would imagine if you're in "multipart mode" then a single message would be a length 1 vec.
I'm playing around with some of the options to see how the ergonomics work out. Having a multipart SinkItem (probably Vec
Definitely needs to be tested. I wonder whether the high water mark comes into play here: http://api.zeromq.org/2-1:zmq-setsockopt#toc3
I was experimenting with this API, trying to make a single Sink which could accept either a multipart message or a single zmq::Message
(aka "frame", or "message part"), but it seems we may need to wait until futures-rs 3 for this: https://github.com/rust-lang-nursery/futures-rs/pull/1481
For right now I'm focusing more on changing the unit of sending and receiving to a Vec<zmq::Message>
. This would cause one extra allocation per sent multipart message, but it unblocks progress on multipart sending. Comments definitely welcome!
I would assume there'd need to be different Sink
structs as there would be different Stream
structs. Duplication is unavoidable, but I have a feeling that macros may help here.
For the SinkItem
have you tried with IntoIterator<Item = Into<Message>>
? Otherwise I think Vec<Message>
is fine as a first shot!
I did try to use IntoIterator. The IntoIterator associated type requirements made it sort of a moot point — rustc wanted a fully defined IntoIterator trait, like type SinkItem = IntoIterator<Item=Into
Stream seems likely to always produce the same type, but the intention with Sink (in futures 3.x) seems to be for it to be able to accept multiple types, rather than having a fully-realized associated type defined.
I’m gonna keep poking at this.
On Apr 15, 2019, at 7:20 PM, cetra3 notifications@github.com wrote:
I would assume there'd need to be different Sink structs as there would be different Stream structs. Duplication is unavoidable, but I have a feeling that macros may help here.
For the SinkItem have you tried with IntoIterator<Item = Into
> ? Otherwise I think Vec is fine as a first shot! — You are receiving this because you authored the thread. Reply to this email directly, view it on GitHub https://github.com/cetra3/tmq/issues/2#issuecomment-483453781, or mute the thread https://github.com/notifications/unsubscribe-auth/ABFImvQQirbTYydzzTFGLdGncthIncmPks5vhQk0gaJpZM4cih0N.
One challenge I can see with using the same Sink
for both multipart and normal is what function to call from the zmq library, since it's a different function depending on whether it's multipart or not, so having it generic here may be a bit hard to wrestle types.
If the IntoIterator
doesn't work I think sticking with Vec<Message>
for now to keep things simple would be appropriate.
I've got an initial version the multipart_support
branch, but I think I'm going to refactor it quite substantially before cutting a new version: https://github.com/cetra3/tmq/tree/multipart_support
I was thinking about this a bit a few days ago, actually! One of zmq's guarantees is that if a single frame of a message is received, that all parts will be received. This means that all parts are already in buffered in memory at the time that the first part is emitted by zmq. Based on my understanding of common usage of zmq, it seems like there's no practical efficiency gain to receiving single frames of a multipart message. To me, this points to an API that deals only with MultipartMessage objects, instead of single frames. I've not taken a look at your changes yet, but if you're poking at this stuff I wanted to give my thoughts.
I have started working on a TmqMessage
type which at the moment is just an enum for a Single message or a Multipart message, with the aim to keep the ergonomics as clean as the current implementation, but it's managed to make it more messy.
It works, but I don't like the way it is used from a consumer perspective, so I'm possibly going to rework it further.
Solved, as by https://github.com/cetra3/tmq/pull/5!
ZMQ supports multipart messages, which have the useful property that if some part of the multipart message is received, then it's guaranteed that all of the parts are received. This lets zmq-based protocols use multipart messages for easy segmentation of the messages.
For example, it's common for pub/sub messages to be sent as multipart messages, with the topic sent as the first part (and subject to publisher-side filtering) and a payload sent as the second part of the multipart message.
While it's possible to support incoming multipart messages with this crate (via accumulation of incoming messages into a collection, and testing for multipart completion via
get_rcvmore
) I don't think it's possible to actually send multipart messages currently. We should support this!The underlying ZMQ crate supports these messages via the
send_multipart
function, or via manual injection of thezmq::SNDMORE
into the flags argument of thezmq::send
function for all but the last of the multipart messages.This issue is intended to be a discussion of how to best support the sending of multipart messages.
Here are some options I see:
Teach
Poller
tosend_multipart
Teach
Poller
tosend_message_with_flags
There's probably other options out there! Just wanted to start the discussion. If this isn't the best place for this type of work, please let me know and I'll move this elsewhere.