nats-io / nats.java

Java client for NATS
Apache License 2.0

timeout or no response waiting for nats jetstream server #1058

Open liyancoding opened 10 months ago

liyancoding commented 10 months ago

Observed behavior

The NATS cluster is healthy. While the NATS CLI tool is being used to check the stream status on each node of the cluster, the Java connection reports the error "Timeout or no response waiting for NATS JetStream server". Sometimes it recovers, but sometimes the fault persists.

Expected behavior

NATS should keep working normally.

Server and client version

v2.9.23

Host environment

No response

Steps to reproduce

Streams, subjects, and configurations are created or updated when the program starts.

scottf commented 10 months ago

What version of the client? What is the connection timeout (default?) What does your network between the client and server look like? What cli commands are you running? Can you give an example of your java code?

liyancoding commented 10 months ago

The client version is 2.15.3.

The connection timeout is 2s.

The CLI commands are 'nats str info' and 'nats con info'.

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.ReplayPolicy;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Objects;

// Methods from the NatsUtils class. Defined elsewhere in the application:
// the static Connection `connection`, the logger `log`, and the app's own
// JsonUtils, CollectionUtils, InternalException, HttpStatus and VtsErrorCode.

// Creates the durable consumer if it is missing, or recreates it if its filter subject changed.
public static void createConsumerInfo(String streamName, String consumer, String subject) throws IOException, JetStreamApiException {
    JetStreamManagement jsm = connection.jetStreamManagement();
    List<String> consumerNames = jsm.getConsumerNames(streamName);
    if (!consumerNames.contains(consumer)) {
        jsm.addOrUpdateConsumer(streamName, createConsumerConfiguration(consumer, subject));
        log.info("consumer infos: {}", jsm.getConsumers(streamName));
        return;
    }
    ConsumerInfo consumerInfo = jsm.getConsumerInfo(streamName, consumer);
    // The existing filter subject may be null, so compare null-safely
    if (!Objects.equals(consumerInfo.getConsumerConfiguration().getFilterSubject(), subject)) {
        jsm.deleteConsumer(streamName, consumer);
        jsm.addOrUpdateConsumer(streamName, createConsumerConfiguration(consumer, subject));
        log.info("consumer infos: {}", jsm.getConsumers(streamName));
    }
}

// Durable consumer filtered to a single subject, replaying stored messages as fast as possible.
public static ConsumerConfiguration createConsumerConfiguration(String consumer, String subject) {
    return ConsumerConfiguration.builder()
            .durable(consumer)
            .filterSubject(subject)
            .replayPolicy(ReplayPolicy.Instant)
            .build();
}

public static PullSubscribeOptions createPullSubscribeOptions(String consumer, String subject) {
    return PullSubscribeOptions.builder()
            .configuration(createConsumerConfiguration(consumer, subject))
            .build();
}

// Fetches one message from the durable pull consumer, acks it and maps its JSON payload to tClass.
public static <T> T subscribeSingleMessage(String stream, String subject, String consumer, Class<T> tClass) {
    try {
        NatsUtils.createConsumerInfo(stream, consumer, subject);
        log.info("url is: '{}'", connection.getConnectedUrl());
        JetStreamSubscription streamSubscription =
                connection.jetStream().subscribe(subject, createPullSubscribeOptions(consumer, subject));
        // Waits up to 1s; the list is empty if no message arrives in time
        List<Message> messageList = streamSubscription.fetch(1, Duration.ofMillis(1000));
        if (CollectionUtils.isEmpty(messageList) || messageList.size() != 1) {
            return null;
        }
        Message message = messageList.get(0);
        message.ack();
        return JsonUtils.fromJson(new String(message.getData(), StandardCharsets.UTF_8), tClass);
    } catch (IOException e) {
        log.error("failed subscribeSingleMessage due to 'IOException'", e);
        throw new InternalException(HttpStatus.BAD_REQUEST, VtsErrorCode.NATS_SUBSCRIBE_FAILURE, e);
    } catch (JetStreamApiException e) {
        log.error("failed subscribeSingleMessage due to 'JetStreamApiException'", e);
        throw new InternalException(HttpStatus.BAD_REQUEST, VtsErrorCode.NATS_SUBSCRIBE_FAILURE, e);
    }
}

scottf commented 10 months ago

Can you move to the latest client? 2.15 is pretty old. There could be an issue in it, but I can't remember.

Also, which line of code is timing out? Is it getConsumerNames?

scottf commented 9 months ago

@liyancoding Are you still able to reproduce this? Is there a main method or something else that uses the code you provided? I'm trying to reproduce this.

scottf commented 9 months ago

@liyancoding I have an update on this based on another user report.

The other user's code was doing an async publish, and in the error handler they attached to the future, they tried to call getConsumerInfo and it timed out. (For reference, getConsumerInfo is just a core request-reply with an API subject and known JSON response data.)

What I discovered is that, because they were still inside the future, they could not make core API calls: they were already inside the read loop of the reader thread, so a blocking request made there blocked the very read loop that would have delivered the response.

I am able to repeat this at will now.

There are currently 2 workarounds:

  1. Use a different connection, since a different connection has a completely separate read loop.
  2. Make sure you are outside of the future when making the call.
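The mechanism described above can be reproduced without a server. Below is a minimal JDK-only sketch, a simplified model rather than the jnats implementation: a single-threaded executor stands in for the connection's reader thread, and the names ReadLoopModel, request and handleAck are hypothetical. A blocking request made from a handler running on that thread times out because the reply is queued behind the handler itself; handing the call to another thread (workaround 2) lets the reply through.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified model, not the jnats implementation: one thread ("readLoop") delivers
// every reply, including the completion that triggers an async-publish handler.
public class ReadLoopModel {
    private final ExecutorService readLoop = Executors.newSingleThreadExecutor();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Hypothetical blocking request-reply (think getConsumerInfo): the reply is
    // queued on the read loop, so only the read loop can deliver it.
    String request(long timeoutMs) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        readLoop.execute(() -> reply.complete("consumer-info"));
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runs an "ack handler" on the read loop, as happens inside the future.
    // offload == false: the handler makes the blocking call itself.
    // offload == true: the handler hands the call to another thread (workaround 2).
    String handleAck(boolean offload) throws Exception {
        CompletableFuture<String> outcome = new CompletableFuture<>();
        readLoop.execute(() -> {
            if (offload) {
                worker.execute(() -> outcome.complete(request(500)));
            } else {
                // Blocks the read loop, so the queued reply cannot run until we give up
                outcome.complete(request(500));
            }
        });
        return outcome.get(5, TimeUnit.SECONDS);
    }

    void shutdown() {
        readLoop.shutdownNow();
        worker.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        ReadLoopModel model = new ReadLoopModel();
        try {
            System.out.println("call inside handler: " + model.handleAck(false)); // TIMEOUT
            System.out.println("call offloaded:      " + model.handleAck(true));  // consumer-info
        } finally {
            model.shutdown();
        }
    }
}
```

With a real CompletableFuture returned by an async publish, the analogous move would presumably be to register the handler with the JDK's whenCompleteAsync(action, executor), so it runs on your own executor rather than on the thread that completes the future; treat that as an assumption to verify against your client version.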

Please let me know if this helps. We are working out whether this can be addressed, but I don't want to add yet another thread. It may end up being a matter of documentation; at least I now understand why it's happening.

z0mb1ek commented 7 months ago

I have the same trouble when I publish:

java.io.IOException: Timeout or no response waiting for NATS JetStream server
    at io.nats.client.impl.NatsJetStreamImpl.responseRequired(NatsJetStreamImpl.java:249)
    at io.nats.client.impl.NatsJetStreamImpl.makeInternalRequestResponseRequired(NatsJetStreamImpl.java:241)
    at io.nats.client.impl.NatsJetStream.publishSyncInternal(NatsJetStream.java:155)
    at io.nats.client.impl.NatsJetStream.publish(NatsJetStream.java:50)

Can I make natsConnection().jetStream() a bean, or should I call it every time I need to send a message?

scottf commented 7 months ago

The JetStream context is really just a functional wrapper around the connection; there is no cost except instantiation and initialization.

scottf commented 7 months ago

As for the timeout you are seeing, there are at least 2 possible causes:

  1. The server has disconnected or is just too busy (probably not too busy, though).
  2. You are making the calls asynchronously in a way that one blocks the other. But I'm just speculating; I would need to see more code. Maybe make a gist or put some code in a GitHub repo and link it here.

majdiAlKotamy commented 6 months ago

I'm facing the same problem and am still looking for a solution.

scottf commented 6 months ago

Can you give some more details about your specific problem, your environment, and your threading? Code examples that reproduce the issue are always helpful. There have been some changes and some options added in the last couple of releases, so they might help your situation.

majdiAlKotamy commented 6 months ago
// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.hello;

import io.nats.client.*;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.support.JsonUtils;

import java.time.Duration;

/*
JetStream Hello World Example using the NATS client for Java
 */
public class HelloWorld
{
    public static void main( String[] args )
    {
        try (Connection nc = Nats.connect("nats://localhost:4222")) {
            JetStreamManagement jsm = nc.jetStreamManagement();

            // Build the configuration
            StreamConfiguration streamConfig = StreamConfiguration.builder()
                    .name("hello")
                    .subjects("world")
                    .storageType(StorageType.Memory)
                    .build();

            // Create the stream
            StreamInfo streamInfo = jsm.addStream(streamConfig);
            JsonUtils.printFormatted(streamInfo);

            JetStream js = nc.jetStream();
            PublishAck ack = js.publish("world", "one".getBytes());
            JsonUtils.printFormatted(ack);

            ack = js.publish("world", "two".getBytes());
            JsonUtils.printFormatted(ack);

            JetStreamSubscription sub = js.subscribe("world");
            Message m = sub.nextMessage(Duration.ofSeconds(3));
            m.ack();
            System.out.println("Message: " + m.getSubject() + " " + new String(m.getData()));
            JsonUtils.printFormatted(m.metaData());

            m = sub.nextMessage(Duration.ofSeconds(3));
            m.ack();
            System.out.println("Message: " + m.getSubject() + " " + new String(m.getData()));
            JsonUtils.printFormatted(m.metaData());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

My dependency: implementation 'io.nats:jnats:2.17.6'

nats-server is set up and runs successfully in Docker locally.

Screenshot 2024-05-09 111239

Error message:

Screenshot 2024-05-09 111417
scottf commented 6 months ago

This example works when I run a cluster directly on my machine. I'm almost certain this is a Docker networking issue, and a different issue from the one originally reported, which had to do with async calls that ended up blocking.

johbar commented 5 months ago

Those using Docker and having a JetStream timeout error must start the container with the -js option, not the default entrypoint. I had a similar problem with the Go client: https://github.com/nats-io/nats.go/issues/1565

scottf commented 3 months ago

@z0mb1ek @majdiAlKotamy I was able to recreate the problem against Docker, as noted by @johbar.

When I start the container without -js, I get the timeout:

C:\nats>docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats

When I start the container with -js, it works fine:

C:\nats>docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats -js
scottf commented 3 months ago

@liyancoding Is your issue related to Docker, or what is your status on this? Please let me know so I can either close the issue or keep looking into it.

imanolie commented 3 months ago

Hi! I've encountered some strange behaviour. I have a client subscribed to a NATS server, and publishing 100 messages per second works fine. However, when I increase the rate to 120 messages per second, then after the configured timeout (30s in my case) I start receiving TIMEOUT, which means the client is not reading the messages published on the subject it subscribed to. I thought it might be because the client is busy processing existing requests, although CPU usage is only around 55%. Can you please advise what I should check, or what configuration I need to change, to be able to send more traffic? Is there a limit, either in NATS or in the client, on how many messages per second or in flight it can handle?

Kind regards

scottf commented 3 months ago

@imanolie Can you start a new issue, or even a discussion? Also, please note your server and client versions, give an overview of your server setup (clustered or dev), provide some code snippets, and show your stream configuration. There is no software limit.

happybydefault commented 3 months ago

This problem happens to me when I run NATS in Docker with jetstream: enabled in nats-server.conf, using the image tagged 2.10, 2.10-linux, or 2.10-scratch. The problem goes away when I use the image tagged 2.10-alpine.

I'm using natscli 0.1.5 and nats.go v1.36.0, but I think that's unrelated.