nats-io / stan.java

NATS Streaming Java Client
https://nats.io
Apache License 2.0
93 stars 36 forks source link

NATS streaming does not reconnect after NATS-Streaming server restart. #118

Closed igoberman closed 5 years ago

igoberman commented 5 years ago

This may have been reported before, but I am having hard time reconnecting to the NATS server after I restart it. This is a very frustrating issue. You would think this should be first thing that should be tested, but apparently it does not seem to work. I have tried all sorts of configurations without luck.

My publisher and subscriber are below. Start publisher and subscriber. Everything is fine. Then bounce NATS-streaming.

You will then see these messages on publisher below. NATS reconnected, but publishing does not work anymore. Further, even if I restart publisher, subscriber is not receiving any messages. 2019-02-08 15:57:57,542 INFO [test-pub Dispatcher] ITCCollector | Ack message 'TzwGsEcmy7hOv0Siw2GQMS' 2019-02-08 15:58:02,543 INFO [test-pub Dispatcher] ITCCollector | Trying published message TzwGsEcmy7hOv0Siw2GQQi test3 on 'test' 2019-02-08 15:58:02,551 INFO [test-pub Dispatcher] ITCCollector | Ack message 'TzwGsEcmy7hOv0Siw2GQQi' 2019-02-08 15:58:03,766 INFO [pool-2-thread-1] ITCCollector | NATS: null connection event - nats: connection disconnected 2019-02-08 15:58:03,767 INFO [pool-2-thread-1] ITCCollector | NATS: null connection event - nats: connection disconnected 2019-02-08 15:58:05,775 INFO [pool-2-thread-1] ITCCollector | NATS: null connection event - nats: connection disconnected 2019-02-08 15:58:07,553 INFO [test-pub Dispatcher] ITCCollector | Trying published message TzwGsEcmy7hOv0Siw2GQUy test4 on 'test' 2019-02-08 15:58:07,776 INFO [pool-2-thread-1] ITCCollector | NATS: null connection event - nats: connection disconnected 2019-02-08 15:58:09,778 INFO [pool-2-thread-1] ITCCollector | NATS: null connection event - nats: connection disconnected 2019-02-08 15:58:11,781 INFO [pool-2-thread-1] ITCCollector | NATS: nats://localhost:4223 connection event - nats: connection reconnected 2019-02-08 15:58:11,825 INFO [pool-2-thread-1] ITCCollector | NATS: nats://localhost:4223 connection event - nats: subscriptions re-established 2019-02-08 15:58:12,554 ERROR [jnats-streaming ack timeout thread] ITCCollector | Error publishing msg id TzwGsEcmy7hOv0Siw2GQUy with error stan: publish ack timeout on 'test' 2019-02-08 15:58:12,555 INFO [jnats-streaming ack timeout thread] ITCCollector | Trying published message TzwGsEcmy7hOv0Siw2GQZE test4 on 'test'

public class NatsStreamingPublisherTester { private static final Logger logger = LoggerFactory.getLogger(ITCCollector.class); public static final String NATS_URL = "nats://localhost:4223"; public static final String NATS_STREAMING_CLUSTER = "nats-streaming-cluster"; public static final String SUBJECT = "test"; private static StreamingConnection natsConnection; private static Options natsOptions = new Options.Builder() .natsUrl(NATS_URL) .pubAckWait(Duration.ofSeconds(5L)) .connectionListener((conn, events) -> { logger.info("NATS: {} connection event - {}", conn.getConnectedUrl(), events); }) .build();

static class MyAckHandler implements io.nats.streaming.AckHandler {

    @Override
    public void onAck(String guid, Exception err) {
        if (err != null) {
            logger.error("Error publishing msg id {} with error {} on '{}'", guid, err.getMessage(), SUBJECT);
        } else {
            msgIdx++;
            logger.info("Ack message '{}'", guid);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            }
        }
        sendMessage();
    }
}

static MyAckHandler ackHandler = new MyAckHandler();
static int msgIdx;

private static void sendMessage() {
    try {
        String s = "test" + msgIdx;
        String guid = natsConnection.publish(SUBJECT, s.getBytes(), ackHandler);

        logger.info("Trying published message {} {} on '{}'", guid, s, SUBJECT);
    } catch (Throwable e) {
        e.printStackTrace();
    }
}

public static void main(String args[]) {
    buildNatsConnection();
}

private static void buildNatsConnection() {
    while (true) {
        try {
            logger.info("Connecting to NATS streaming cluster {} on url: {}.", NATS_STREAMING_CLUSTER, NATS_URL);
            natsConnection = NatsStreaming.connect(NATS_STREAMING_CLUSTER, "test-pub", natsOptions);
            logger.info("Connected to NATS streaming.");
            sendMessage();
            break;
        } catch (Throwable e) {
            logger.error("Unable to connect to Nats streaming cluster {} on url {} with error: {}"
                    , NATS_STREAMING_CLUSTER, NATS_URL, e.getMessage());
            try { Thread.sleep(5000); } catch (InterruptedException e1) { break; }
        }
    }
}

}

public class NatsStreamingSubTester { private static final Logger logger = LoggerFactory.getLogger(ITCCollector.class); public static final String NATS_URL = "nats://localhost:4223"; public static final String NATS_STREAMING_CLUSTER = "nats-streaming-cluster"; public static final String SUBJECT = "test"; private static StreamingConnection natsConnection;

private static Options natsOptions = new Options.Builder()
        .natsUrl(NATS_URL)
        .pubAckWait(Duration.ofSeconds(5L))
        .connectionListener((conn, events) -> {
            logger.info("NATS: {} connection event - {}", conn.getConnectedUrl(), events);
        })
        .build();

public static void main(String args[]) throws InterruptedException, TimeoutException, IOException {

    buildNatsConnection();

    Subscription sub = natsConnection.subscribe(SUBJECT, (m) ->
            System.out.printf("Received a message: %s\n", new String(m.getData()))
    , new SubscriptionOptions.Builder().deliverAllAvailable().build());
}

private static void buildNatsConnection() {
    while (true) {
        try {
            logger.info("Connecting to NATS streaming cluster {} on url: {}.", NATS_STREAMING_CLUSTER, NATS_URL);
            natsConnection = NatsStreaming.connect(NATS_STREAMING_CLUSTER, "test-sub", natsOptions);
            break;
        } catch (Throwable e) {
            logger.error("Unable to connect to Nats streaming cluster {} on url {} with error: {}"
                    , NATS_STREAMING_CLUSTER, NATS_URL, e.getMessage());
            try { Thread.sleep(5000); } catch (InterruptedException e1) { break; }
        }
    }
}

}

igoberman commented 5 years ago

I found a workaround. Thanks.

znatashka commented 4 years ago

I found a workaround. Thanks.

hi! we have the same problem... could you share your solution, please?

fxnn commented 3 years ago

@igoberman Can you share the workaround, please?

igoberman commented 3 years ago

I do not remember. We stopped using Nats altogether and switched to Kafka.

fxnn commented 3 years ago

Well, still thank you for the answer 🙂