nats-io / nats.java

Java client for NATS
Apache License 2.0
563 stars 153 forks source link

All the consumers in a deliver group receive the same message #1148

Closed DavideDA closed 3 months ago

DavideDA commented 3 months ago

Observed behavior

I set up 2 jetstream push consumers that listen to the same subject, both in the same deliver group. I publish a message for that subject and both consumers consume that message.

Expected behavior

Only 1 consumer consumes the message.

Server and client version

nats-server 2.10.14 jnats 2.17.6 java version 1.8.0_411

Host environment

Windows 11

Steps to reproduce

Below is the simplified consumer source code:

// Reproduction snippet: connects to a local server, makes sure the stream exists,
// then creates a durable consumer and starts consuming from it.
// maxReconnects(-1) presumably means "retry forever" — confirm against the jnats Options docs.
Options.Builder builder = new Options.Builder()
                    .maxReconnects(-1)
                    .server("nats://127.0.0.1:4222");

Connection connection = Nats.connect(builder.build());

// Look up the stream. An API error with code 404 is tolerated (streamInfo stays null);
// any other API error is rethrown.
StreamInfo streamInfo = null;
JetStreamManagement jetStreamManagement = connection.jetStreamManagement();
try {
    streamInfo = jetStreamManagement.getStreamInfo("mystream");
} catch (JetStreamApiException jsae) {
    if (jsae.getErrorCode() != 404) {
        throw jsae;
    }
}

// Create the stream only when the lookup above left streamInfo null.
if (streamInfo == null) {
    StreamConfiguration streamConfiguration = StreamConfiguration.builder()
            .name("mystream")
            .subjects(Collections.singletonList("mysubject"))
            .maxAge(Duration.ofDays(1))
            .build();

    jetStreamManagement.addStream(streamConfiguration);
}

StreamContext streamContext = connection.getStreamContext("mystream");

// Each consumer has its own jetStreamClientId, so 2 consumers produce 2 different durableNames
// (jetStreamClientId is defined outside this snippet.)
String durableName = String.format("%s-mysubject-durable", jetStreamClientId);

// Consumer configuration: filtered to "mysubject", durable, with a deliver group and
// at most one unacknowledged message outstanding.
// NOTE(review): no deliverSubject is set here, while the reply in this thread states that
// deliver groups only work for push consumers — verify whether deliverGroup has any effect.
ConsumerConfiguration.Builder consumerConfiguration = ConsumerConfiguration.builder()
    .filterSubject("mysubject")
    .durable(durableName)
    .deliverGroup("mydelivergroup")
    .maxAckPending(1);

// Register (or update) the consumer on the stream, then obtain a context for it by durable name.
// consumerContext is declared outside this snippet.
streamContext.createOrUpdateConsumer(consumerConfiguration.build());
consumerContext = streamContext.getConsumerContext(durableName);

// Handler acknowledges every message it is given.
MessageHandler handler = msg -> {
    // Do something...
    msg.ack();
};

// Start delivering messages to the handler.
consumerContext.consume(handler);

Basically, the only difference between the 2 consumers is the durable name.

If I replace .filterSubject("mysubject") with .deliverSubject("mysubject") I obtain an error on start:

consumer deliver subject forms a cycle [10081]

Thank you in advance!

scottf commented 3 months ago

Deliver groups only work for push consumers. The Simplification API (Stream/Consumer Context) uses pull consumers, so deliver groups do not apply there. To share messages across multiple instances, just use the same durable name on each of them. Here is a full example:

import io.nats.client.*;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Example for issue 1148: two handlers consume through consumer contexts that
 * refer to the same durable consumer on one stream.
 *
 * <p>The program recreates the stream, publishes 20 empty messages, starts two
 * handlers against the same durable name, and exits once each handler has
 * counted down its latch of 5 messages.
 */
public class Z1148 {
    /** Name of the stream that is deleted and recreated on every run. */
    public static final String STREAM = "stream";
    /** Subject the stream listens on and the consumer filters by. */
    public static final String SUBJECT = "subject";
    /** Deliver group name placed on the consumer configuration. */
    public static final String GROUP = "group";
    /** Durable name shared by both consumer contexts. */
    public static final String DURABLE = "durable";

    /**
     * Runs the example end to end. Any failure is printed to stderr.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (Connection connection = Nats.connectReconnectOnConnect()) {
            JetStreamManagement jsm = connection.jetStreamManagement();
            createStream(jsm);

            // Obtain the publishing context once instead of once per message.
            JetStream js = jsm.jetStream();
            for (int x = 0; x < 20; x++) {
                js.publish(SUBJECT, null);
            }

            // NOTE(review): the deliver group is kept from the original example; the
            // accompanying discussion says deliver groups only apply to push consumers.
            ConsumerConfiguration consumerConfiguration = ConsumerConfiguration.builder()
                .filterSubject(SUBJECT)
                .durable(DURABLE)
                .deliverGroup(GROUP)
                .maxAckPending(1)
                .build();

            StreamContext streamContext = connection.getStreamContext(STREAM);
            streamContext.createOrUpdateConsumer(consumerConfiguration);

            // Both contexts are looked up with the same durable name.
            ConsumerContext consumerContext1 = streamContext.getConsumerContext(DURABLE);
            ConsumerContext consumerContext2 = streamContext.getConsumerContext(DURABLE);

            CountDownLatch latch1 = new CountDownLatch(5);
            CountDownLatch latch2 = new CountDownLatch(5);

            MessageHandler handler1 = msg -> {
                System.out.println("Handler 1 received " + msg.metaData().streamSequence());
                msg.ack();
                latch1.countDown();
            };

            MessageHandler handler2 = msg -> {
                System.out.println("Handler 2 received " + msg.metaData().streamSequence());
                msg.ack();
                latch2.countDown();
            };

            consumerContext1.consume(handler1);
            consumerContext2.consume(handler2);

            // Block until each handler has processed 5 messages.
            latch1.await();
            latch2.await();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes any existing {@link #STREAM} and creates it again with memory
     * storage on {@link #SUBJECT}.
     *
     * @param jsm management context used to delete and add the stream
     * @throws IOException if adding the stream fails with an I/O error
     * @throws JetStreamApiException if the server rejects the add-stream request
     */
    public static void createStream(JetStreamManagement jsm) throws IOException, JetStreamApiException {
        // Best-effort cleanup: a failed delete (for example, the stream does not exist yet)
        // is deliberately ignored.
        try { jsm.deleteStream(STREAM); } catch (Exception ignored) {}

        try {
            StreamConfiguration sc = StreamConfiguration.builder()
                .name(STREAM)
                .storageType(StorageType.Memory)
                .subjects(SUBJECT)
                .build();
            jsm.addStream(sc);
            System.out.println("Created stream: '" + STREAM + "'");
        }
        catch (Exception e) {
            System.out.println("Failed creating stream: '" + STREAM + "' " + e);
            // Propagate so the caller stops instead of publishing to a stream that does not exist.
            throw e;
        }
    }
}