mnetship / ratsio

NATS & NATS Streaming Server client library for Rust
MIT License

Stan client not re-subscribing after disconnect/reconnect #10

Open glueball opened 4 years ago

glueball commented 4 years ago

To reproduce the issue, I've prepared the following example (largely based on the code in the README). The code is below.

Note: I'm using ratsio "^0.2" from crates.io.

Steps

  1. Start the STAN server using Docker (docker run -it -p 4222:4222 -p 8222:8222 nats-streaming).
  2. Run the test program with all debug logging enabled (RUST_LOG=debug cargo run). Full output below.
  3. The first time, the subscription works, as can be seen in the following debug line:

    PUB _STAN.sub.Q5Oy65iZCkgsOufXAGiu1D    G5dnLGhLmDpGOT1Qs87bF9  85
    
    test-clienttest.subject"_SUB.AtHogtvDsQpHDKgsV9pSUy(0:test.subject.durableP
  4. Then kill the STAN server (Ctrl-C). I waited until the "Pings are not responded to, we may be down." line appeared, but that doesn't seem to make a difference.
  5. Restart the STAN server with Docker.
  6. No new _STAN.sub request is sent.
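Step 4 relies on the client noticing that the server is gone. Judging only from the log in this issue (a PING roughly every 5 seconds, a "Skipped a ping." line for each unanswered one, then "Pings are not responded to, we may be down."), that detection can be sketched as a simple miss counter. This is a hypothetical model, not ratsio's code; `PingTracker`, `Tick` and the threshold of four misses are assumptions chosen for illustration.

```rust
/// Hypothetical keep-alive tracker, loosely modelled on the debug log:
/// each interval that ends without a PONG counts as a skipped ping, and
/// after `max_skipped` misses in a row the connection is treated as down.
struct PingTracker {
    awaiting_pong: bool,
    skipped: u32,
    max_skipped: u32,
}

#[derive(Debug, PartialEq)]
enum Tick {
    SendPing,
    Reconnect,
}

impl PingTracker {
    fn new(max_skipped: u32) -> Self {
        PingTracker { awaiting_pong: false, skipped: 0, max_skipped }
    }

    /// Called once per ping interval.
    fn on_interval(&mut self) -> Tick {
        if self.awaiting_pong {
            // The previous PING was never answered ("Skipped a ping.").
            self.skipped += 1;
        }
        if self.skipped >= self.max_skipped {
            // Too many misses in a row: give up on this connection.
            self.skipped = 0;
            self.awaiting_pong = false;
            return Tick::Reconnect;
        }
        self.awaiting_pong = true;
        Tick::SendPing
    }

    /// Called when a PONG arrives; clears the miss counter.
    fn on_pong(&mut self) {
        self.awaiting_pong = false;
        self.skipped = 0;
    }
}
```

Detecting the outage is only the first half; whatever triggers the reconnect must then also restore the STAN-level state, which is where this issue appears to go wrong.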

I noticed there's a line that says 1 STAN Reconnecting Subscriptions [1] (which is correct), but the line below it is different (_SUB. instead of _STAN.sub).
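One plausible reading of the log: a STAN subscription has two halves, a plain NATS SUB on the delivery inbox (`_SUB.<nuid>`) and a subscription request published to the streaming server (the `PUB _STAN.sub.Q5Oy65iZCkgsOufXAGiu1D` line). After the reconnect, the client re-issues only the NATS SUB (and publishes a `_STAN.close` request with the old prefix), so the restarted server never learns about the subscription and has nothing to deliver to the inbox. The sketch below models what a reconnect would need to emit for each tracked subscription. All types and names (`StanSub`, `Op`, `resubscribe_ops`) are hypothetical; `sub_requests_subject` stands for the subscription-request subject returned by the STAN connect handshake on the new connection.

```rust
/// A STAN subscription the client tracks so it can restore it later.
#[derive(Clone, Debug)]
struct StanSub {
    subject: String,
    durable_name: Option<String>,
    /// NATS inbox the server delivers to, e.g. "_SUB.<nuid>".
    inbox: String,
}

/// Simplified outgoing operations (hypothetical, not ratsio's types).
#[derive(Debug, PartialEq)]
enum Op {
    /// Plain NATS SUB on a subject.
    NatsSub { subject: String },
    /// Subscription request published to the streaming server.
    StanSubRequest {
        sub_requests_subject: String,
        subject: String,
        inbox: String,
        durable_name: Option<String>,
    },
}

/// Operations to emit once the STAN connect handshake on the new
/// connection has returned `sub_requests_subject`.
fn resubscribe_ops(sub_requests_subject: &str, subs: &[StanSub]) -> Vec<Op> {
    let mut ops = Vec::with_capacity(subs.len() * 2);
    for s in subs {
        // Restores only the NATS side: the inbox listens again...
        ops.push(Op::NatsSub { subject: s.inbox.clone() });
        // ...but the restarted server also needs a fresh request,
        // otherwise it never delivers anything to that inbox.
        ops.push(Op::StanSubRequest {
            sub_requests_subject: sub_requests_subject.to_string(),
            subject: s.subject.clone(),
            inbox: s.inbox.clone(),
            durable_name: s.durable_name.clone(),
        });
    }
    ops
}
```

Emitting the inbox SUB before the subscription request means the inbox is already listening when the server starts delivering.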

In this run I didn't use a second program to send messages, to keep the output shorter, but doing so shows that the client receives messages before the disconnect/reconnect, but not after it.

Test code

use ratsio::nats_client::NatsClientOptions;
use ratsio::stan_client::{StanClient, StanMessage, StanOptions, StanSubscribe, StartPosition, SyncHandler};
use tokio::prelude::Future;

fn main() {
    env_logger::init();

    let nats_options = NatsClientOptions::builder()
        .cluster_uris(vec!("192.168.99.100:4222".to_string()))
        .reconnect_timeout(5u64)
        .build()
        .unwrap();

    let stan_options = StanOptions::builder()
        .nats_options(nats_options)
        .cluster_id("test-cluster")
        .client_id("test-client").build()
        .unwrap();

    let subject: String = "test.subject".into();

    let program = StanClient::from_options(stan_options)
        .and_then(move |stan_client| {

            let sub = StanSubscribe::builder()
                .subject(subject.clone())
                .start_position(StartPosition::First)
                .durable_name(Some("test.subject.durable".into()))
                .manual_acks(false)
                .ack_wait_in_secs(5)
                .build().unwrap();

            stan_client
                .subscribe(sub, SyncHandler(Box::new(move |stan_msg: StanMessage| {
                    let msg = std::str::from_utf8(&stan_msg.payload).unwrap();
                    println!("> GOT :::: {:?} -> {}", stan_msg, msg);

                    Ok(())
                })))
        })
        .map(|_| ())
        .map_err(|_| ())
        ;

    tokio::run(program);
}

Full debug output

[2020-01-08T17:01:31Z DEBUG ratsio::stan_client::client] Connection id => AtHogtvDsQpHDKgsV9pSCj
[2020-01-08T17:01:31Z INFO  ratsio::net::connection]  Resolved 192.168.99.100 to 192.168.99.100:4222
[2020-01-08T17:01:31Z DEBUG tokio_reactor] adding I/O source: 0
[2020-01-08T17:01:31Z DEBUG tokio_reactor::registration] scheduling Write for: 0
[2020-01-08T17:01:31Z DEBUG ratsio::net::connection] Got a socket successfully.
[2020-01-08T17:01:31Z DEBUG ratsio::nats_client::client] Creating NATS client, got a connection.
[2020-01-08T17:01:31Z DEBUG ratsio::stan_client::client] Connecting NATS client
[2020-01-08T17:01:31Z DEBUG tokio_reactor::registration] scheduling Read for: 0
[2020-01-08T17:01:31Z DEBUG ratsio::stan_client::client] Got NATS client
[2020-01-08T17:01:31Z DEBUG ratsio::stan_client::client] Connecting STAN Client
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    CONNECT {"verbose":true,"pedantic":false,"tls_required":false,"name":"test-client","lang":"rust","version":"0.2.0","protocol":1,"echo":true}

[2020-01-08T17:01:31Z DEBUG ratsio::stan_client::client] Subscibing to STAN Client heartbeats
[2020-01-08T17:01:31Z DEBUG ratsio::stan_client::client] Subscribing to heartbeat => _HB.AtHogtvDsQpHDKgsV9pSIo
[2020-01-08T17:01:31Z DEBUG ratsio::nats_client::client] Subscription for _HB.AtHogtvDsQpHDKgsV9pSIo / tbpALevgjSOHPBPtOhf5le
[2020-01-08T17:01:31Z DEBUG ratsio::stan_client::client] Issuing STAN join request
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    SUB _HB.AtHogtvDsQpHDKgsV9pSIo  tbpALevgjSOHPBPtOhf5le

[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    SUB kvSt6mtcqAN7CXyjUTiapH  GQp9Auoqpa3Qee31JH7aKz

[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    UNSUB   GQp9Auoqpa3Qee31JH7aKz  1

[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => INFO(ServerInfo { server_id: "NBNKVWMOOQENFNACAYJMFPF2W6KK4JNETXS3TWUVST3XYA36AGWZGADR", version: "2.0.4", go: "go1.11.13", host: "0.0.0.0", port: 4222, max_payload: 1048576, proto: 1, client_id: 6, auth_required: false, tls_required: false, tls_verify: false, connect_urls: [] })
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    PUB _STAN.discover.test-cluster kvSt6mtcqAN7CXyjUTiapH  65

    test-client_HB.AtHogtvDsQpHDKgsV9pSIo"AtHogtvDsQpHDKgsV9pSCj

[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG tokio_reactor::registration] scheduling Read for: 0
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => MSG(Message { subject: kvSt6mtcqAN7CXyjUTiapH, sid: GQp9Auoqpa3Qee31JH7aKz, reply_to: None })
[2020-01-08T17:01:31Z DEBUG tokio_reactor::registration] scheduling Read for: 0
[2020-01-08T17:01:31Z DEBUG ratsio::nats_client::client] Removing sid GQp9Auoqpa3Qee31JH7aKz
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    SUB G5dnLGhLmDpGOT1Qs87bF9  kiqpBfqum4kH32RbJRxIbH

[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    UNSUB   kiqpBfqum4kH32RbJRxIbH  1

[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    PUB _STAN.sub.Q5Oy65iZCkgsOufXAGiu1D    G5dnLGhLmDpGOT1Qs87bF9  85

    test-clienttest.subject"_SUB.AtHogtvDsQpHDKgsV9pSUy(0:test.subject.durableP

[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => MSG(Message { subject: G5dnLGhLmDpGOT1Qs87bF9, sid: kiqpBfqum4kH32RbJRxIbH, reply_to: None })
[2020-01-08T17:01:31Z DEBUG tokio_reactor::registration] scheduling Read for: 0
[2020-01-08T17:01:31Z DEBUG ratsio::nats_client::client] Removing sid kiqpBfqum4kH32RbJRxIbH
[2020-01-08T17:01:31Z DEBUG ratsio::nats_client::client] Subscription for _SUB.AtHogtvDsQpHDKgsV9pSUy / urfnDtpBupSn1wjnbvxk26
[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Sending --->
    SUB _SUB.AtHogtvDsQpHDKgsV9pSUy urfnDtpBupSn1wjnbvxk26

[2020-01-08T17:01:31Z DEBUG ratsio::codec]  Op::Item => OK
[2020-01-08T17:01:31Z DEBUG tokio_reactor::registration] scheduling Read for: 0
[2020-01-08T17:01:36Z DEBUG ratsio::codec]  Sending --->
    PING

[2020-01-08T17:01:36Z DEBUG ratsio::codec]  Op::Item => PONG
[2020-01-08T17:01:36Z DEBUG tokio_reactor::registration] scheduling Read for: 0
[2020-01-08T17:01:36Z DEBUG ratsio::nats_client::client]  Received PONG
[2020-01-08T17:01:41Z DEBUG ratsio::codec]  Sending --->
    PING

[2020-01-08T17:01:46Z DEBUG ratsio::nats_client::client] Skipped a ping.
[2020-01-08T17:01:46Z DEBUG ratsio::codec]  Sending --->
    PING

[2020-01-08T17:01:51Z DEBUG ratsio::nats_client::client] Skipped a ping.
[2020-01-08T17:01:51Z DEBUG ratsio::codec]  Sending --->
    PING

[2020-01-08T17:01:56Z DEBUG ratsio::nats_client::client] Skipped a ping.
[2020-01-08T17:02:01Z DEBUG ratsio::nats_client::client] Skipped a ping.
[2020-01-08T17:02:01Z ERROR ratsio::nats_client::client] Pings are not responded to, we may be down.
[2020-01-08T17:02:01Z INFO  ratsio::net::connection]  Resolved 192.168.99.100 to 192.168.99.100:4222
[2020-01-08T17:02:01Z DEBUG tokio_reactor] adding I/O source: 0
[2020-01-08T17:02:01Z DEBUG tokio_reactor::registration] scheduling Write for: 0
[2020-01-08T17:02:03Z DEBUG tokio_reactor] dropping I/O source: 0
[2020-01-08T17:02:03Z ERROR ratsio::net::connection] Error reconnecting :: NoRouteToHostError
[2020-01-08T17:02:03Z INFO  ratsio::net::connection]  Resolved 192.168.99.100 to 192.168.99.100:4222
[2020-01-08T17:02:03Z DEBUG tokio_reactor] adding I/O source: 4194304
[2020-01-08T17:02:03Z DEBUG tokio_reactor::registration] scheduling Write for: 0
[2020-01-08T17:02:05Z DEBUG tokio_reactor] dropping I/O source: 0
[2020-01-08T17:02:05Z ERROR ratsio::net::connection] Error reconnecting :: NoRouteToHostError
[2020-01-08T17:02:05Z INFO  ratsio::net::connection]  Resolved 192.168.99.100 to 192.168.99.100:4222
[2020-01-08T17:02:05Z DEBUG tokio_reactor] adding I/O source: 8388608
[2020-01-08T17:02:05Z DEBUG tokio_reactor::registration] scheduling Write for: 0
[2020-01-08T17:02:06Z DEBUG ratsio::net::connection] Got a socket successfully.
[2020-01-08T17:02:06Z DEBUG tokio_reactor] dropping I/O source: 0
[2020-01-08T17:02:06Z DEBUG ratsio::net::connection] Got a connection
[2020-01-08T17:02:06Z DEBUG ratsio::nats_client::client] Closing sink for => "_HB.AtHogtvDsQpHDKgsV9pSIo"
[2020-01-08T17:02:06Z WARN  ratsio::nats_client::client] Closing sink for => tbpALevgjSOHPBPtOhf5le / _HB.AtHogtvDsQpHDKgsV9pSIo
[2020-01-08T17:02:06Z DEBUG ratsio::nats_client::client] Closing sink for => "_SUB.AtHogtvDsQpHDKgsV9pSUy"
[2020-01-08T17:02:06Z WARN  ratsio::nats_client::client] Closing sink for => urfnDtpBupSn1wjnbvxk26 / _SUB.AtHogtvDsQpHDKgsV9pSUy
[2020-01-08T17:02:06Z DEBUG ratsio::stan_client::subscription] done with subscription
[2020-01-08T17:02:06Z DEBUG ratsio::codec]  Op::Item => INFO(ServerInfo { server_id: "NAC5YMXW7ADKZA6QTZLUO5WBJVYMGDXWTO5BLJ3LMGIDT6ICRNBBQ4FG", version: "2.0.4", go: "go1.11.13", host: "0.0.0.0", port: 4222, max_payload: 1048576, proto: 1, client_id: 4, auth_required: false, tls_required: false, tls_verify: false, connect_urls: [] })
[2020-01-08T17:02:06Z INFO  ratsio::stan_client::client]  STAN Reconnecting closing old connection
[2020-01-08T17:02:06Z DEBUG ratsio::codec]  Sending --->
    CONNECT {"verbose":true,"pedantic":false,"tls_required":false,"name":"test-client","lang":"rust","version":"0.2.0","protocol":1,"echo":true}

[2020-01-08T17:02:06Z DEBUG ratsio::stan_client::client] Subscribing to heartbeat => _HB.AtHogtvDsQpHDKgsV9pSb3
[2020-01-08T17:02:06Z DEBUG ratsio::nats_client::client] Subscription for _HB.AtHogtvDsQpHDKgsV9pSb3 / tHFfmvittqBvgiFko65Cpc
[2020-01-08T17:02:06Z DEBUG ratsio::codec]  Sending --->
    SUB t9uuLElencpPkMsb1s8gDs  kl4D4QFvOyvE9mRkMVBAP8

[2020-01-08T17:02:06Z DEBUG ratsio::codec]  Sending --->
    SUB _HB.AtHogtvDsQpHDKgsV9pSb3  tHFfmvittqBvgiFko65Cpc

[2020-01-08T17:02:06Z INFO  ratsio::stan_client::client]  1 STAN Reconnecting Subscriptions [1]

    ["AtHogtvDsQpHDKgsV9pSOt =>  _SUB.AtHogtvDsQpHDKgsV9pSUy"]
[2020-01-08T17:02:06Z DEBUG ratsio::codec]  Sending --->
    SUB Rp7splroM9IsJAGLX2BkCG  BlDRHpm5iOro8Y4gqlNe3X

[2020-01-08T17:02:06Z DEBUG ratsio::codec]  Sending --->
    UNSUB   kl4D4QFvOyvE9mRkMVBAP8  1

[2020-01-08T17:02:06Z DEBUG ratsio::codec]  Sending --->
    PUB _STAN.close.Q5Oy65iZCkgsOufXAGiu1D  t9uuLElencpPkMsb1s8gDs  13

    test-client

[2020-01-08T17:02:06Z DEBUG ratsio::codec]  Sending --->
    PING

[2020-01-08T17:02:11Z DEBUG ratsio::nats_client::client] Skipped a ping.
[2020-01-08T17:02:11Z DEBUG ratsio::codec]  Sending --->
    PING

[2020-01-08T17:02:16Z DEBUG ratsio::nats_client::client] Skipped a ping.
[2020-01-08T17:02:16Z DEBUG ratsio::codec]  Sending --->
    PING

Process finished with exit code -1
mnetship commented 4 years ago

Thanks for the feedback, I will investigate. I am currently working on a branch to support futures 0.3, and I will fix this issue on that branch as well. The ETA for futures 0.3 support is Monday.

mnetship commented 4 years ago

Follow-up on my ETA: apologies, I have a hectic workload at the moment and need another week to stabilize the futures 0.3 support. The new ETA is Jan 20. Kind regards.

glueball commented 4 years ago

No worries, I completely understand (been there).

By the way, it's fine with me if this is only fixed in 0.3. So far I've only been testing with the version on crates.io out of laziness, but I plan to use futures 0.3 when I begin actually coding my next project, so I'm very interested in that work too.

You might want to fix it in 0.2 just in case someone can't upgrade.

avatarneil commented 4 years ago

This seems to be resolved on the tip of master: when I run the example STAN subscriber and restart the server, I do receive newly published messages.

cargo run --example stan_subscribe
    Finished dev [unoptimized + debuginfo] target(s) in 0.09s
     Running `target/debug/examples/stan_subscribe`
[DEBUG] - Connection id => 6q9JcJmMfRo4mns3h39JZA
[TRACE] - registering with poller
[TRACE] - registering; token=Token(0); interests=Readable | Writable | Error | Hup
[DEBUG] - Connecting STAN Client
[DEBUG] - Subscribing to heartbeat => _HB.6q9JcJmMfRo4mns3h39Je3
[ERROR] - Missed ping interval
[ERROR] - Missed ping interval
[ERROR] - Error pinging NATS server IOError(Os { code: 32, kind: BrokenPipe, message: "Broken pipe" })
[ERROR] - Missed too many pings, reconnect is required.
[TRACE] - registering with poller
[TRACE] - registering; token=Token(1); interests=Readable | Writable | Error | Hup
[TRACE] - deregistering handle with poller
[INFO] - Reconnecting to NATS servers 4 - new version 2
[INFO] - re subscribed to => "_SUB.6q9JcJmMfRo4mns3h39Jiw"
[INFO] - re subscribed to => "_HB.6q9JcJmMfRo4mns3h39Je3"
[DEBUG] - Connection id => 6q9JcJmMfRo4mns3h39Jnp
[DEBUG] - Re-Connecting STAN Client
[DEBUG] - Subscribing to heartbeat => _HB.6q9JcJmMfRo4mns3h39Jsi
[INFO] -  << 1 >> got stan message --- StanMessage { subject: "foo", reply_to: None, payload: [98, 97, 114], timestamp: 1582474425907321200, sequence: 2, redelivered: false, ack_inbox: Some("_INBOX.i2HrFfpqpH4XXYhp8VDmcD"), ack_handler: Some(<ack-handler>) }
        "bar"

I did notice, though, that messages published in the period before the client reconnected never seemed to reach the subscriber.

glueball commented 4 years ago

Yes, it seems to be working now. I tested using ratsio="0.3.0-alpha.1".

I'll test messages sent during reconnection tomorrow (no time left today), as @avatarneil described. Just make sure that the subscription is durable and that NATS Streaming is configured with persistent storage (the Docker image defaults to in-memory storage).
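Why both conditions matter can be shown with a toy model. A durable subscription lets the server remember the last acknowledged sequence, so a re-subscription resumes after it and picks up messages published in the meantime; with the in-memory store, a server restart wipes both the messages and that position. `ToyServer` and its methods are hypothetical, and the model assumes that a durable the server does not know starts with new messages only.

```rust
use std::collections::HashMap;

/// Toy model of one NATS Streaming channel (not the real server).
struct ToyServer {
    persistent: bool,
    messages: Vec<(u64, String)>,        // (sequence, payload)
    next_seq: u64,
    durable_acked: HashMap<String, u64>, // durable name -> last acked sequence
}

impl ToyServer {
    fn new(persistent: bool) -> Self {
        ToyServer { persistent, messages: Vec::new(), next_seq: 1, durable_acked: HashMap::new() }
    }

    fn publish(&mut self, payload: &str) {
        self.messages.push((self.next_seq, payload.to_string()));
        self.next_seq += 1;
    }

    /// A memory-backed server forgets messages and durable positions.
    fn restart(&mut self) {
        if !self.persistent {
            self.messages.clear();
            self.durable_acked.clear();
            self.next_seq = 1;
        }
    }

    /// (Re)subscribe a durable and return the messages delivered right away
    /// (acks are assumed to be automatic, as with manual_acks(false)).
    fn subscribe_durable(&mut self, name: &str) -> Vec<String> {
        match self.durable_acked.get(name).copied() {
            // Known durable: resume after the last acknowledged sequence.
            Some(last) => {
                let pending: Vec<(u64, String)> =
                    self.messages.iter().filter(|(seq, _)| *seq > last).cloned().collect();
                if let Some((seq, _)) = pending.last() {
                    self.durable_acked.insert(name.to_string(), *seq);
                }
                pending.into_iter().map(|(_, payload)| payload).collect()
            }
            // Unknown durable (assumption): only messages published from now on.
            None => {
                self.durable_acked.insert(name.to_string(), self.next_seq - 1);
                Vec::new()
            }
        }
    }
}
```

With `persistent = true`, messages published between the restart and the re-subscription are delivered; with `persistent = false`, the same sequence delivers nothing, which matches what @avatarneil observed.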

@mnetship, when do you think ratsio 0.3 (or maybe call it 1.0 already) will be ready for release?