nats-io / nats.java

Java client for NATS
Apache License 2.0

Timeout on parallel subscribe two consumers #996

Closed: philmu closed this issue 2 months ago

philmu commented 1 year ago

Observed behavior

When I subscribe two consumers in parallel from different threads against an idle local NATS server, I sometimes receive an IOException (timeout) after two seconds.

Expected behavior

No timeout

Server and client version

Server: 2.10.1
Client: jnats 2.16.14

Host environment

Xubuntu 22.04.3 6.2.0-33-generic x86_64

Steps to reproduce

After a few iterations of my test code, the output is:

Iteration: 0
Stream [1] established.
Stream [0] established.
Iteration: 1
Stream [0] established.
Stream [1] established.
Iteration: 2
Stream [0] established.
Stream [1] established.
Iteration: 3
Stream [1] established.
Stream [0] not established.
java.io.IOException: Timeout or no response waiting for NATS JetStream server
    at io.nats.client.impl.NatsJetStreamImpl.responseRequired(NatsJetStreamImpl.java:216)
    at io.nats.client.impl.NatsJetStreamImpl.makeRequestResponseRequired(NatsJetStreamImpl.java:200)
    at io.nats.client.impl.NatsJetStreamImpl._getStreamNames(NatsJetStreamImpl.java:151)
    at io.nats.client.impl.NatsJetStreamImpl.lookupStreamBySubject(NatsJetStreamImpl.java:191)
    at io.nats.client.impl.NatsJetStream.createSubscription(NatsJetStream.java:301)
    at io.nats.client.impl.NatsJetStream.subscribe(NatsJetStream.java:563)
    at NatsTimeoutOnParallelSubscribe.lambda$0(NatsTimeoutOnParallelSubscribe.java:54)
    at java.base/java.lang.Thread.run(Thread.java:833)
Error on subscribe

After some debugging, I found that the problem is a timeout at https://github.com/nats-io/nats.java/blob/2.16.14/src/main/java/io/nats/client/impl/NatsConnection.java#L1098

I think there is a multithreading problem in the block at https://github.com/nats-io/nats.java/blob/2.16.14/src/main/java/io/nats/client/impl/NatsConnection.java#L1161

A pragmatic workaround: after synchronizing that block, the timeout exception is gone:

// Only one thread at a time may check for, create, start, and subscribe the inbox dispatcher.
synchronized (this) {
    if (inboxDispatcher.get() == null) {
        NatsDispatcher d = new NatsDispatcher(this, this::deliverReply);

        if (inboxDispatcher.compareAndSet(null, d)) {
            String id = this.nuid.next();
            this.dispatchers.put(id, d);
            d.start(id);
            d.subscribe(this.mainInbox);
        }
    }
}
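One plausible reading of why the synchronized block helps (an inference from the snippet, not a root cause confirmed in this thread): compareAndSet makes the dispatcher visible to other threads before d.subscribe(this.mainInbox) has run. A second thread can then see a non-null dispatcher, skip setup, and send its JetStream API request; if the reply reaches the shared inbox subject before the first thread's subscribe does, core NATS discards it and the request times out. The self-contained model below replays both orderings deterministically. It does not use jnats; ToyServer, route, replay, and the subject "_INBOX.demo" are hypothetical stand-ins for server-side routing.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Single-threaded replay of the suspected interleaving between two subscribing threads.
 * ToyServer is a hypothetical stand-in for server-side subject routing, not a jnats API.
 */
public class InboxRaceReplay {

    /** Hypothetical server: forwards a reply only if the subject has a subscription. */
    static final class ToyServer {
        private final Set<String> subscriptions = new HashSet<>();
        final List<String> delivered = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();

        void subscribe(String subject) {
            subscriptions.add(subject);
        }

        void route(String subject, String payload) {
            if (subscriptions.contains(subject)) {
                delivered.add(payload);
            } else {
                dropped.add(payload); // no subscriber: the reply is discarded, so the requester times out
            }
        }
    }

    /**
     * Thread A has already won compareAndSet(null, d), so thread B sees a non-null
     * dispatcher and sends its request. The flag chooses whether A's
     * d.subscribe(mainInbox) reaches the server before or after B's reply is routed.
     */
    static ToyServer replay(boolean subscribeBeforeReply) {
        ToyServer server = new ToyServer();
        String inbox = "_INBOX.demo"; // hypothetical shared inbox subject

        if (subscribeBeforeReply) {
            server.subscribe(inbox);        // serialized: A finishes setup first
        }
        server.route(inbox, "reply-for-B"); // B's JetStream API reply arrives
        if (!subscribeBeforeReply) {
            server.subscribe(inbox);        // racy: A's subscribe lands too late
        }
        return server;
    }

    public static void main(String[] args) {
        ToyServer racy = replay(false);
        ToyServer serialized = replay(true);
        // prints "racy: delivered=[] dropped=[reply-for-B]"
        System.out.println("racy: delivered=" + racy.delivered + " dropped=" + racy.dropped);
        // prints "serialized: delivered=[reply-for-B] dropped=[]"
        System.out.println("serialized: delivered=" + serialized.delivered + " dropped=" + serialized.dropped);
    }
}
```

In this model, holding a lock across both the check and the subscribe step removes the racy ordering, which matches the observation that the timeout disappears once the block is synchronized.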

Testcode:

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;

public class NatsTimeoutOnParallelSubscribe {

    private final static String NATS_SERVER = "nats://localhost:4222";
    private final static String NATS_TOKEN = "KxX1FqaJ";

    private final static String STREAM_NAME = "NatsTimeout";
    private final static String STREAM_SUBJECT = "test";

    public static void main(String[] args) throws IOException, JetStreamApiException, InterruptedException {
        NatsTimeoutOnParallelSubscribe nt = new NatsTimeoutOnParallelSubscribe();
        nt.createStream();
        int i = 0;
        while (true) {
            System.out.printf("Iteration: %d\n", i++);
            nt.parallelSubscribe();
            Thread.sleep(1000);
        }
    }

    // Opens a fresh connection and subscribes to the same subject from two threads at once.
    private void parallelSubscribe() throws IOException, JetStreamApiException, InterruptedException {
        final Connection natsCon = getConnection();
        final JetStream jetStream = natsCon.jetStream();
        final int streams = 2;
        CountDownLatch cdl = new CountDownLatch(streams);

        for (int i = 0; i < streams; i++) {
            final int c = i;
            new Thread(() -> {
                try {
                    jetStream.subscribe(STREAM_SUBJECT);
                    cdl.countDown();
                    System.out.printf("Stream [%d] established.\n", c);
                } catch (Exception e) {
                    System.err.printf("Stream [%d] not established.\n", c);
                    e.printStackTrace();
                }
            }).start();
        }
        // Both subscriptions should complete well within 3 seconds on an idle local server.
        if (!cdl.await(3, TimeUnit.SECONDS)) {
            System.err.printf("Error on subscribe\n");
            System.exit(1);
        }

        natsCon.close();
    }

    // Creates the in-memory test stream unless it already exists.
    private void createStream() throws IOException, JetStreamApiException, InterruptedException {
        final Connection natsCon = getConnection();
        JetStreamManagement jsm = natsCon.jetStreamManagement();
        List<StreamInfo> currentStreams = jsm.getStreams();

        Optional<StreamInfo> oldStream = currentStreams.stream()
                .filter(si -> si.getConfiguration().getName().equals(STREAM_NAME)).findFirst();

        if (oldStream.isPresent()) {
            System.out.printf("use existing stream\n");
        } else {
            StreamInfo info = jsm.addStream(StreamConfiguration.builder().name(STREAM_NAME)
                    .storageType(StorageType.Memory).subjects(STREAM_SUBJECT).description("Test Stream").build());
            System.out.printf("stream created: %s\n", info);
        }

        natsCon.close();
    }

    private Connection getConnection() {
        Options.Builder builder = new Options.Builder().server(NATS_SERVER)
                .token(NATS_TOKEN.toCharArray());
        try {
            return Nats.connect(builder.build());
        } catch (IllegalStateException | IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
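Because the failure is timing dependent, one way to make the reproduction more aggressive is to release both subscribe calls from a shared barrier rather than starting threads one after another. This is a suggestion, not something tried in the thread; StartTogether and runTogether are hypothetical helpers built only on java.util.concurrent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: starts all tasks at the same instant to maximize contention. */
public class StartTogether {

    /** Runs each task on its own thread, released together by a barrier; returns the failures. */
    static List<Throwable> runTogether(List<Callable<Void>> tasks, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        CyclicBarrier barrier = new CyclicBarrier(tasks.size());
        List<Future<Void>> futures = new ArrayList<>();
        for (Callable<Void> task : tasks) {
            futures.add(pool.submit(() -> {
                barrier.await(); // blocks until every task's thread has arrived
                return task.call();
            }));
        }
        List<Throwable> failures = new ArrayList<>();
        try {
            for (Future<Void> f : futures) {
                try {
                    f.get(timeoutSeconds, TimeUnit.SECONDS);
                } catch (ExecutionException e) {
                    failures.add(e.getCause()); // e.g. the IOException thrown by subscribe
                } catch (TimeoutException e) {
                    failures.add(e);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            failures.add(e);
        } finally {
            pool.shutdownNow();
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            final int id = i;
            tasks.add(() -> {
                System.out.println("task " + id + " running on " + Thread.currentThread().getName());
                return null;
            });
        }
        System.out.println("failures: " + runTogether(tasks, 3));
    }
}
```

With the test code above, each task could be `() -> { jetStream.subscribe(STREAM_SUBJECT); return null; }`, and a timeout would then show up as an IOException in the returned list instead of only in the console output.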
scottf commented 1 year ago

Is this repeatable against a local, non-cluster server? This seems unusual; the subscribe request/reply should take milliseconds, not accounting for things like network latency or server load.

philmu commented 1 year ago

Yes, this happens on my local (non-cluster) machine. I don't think this is a server-side problem, but rather a multithreading problem on the client side. With the synchronized block, the issue is gone.

scottf commented 1 year ago

I'm very concerned about putting a synchronized block around that code, since it does not explain why there is a timeout, even if the problem appears to go away when you synchronize it. If there is an issue, I think there is another underlying cause to be figured out. I will have to look into this.

scottf commented 3 months ago

@philmu I cannot reproduce this. I've been running your example for about half an hour. Is this still occurring? I wonder if this could be related to the operating system the program is running on; I'm running on Windows.

philmu commented 2 months ago

@scottf I have tested again, and since 2.17.5 the problem is gone, because someone implemented essentially my suggested solution (with a ReentrantLock instead of synchronized): https://github.com/nats-io/nats.java/compare/2.17.4...2.17.5#diff-bbcf614ee7ee4ccd15b22d1f95fa40e4e642098386b6d1d14127dab5968d89feR1191

The problem is solved and you can close the issue.
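philmu reports that 2.17.5 ships essentially the proposed fix, using a ReentrantLock instead of synchronized. The sketch below shows that general pattern (lock-guarded, double-checked lazy initialization) in isolation; it is not the actual NatsConnection code, and LazyInbox, ensureStarted, and runContended are hypothetical names. The idea it illustrates: the "already initialized?" check and the whole setup, including the subscribe step, run under one lock, and the ready flag is published last, so no caller can proceed while the inbox is only half set up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a connection's lazily created inbox dispatcher (not jnats code). */
public class LazyInbox {
    private final ReentrantLock lock = new ReentrantLock();
    private volatile boolean started = false;    // set last, only while holding the lock
    private volatile boolean subscribed = false; // stands in for "d.subscribe(mainInbox) done"
    private final AtomicInteger setupCount = new AtomicInteger();

    /** Runs setup exactly once; every caller returns only after setup has fully completed. */
    void ensureStarted() {
        if (started) {
            return; // fast path: started is written only after subscribed
        }
        lock.lock();
        try {
            if (!started) {
                setupCount.incrementAndGet();
                subscribed = true; // create, start, and subscribe the dispatcher here
                started = true;    // publish only once the dispatcher is ready
            }
        } finally {
            lock.unlock();
        }
    }

    /** Releases `threads` callers at once; returns {setup runs, callers that saw an unready inbox}. */
    static int[] runContended(int threads) {
        LazyInbox inbox = new LazyInbox();
        CountDownLatch go = new CountDownLatch(1);
        AtomicInteger early = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    go.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                inbox.ensureStarted();
                if (!inbox.subscribed) {
                    early.incrementAndGet(); // a request sent now could lose its reply
                }
            });
            workers.add(t);
            t.start();
        }
        go.countDown(); // release all callers together to maximize contention
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {inbox.setupCount.get(), early.get()};
    }

    public static void main(String[] args) {
        int[] result = runContended(8);
        System.out.println("setups=" + result[0] + " early=" + result[1]); // prints "setups=1 early=0"
    }
}
```

A ReentrantLock and a synchronized block give the same mutual exclusion here; the lock object mainly adds options such as tryLock with a timeout, which may be one reason to prefer it in client library code.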