zeromq / zmq.rs

A native implementation of ØMQ in Rust
https://crates.io/crates/zeromq
MIT License

feat: add support for reverse binding between PUB and SUB #149

Closed: dimitar-petrov closed this 2 years ago

dimitar-petrov commented 2 years ago

To provide the functionality required by the example in #147, the PUB and SUB sockets need to be patched.

Quote from a dev on Discord:

The subscriber socket should keep a list of its subscriptions and send it to the PUB socket when it connects.

Todos

Any comments regarding possible solutions are appreciated. Thanks.

Alexei-Kornienko commented 2 years ago

Basically we need to implement two things:

1) At https://github.com/zeromq/zmq.rs/blob/master/src/sub.rs#L23 we should add a HashSet that keeps the subscriptions, adding to or removing from it on each subscribe/unsubscribe call (https://github.com/zeromq/zmq.rs/blob/master/src/sub.rs#L33).

2) This part is more complicated. I have a struct called backend that encapsulates most of the socket's network-related logic. The backend provides on_peer_connected, which is called when a new connection is established and the handshake completes (https://github.com/zeromq/zmq.rs/blob/master/src/backend.rs#L101). SUB currently uses GenericSocketBackend. We need to replace it with a custom backend for the SUB socket that uses the HashSet created in step 1 to send the current subscriptions to the PUB socket when it first connects. This can be done either by copy/pasting the GenericSocketBackend impl into the sub socket and customizing it, or by composing structs.
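A minimal sketch of both steps, assuming the composition approach. Every name here (`GenericBackend`, `SubSocketBackend`, `PeerId`, `send_to`, `subscription_frame`) is hypothetical and only stands in for the real zmq.rs types; the network layer is replaced by an in-memory `outbox` so the replay logic can be seen on its own. The frame layout (byte 0x01 or 0x00 followed by the topic) is the subscribe/unsubscribe message format from the ZeroMQ PUB/SUB spec.

```rust
use std::collections::HashSet;

/// Hypothetical identifier for a connected peer.
type PeerId = u32;

/// Hypothetical stand-in for GenericSocketBackend: it only records connected
/// peers and the frames "sent" to them (no real network I/O).
#[derive(Default)]
struct GenericBackend {
    peers: Vec<PeerId>,
    outbox: Vec<(PeerId, Vec<u8>)>,
}

impl GenericBackend {
    fn peer_connected(&mut self, peer: PeerId) {
        self.peers.push(peer);
    }

    fn send_to(&mut self, peer: PeerId, frame: Vec<u8>) {
        self.outbox.push((peer, frame));
    }
}

/// Subscription frame: 0x01 + topic to subscribe, 0x00 + topic to unsubscribe.
fn subscription_frame(topic: &str, subscribe: bool) -> Vec<u8> {
    let mut frame = vec![if subscribe { 1u8 } else { 0u8 }];
    frame.extend_from_slice(topic.as_bytes());
    frame
}

/// SUB-specific backend built by composition: it wraps the generic backend
/// and owns the set of current subscriptions (step 1).
#[derive(Default)]
struct SubSocketBackend {
    generic: GenericBackend,
    subs: HashSet<String>,
}

impl SubSocketBackend {
    fn subscribe(&mut self, topic: &str) {
        self.subs.insert(topic.to_string());
        // Peers that are already connected are told immediately.
        for peer in self.generic.peers.clone() {
            self.generic.send_to(peer, subscription_frame(topic, true));
        }
    }

    fn unsubscribe(&mut self, topic: &str) {
        if self.subs.remove(topic) {
            for peer in self.generic.peers.clone() {
                self.generic.send_to(peer, subscription_frame(topic, false));
            }
        }
    }

    /// Step 2: delegate to the generic backend, then replay every known
    /// subscription so a PUB peer that connects later still receives it.
    fn peer_connected(&mut self, peer: PeerId) {
        self.generic.peer_connected(peer);
        for topic in &self.subs {
            self.generic.send_to(peer, subscription_frame(topic, true));
        }
    }
}

fn main() {
    let mut backend = SubSocketBackend::default();
    backend.subscribe("topic"); // no peers yet, so nothing is sent
    backend.peer_connected(7); // the PUB side connects later: "topic" is replayed
    assert_eq!(backend.generic.outbox, vec![(7, b"\x01topic".to_vec())]);
    backend.unsubscribe("topic"); // connected peer 7 now gets 0x00 + "topic"
    assert_eq!(backend.generic.outbox.len(), 2);
}
```

Composition keeps the generic peer handling in one place; the SUB backend only adds the subscription bookkeeping on top of it.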

dimitar-petrov commented 2 years ago

Couple of questions:

  1. How do I access SubSocket.subs (the HashSet) from MultiPeerBackend.peer_connected, since it is a composite?
  2. What about making the HashSet a field of the SubSocketBackend struct and accessing it as self.backend.subs from subscribe and unsubscribe?
Alexei-Kornienko commented 2 years ago
  1. Yeah, you might be right. In that case it is probably easier to put it in the backend itself. It would need a mutex, but that should not be a problem because it's not on a fast path of the code.
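Putting the set inside the backend behind a mutex might look roughly like the sketch below. `SubSocket`, `SubSocketBackend` and `current_subscriptions` are illustrative names, not the actual zmq.rs code, and `std::sync::Mutex` is assumed for simplicity (the real code may use a different mutex type). The socket handle and the backend share one `Arc`, so changes made by subscribe/unsubscribe are visible to the backend when a peer connects.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical SUB backend that owns the subscription set behind a Mutex,
/// so it can be shared (via Arc) between the socket handle and the backend.
#[derive(Default)]
struct SubSocketBackend {
    subs: Mutex<HashSet<String>>,
}

impl SubSocketBackend {
    /// What peer_connected would call: copy the topics out under the lock.
    /// The guard is dropped when this function returns, so no lock is held
    /// while the resulting messages are later sent.
    fn current_subscriptions(&self) -> Vec<String> {
        let subs = self.subs.lock().unwrap();
        let mut topics: Vec<String> = subs.iter().cloned().collect();
        topics.sort(); // deterministic order, only for readability
        topics
    }
}

/// Hypothetical socket handle: subscribe/unsubscribe go through the shared backend.
struct SubSocket {
    backend: Arc<SubSocketBackend>,
}

impl SubSocket {
    fn subscribe(&mut self, topic: &str) {
        self.backend.subs.lock().unwrap().insert(topic.to_string());
    }

    fn unsubscribe(&mut self, topic: &str) {
        self.backend.subs.lock().unwrap().remove(topic);
    }
}

fn main() {
    let backend = Arc::new(SubSocketBackend::default());
    let mut socket = SubSocket { backend: Arc::clone(&backend) };
    socket.subscribe("a");
    socket.subscribe("b");
    socket.unsubscribe("a");
    // The backend sees the socket's changes through the shared Arc.
    assert_eq!(backend.current_subscriptions(), vec!["b".to_string()]);
}
```

Since the lock is only taken on subscribe/unsubscribe and on new connections, contention should be negligible, which matches the "not on a fast path" reasoning above.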

Another issue I see now is that peer_connected is not async. This means you won't be able to send messages to the PUB socket and await them. I see 2 possible options:

Alexei-Kornienko commented 2 years ago

Mostly LGTM. I would suggest some minor refactoring, and maybe run fmt/clippy to make sure the code looks clean.

Alexei-Kornienko commented 2 years ago

Overall LGTM. Could you please fix the formatting issues reported by CI so we can get it merged?

dimitar-petrov commented 2 years ago

Sure, I will take care of the formatting and push.

I have two questions in the meantime:

  1. Why doesn't this snippet work? For some reason the mutex is not unlocked, and the compiler complains because of an await later in the function.

    let subs = self.subs.lock();
    let subs_msgs: Vec<ZmqMessage> = subs
        .iter()
        .map(|x| SubSocketBackend::create_subs_message(&x, SubBackendMsgType::SUBSCRIBE))
        .collect();
    // Explicitly release the lock before the later .await
    drop(subs);

  2. I also noticed that if I stop and restart the SUB server process, a client running the following code does not detect anything and keeps trying to push messages.

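    use std::error::Error;
    use std::time::Duration;

    use tokio::time::sleep;
    use zeromq::{Socket, SocketSend}; // traits that provide connect() and send()

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        println!("Start client");
        let mut socket = zeromq::PubSocket::new();
        socket.connect("tcp://127.0.0.1:30001").await.expect("Failed to connect");
        // Give the connection a moment to complete before publishing.
        sleep(Duration::from_millis(100)).await;

        // Two-frame message: the "topic" frame first, then the "test" payload.
        let mut msg = zeromq::ZmqMessage::from("test");
        msg.prepend(&"topic".into());

        // Publish the same message once per second, forever.
        loop {
            println!("Send ...");
            println!("{:?}", socket.send(msg.clone()).await.unwrap());
            sleep(Duration::from_millis(1000)).await;
        }

        Ok(())
    }
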
Alexei-Kornienko commented 2 years ago
  1. This seems like a compiler issue. The following code does work:
    
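    let subs_msgs = {
        // The guard only lives inside this block, so it is released
        // before any later .await in the function.
        let subs = self.subs.lock();
        let subs_msgs: Vec<ZmqMessage> = subs
            .iter()
            .map(|x| SubSocketBackend::create_subs_message(&x, SubBackendMsgType::SUBSCRIBE))
            .collect();
        drop(subs);
        subs_msgs
    };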

So in this case I would say the compiler is rejecting correct code.

2. That's another open issue in the library (#143). The code currently doesn't handle reconnects on its own, as it should according to the ZMQ spec. However, IMHO it should be fixed in a separate MR.
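
The block-scoping workaround from point 1 can be shown in isolation with the sketch below. It assumes `std::sync::Mutex`, whose `MutexGuard` is not `Send`; the likely cause of the original error is that the compiler's analysis of what is held across an `.await` still counted `subs` as live despite the explicit `drop(subs)`, whereas ending its scope in a block removes it unambiguously. `send_frames`, `resend_subscriptions` and the tiny `block_on` executor are hypothetical helpers so the example runs without tokio; `assert_send` is a compile-time check that the resulting future is `Send` (as `tokio::spawn` would require).

```rust
use std::collections::HashSet;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Minimal single-future executor so the example runs without tokio.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Compiles only if the future is Send.
fn assert_send<F: Future + Send>(fut: F) -> F {
    fut
}

/// Hypothetical stand-in for asynchronously sending frames to a PUB peer.
async fn send_frames(frames: Vec<Vec<u8>>) -> usize {
    std::future::ready(()).await;
    frames.len()
}

async fn resend_subscriptions(subs: Arc<Mutex<HashSet<String>>>) -> usize {
    // The guard exists only inside this block, so it is gone before the
    // .await below and the future stays Send.
    let frames: Vec<Vec<u8>> = {
        let guard = subs.lock().unwrap();
        guard
            .iter()
            .map(|topic| {
                let mut frame = vec![1u8]; // 0x01 = subscribe
                frame.extend_from_slice(topic.as_bytes());
                frame
            })
            .collect()
    };
    send_frames(frames).await
}

fn main() {
    let subs = Arc::new(Mutex::new(HashSet::from(["a".to_string(), "b".to_string()])));
    let sent = block_on(assert_send(resend_subscriptions(subs)));
    assert_eq!(sent, 2);
}
```

The same shape (collect owned data under the lock, close the scope, then await) applies regardless of which mutex type the backend ends up using.
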
dimitar-petrov commented 2 years ago

@Alexei-Kornienko, let me know if you are fine with the function/variable naming conventions I used.

Alexei-Kornienko commented 2 years ago

LGTM.