cetra3 / tmq

Rust ZeroMQ bindings for Tokio
151 stars 28 forks source link

PUB/SUB don't work together #40

Closed bemyak closed 1 year ago

bemyak commented 1 year ago

Hi,

I've combined the publish and subscribe examples into one and it doesn't work, the subscriber just blocks indefinitely.

Could you please help me investigate what's wrong?

use std::error::Error;

use futures_util::{SinkExt, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let topic: &[u8] = b"topic";
    let addr = "tcp://127.0.0.1:5003";

    let mut p = tmq::publish(&tmq::Context::new()).bind(addr)?;
    let mut s = tmq::subscribe(&tmq::Context::new())
        .connect(addr)?
        .subscribe(topic)?;

    p.send(vec![topic, "test message".as_bytes()]).await?;
    let msg = s.next().await.unwrap()?;
    println!("{:?}", msg);
    Ok(())
}
bemyak commented 1 year ago

NVW, this is a ØMQ's quirk. What happens is the message is sent to PUB, but since the subscriber is not actively listening, the message is discarded.

The fix is to run the publisher in a separate task with a small initial delay.