ConnectEverything / nats-by-example

Collection of runnable, reference examples using NATS (https://nats.io)
https://natsbyexample.com

Cannot reproduce interest-based Streams example with Java SDK #203

Closed thomasdarimont closed 6 days ago

thomasdarimont commented 1 month ago

Observed behavior

The Java SDK currently lacks some functionality, such as "double ack", that is needed to reproduce the example.

Expected behavior

The Java SDK should offer the same functionality as the other language integrations.

Server and client version

NATS Server version: 2.10.14

Java SDK:

<dependency>
    <groupId>io.nats</groupId>
    <artifactId>jnats</artifactId>
    <version>2.17.6</version>
</dependency>

Host environment

No response

Steps to reproduce

Inspect and compare the following program with https://natsbyexample.com/examples/jetstream/interest-stream/rust

package demo.streams;

import demo.support.Serde;
import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.PullSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.api.RetentionPolicy;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.NatsJetStreamMetaData;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class InterestBasedStream {

    public static void main(String[] args) throws Exception {

        Options options = Options.builder() //
                .connectionName("jugsaar") //
                .userInfo("jugsaar", "jugsaar") //
                .server("nats://localhost:4222") //
                .build();

        try (Connection conn = Nats.connect(options)) {

            JetStreamManagement jsm = conn.jetStreamManagement();

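            // Start from a clean slate; deleteStream throws if the stream does not exist yet.
            try {
                jsm.deleteStream("EVENTS");
            } catch (io.nats.client.JetStreamApiException e) {
                // Nothing to delete on the first run.
            }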

            StreamConfiguration sc = StreamConfiguration.builder() //
                    .name("EVENTS") //
                    .retentionPolicy(RetentionPolicy.Interest) //
                    .subjects("events.>") //
                    .build();

            StreamInfo si = jsm.addStream(sc);
            System.out.printf("Created stream %s%n", sc.getName());

            JetStream js = conn.jetStream();

            js.publish("events.page_loaded", null);
            js.publish("events.mouse_clicked", null);
            PublishAck publishAck = js.publish("events.input_focused", null);
            System.out.println("Published 3 messages");

            System.out.printf("last message seq: %d%n", publishAck.getSeqno());

            System.out.println("# Stream info without any consumers\n");
            printStreamState(jsm, sc.getName());

            ConsumerInfo c1 = jsm.createConsumer(sc.getName(), ConsumerConfiguration.builder() //
                    .durable("processor-1") //
                    .ackPolicy(AckPolicy.Explicit) //
                    .build());

            js.publish("events.mouse_clicked", null);
            js.publishAsync("events.input_focused", null);

            System.out.println("# Stream info with one consumer\n");
            printStreamState(jsm, sc.getName());

            PullSubscribeOptions o1 = PullSubscribeOptions.bind("EVENTS", c1.getName());
            JetStreamSubscription sub1 = js.subscribe(null, o1);

            List<Message> fetched = sub1.fetch(2, Duration.ofSeconds(1));
            for (var msg : fetched) {
                msg.ackSync(Duration.ofSeconds(1)); // wait for broker to confirm
            }

            System.out.println("# Stream info with one consumer and acked messages\n");
            printStreamState(jsm, sc.getName());

            ConsumerInfo c2 = jsm.createConsumer(sc.getName(), ConsumerConfiguration.builder() //
                    .durable("processor-2") //
                    .ackPolicy(AckPolicy.Explicit) //
                    .build());

            PullSubscribeOptions o2 = PullSubscribeOptions.bind("EVENTS", c2.getName());
            JetStreamSubscription sub2 = js.subscribe(null, o2);

            js.publish("events.input_focused", null);
            js.publish("events.mouse_clicked", null);

            fetched = sub2.fetch(2, Duration.ofSeconds(1));
            List<NatsJetStreamMetaData> messageMeta = new ArrayList<>();
            for (var msg : fetched) {
                msg.ackSync(Duration.ofSeconds(1));
                messageMeta.add(msg.metaData());
            }
            System.out.printf("msg seqs %d and %d%n", messageMeta.get(0).streamSequence(), messageMeta.get(1).streamSequence());

            System.out.println("# Stream info with two consumers, but only one set of acked messages\n");
            printStreamState(jsm, sc.getName());

            fetched = sub2.fetch(2, Duration.ofSeconds(1));
            for (var msg : fetched) {
                // double ack feature missing here!
                // msg.ackSync(Duration.ofSeconds(1));
                msg.ack();
            }

            System.out.println("# Stream info with two consumers having both acked\n");
            printStreamState(jsm, sc.getName());

            ConsumerInfo c3 = jsm.createConsumer(sc.getName(), ConsumerConfiguration.builder() //
                    .durable("processor-3") //
                    .ackPolicy(AckPolicy.Explicit) //
                    .filterSubject("events.mouse_clicked") // not interested in input_focused
                    .build());

            PullSubscribeOptions o3 = PullSubscribeOptions.bind("EVENTS", c3.getName());
            JetStreamSubscription sub3 = js.subscribe(null, o3);

            js.publish("events.input_focused", null);

            var msgs = sub1.fetch(1, Duration.ofSeconds(1));
            var msg = msgs.get(0);
            msg.term();

            msgs = sub2.fetch(1, Duration.ofSeconds(1));
            msg = msgs.get(0);
            msg.ackSync(Duration.ofSeconds(1));

            System.out.println("# Stream info with three consumers with interest from two\n");
            printStreamState(jsm, sc.getName());
        }
    }

    public static void printStreamState(JetStreamManagement jsm, String streamName) throws Exception {

        StreamInfo info = jsm.getStreamInfo(streamName);
        System.out.printf("inspecting stream info%n%s%n", Serde.json(info.getStreamState()));
    }
}

Maven POM excerpt:

<properties>
    <jnats.version>2.17.6</jnats.version>
    <jackson-databind.version>2.17.0</jackson-databind.version>
</properties>

<dependencies>
    <dependency>
        <groupId>io.nats</groupId>
        <artifactId>jnats</artifactId>
        <version>${jnats.version}</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson-databind.version}</version>
    </dependency>

    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>${jackson-databind.version}</version>
    </dependency>
</dependencies>

Serde helper class:

package demo.support;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

public final class Serde {

    public static final ObjectMapper OM;

    static {
        OM = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT).findAndRegisterModules();
    }

    public static <T> byte[] jsonBytes(T data) throws Exception {
        return OM.writeValueAsBytes(data);
    }

    public static <T> String json(T data) throws Exception {
        return OM.writeValueAsString(data);
    }

    public static <T> T fromJsonBytes(byte[] data, Class<T> type) throws Exception {
        return OM.readValue(data, type);
    }
}

scottf commented 1 month ago

@thomasdarimont By double ack, do you mean acking and then waiting for the server to confirm that it received the ack? If so, you are looking for:

void ackSync(Duration timeout) throws TimeoutException, InterruptedException;

scottf commented 1 month ago

@thomasdarimont What do you expect double ack to do? ackSync blocks until the server responds or the timeout elapses. A response from the server indicates that it has processed the ack.

for (var msg : fetched) {
  // double ack feature missing here!
  // msg.ackSync(Duration.ofSeconds(1));
  // msg.ack();
}
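The blocking behavior described above can be sketched without a running server. This is a minimal illustration, not the library's implementation: `SyncAckable` and `confirmAck` are hypothetical names introduced here, and the interface only mirrors the `ackSync(Duration)` signature quoted earlier in this thread.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

final class DoubleAckSketch {

    /** Hypothetical stand-in that mirrors only the ackSync signature quoted in this thread. */
    interface SyncAckable {
        void ackSync(Duration timeout) throws TimeoutException, InterruptedException;
    }

    /**
     * Acks and waits for the server's confirmation.
     * Returns true if the confirmation arrived in time,
     * false if the wait timed out or the thread was interrupted.
     */
    static boolean confirmAck(SyncAckable msg, Duration timeout) {
        try {
            msg.ackSync(timeout);
            return true;
        } catch (TimeoutException e) {
            // No confirmation arrived; the caller decides whether to retry.
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Simulated message whose ack is confirmed immediately.
        SyncAckable confirmed = timeout -> { };
        // Simulated message whose confirmation never arrives.
        SyncAckable lost = timeout -> {
            throw new TimeoutException("no ack confirmation");
        };

        System.out.println("confirmed: " + confirmAck(confirmed, Duration.ofSeconds(1)));
        System.out.println("lost: " + confirmAck(lost, Duration.ofSeconds(1)));
    }
}
```

With jnats on the classpath, a method reference such as `msg::ackSync` should fit the `SyncAckable` shape, since it matches the signature above; that wiring is an assumption and is not exercised here.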

scottf commented 2 weeks ago

@thomasdarimont I'm planning on closing this issue unless I hear back from you.

thomasdarimont commented 2 weeks ago

Hello @scottf, sorry for the late response.

I'm trying to replicate the Go/Rust examples as closely as possible for a talk at our local Java User Group. Since not all NATS use cases have Java examples, I tried to add a Java example for every NATS use case. I was a bit puzzled because I couldn't get the application to produce the same output sequence with the Java SDK as the Go/Rust examples did.

I expected an explicit equivalent of Message#DoubleAck, as used here: https://github.com/ConnectEverything/nats-by-example/blob/main/examples/jetstream/interest-stream/go/main.go#L140

Even with msg.ackSync(Duration.ofSeconds(1)); I couldn't replicate the example output, but perhaps that.

If msg.ackSync(...) is the best way to model the msg.DoubleAck of the other SDKs, I think it would be helpful to highlight this somewhere in the documentation.
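When comparing the printed stream state across SDKs, it may help to keep the retention rule in mind: with interest-based retention, a message published while no consumer exists is not kept, and a retained message is removed only once every consumer with interest in it has acknowledged it. The toy model below is a sketch under strong simplifying assumptions (no subject filters, no redelivery, interest fixed at publish time); `InterestRetentionModel` and its methods are hypothetical names and do not reflect how the server is implemented.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class InterestRetentionModel {

    private final Set<String> consumers = new HashSet<>();
    // stream sequence -> consumers that still have to ack that message
    private final Map<Long, Set<String>> awaitingAck = new TreeMap<>();
    private long lastSeq = 0;

    void addConsumer(String name) {
        consumers.add(name);
    }

    /** Assigns the next sequence; the message is kept only if some consumer has interest. */
    long publish() {
        lastSeq++;
        if (!consumers.isEmpty()) {
            awaitingAck.put(lastSeq, new HashSet<>(consumers));
        }
        return lastSeq;
    }

    /** Records an ack; the message is dropped once no consumer is left waiting. */
    void ack(String consumer, long seq) {
        Set<String> waiting = awaitingAck.get(seq);
        if (waiting == null) {
            return; // already removed or never retained
        }
        waiting.remove(consumer);
        if (waiting.isEmpty()) {
            awaitingAck.remove(seq);
        }
    }

    int retained() {
        return awaitingAck.size();
    }

    public static void main(String[] args) {
        InterestRetentionModel stream = new InterestRetentionModel();

        // Published before any consumer exists.
        stream.publish();
        stream.publish();
        stream.publish();
        System.out.println("no consumers: " + stream.retained());

        stream.addConsumer("processor-1");
        long a = stream.publish();
        long b = stream.publish();
        stream.ack("processor-1", a);
        stream.ack("processor-1", b);
        System.out.println("one consumer, acked: " + stream.retained());

        stream.addConsumer("processor-2");
        long c = stream.publish();
        long d = stream.publish();
        stream.ack("processor-2", c);
        stream.ack("processor-2", d);
        System.out.println("only processor-2 acked: " + stream.retained());

        stream.ack("processor-1", c);
        stream.ack("processor-1", d);
        System.out.println("both acked: " + stream.retained());
    }
}
```

In this model, a second round of acks from the same consumer leaves the retained count unchanged; only an ack from the other interested consumer releases the messages.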