erickt / rust-zmq

Rust zeromq bindings.
Apache License 2.0
886 stars 189 forks source link

ipc with XPUB and XSUB not working #389

Closed g-tejas closed 6 months ago

g-tejas commented 6 months ago

I have a proxy that looks like this

fn main() {
    let context = zmq::Context::new();
    let mut frontend = context.socket(zmq::XSUB).unwrap();
    let mut backend = context.socket(zmq::XPUB).unwrap();
    let mut capture = context.socket(zmq::PUB).unwrap();

    let frontend_endpoint = "ipc://frontend";
    let backend_endpoint = "ipc://backend";
    let capture_endpoint = "tcp://127.0.0.1:5555";

    frontend
        .connect(frontend_endpoint)
        .expect("failed connecting frontend");
    backend
        .bind(backend_endpoint)
        .expect("failed binding backend");
    capture
        .bind(capture_endpoint)
        .expect("failed binding capture");

    println!("Starting proxy listening on {}, forwarding to {} and capturing on {}", frontend_endpoint, backend_endpoint, capture_endpoint);
    zmq::proxy_with_capture(&mut frontend, &mut backend, &mut capture).expect("failed proxying");
}

A publisher that looks like this

fn main() {
    println!("Hello, world!");

    let context = zmq::Context::new();
    let publisher = context.socket(zmq::PUB).unwrap();

    let endpoint = "ipc://frontend";
    publisher.bind(endpoint).expect("failed to connect");

    println!("Publishing to {}", endpoint);

    let mut count = 0;

    loop {
        publisher
            .send("BYBIT::BTC/USDT", zmq::SNDMORE)
            .expect("failed sending first envelope");
        publisher
            .send(&format!("{}", count), 0)
            .expect("failed sending first message");
        publisher
            .send("BYBIT::ETH/USDT", zmq::SNDMORE)
            .expect("failed sending second envelope");
        publisher
            .send(&format!("{}", count + 1), 0)
            .expect("failed sending second message");
        count += 2;
        sleep(Duration::from_secs(1));
    }
}

And a subscriber that looks like this

fn main() {
    let context = zmq::Context::new();
    let subscriber = context.socket(zmq::SUB).unwrap();

    let endpoint = "ipc://backend";
    subscriber
        .connect(endpoint)
        .expect("failed connecting subscriber");

    let curr_sub = b"";
    subscriber.set_subscribe(curr_sub).expect("failed subscribing");

    println!("Listening to messages from {}:{:?}", endpoint, curr_sub);

    loop {
        let envelope = subscriber
            .recv_string(0)
            .expect("failed receiving envelope")
            .unwrap();
        let message = subscriber
            .recv_string(0)
            .expect("failed receiving message")
            .unwrap();
        println!("[{}] {}", envelope, message);
    }
}

But for some reason this doesn't work. I don't see any traffic on the subscriber side. If i change the urls to tcp instead of ipc/inproc, it works. Not sure what im doing wrong, is ipc not supported for xsub/xpub ? Since i see examples that work with pub and sub so.

Edit: i did see that tcp is a disconnected transport, but so is IPC

g-tejas commented 6 months ago

RESOLVED: I just had to run it from the same directory 😭