ThreeDotsLabs / watermill

Building event-driven applications the easy way in Go.
https://watermill.io
MIT License

NATS JetStream consumer always receives old, already-acked messages #389

Closed: gatspy closed this issue 9 months ago

gatspy commented 1 year ago

The consumer receives a message and acks it successfully, but when I restart the consumer, it receives the same message again even though it was already consumed. nats-server: v2.9.19

NATS config file:

listen: 0.0.0.0:4222
server_name: nats_loc_development
debug: true
trace: true
# system_account: SYSTEM
logtime: true
# log_file: "/usr/local/var/log/nats/log.log"
# monitor
http_port: 8222

jetstream {
    store_dir: /usr/local/var/data/nats/data
    // 1GB
    max_memory_store: 1073741824
    // 10GB
    max_file_store: 10737418240
}

mqtt {
    listen: 0.0.0.0:1883
    # port: 1833

}

cluster {

}

authorization {
    timeout: 5
    users: [
        {user: "admin", password: "$2a$11$vX.TixDtAmdpoXc/JMqWuepOyGFUPYslTAWxoT3geKETNOaHHjagq"}
    ]
}

watermill-nats/_examples/jetstream.go

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-nats/v2/pkg/nats"
    "github.com/ThreeDotsLabs/watermill/message"
    nc "github.com/nats-io/nats.go"
)

func main() {
    natsURL := "nats://localhost:4222"

    marshaler := &nats.GobMarshaler{}
    logger := watermill.NewStdLogger(false, false)
    options := []nc.Option{
        nc.RetryOnFailedConnect(true),
        nc.Timeout(30 * time.Second),
        nc.ReconnectWait(1 * time.Second),
        nc.UserInfo("admin", "Nats1012401231671400001"),
    }
    subscribeOptions := []nc.SubOpt{
        nc.DeliverAll(),
        nc.AckExplicit(),
    }

    jsConfig := nats.JetStreamConfig{
        Disabled:         false,
        AutoProvision:    true,
        ConnectOptions:   nil,
        SubscribeOptions: subscribeOptions,
        PublishOptions:   nil,
        TrackMsgId:       false,
        AckAsync:         false,
        DurablePrefix:    "",
    }
    subscriber, err := nats.NewSubscriber(
        nats.SubscriberConfig{
            //  URL:            svr.ClientURL(),
            URL:            natsURL,
            CloseTimeout:   30 * time.Second,
            AckWaitTimeout: 30 * time.Second,
            NatsOptions:    options,
            Unmarshaler:    marshaler,
            JetStream:      jsConfig,
        },
        logger,
    )
    if err != nil {
        panic(err)
    }

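    // signal.Notify does not block when sending, so the channel needs a
    // buffer; with an unbuffered channel the signal can be missed.
    c := make(chan os.Signal, 1)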
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-c
        fmt.Println("\r- Ctrl+C pressed in Terminal - closing subscriber")
        subscriber.Close()
        os.Exit(0)
    }()

    messages, err := subscriber.Subscribe(context.Background(), "example_topic")
    if err != nil {
        panic(err)
    }

    go processJSM(messages)

    //publisher, err := nats.NewPublisher(
    //  nats.PublisherConfig{
    //      URL:         svr.ClientURL(),
    //      NatsOptions: options,
    //      Marshaler:   marshaler,
    //      JetStream:   jsConfig,
    //  },
    //  logger,
    //)
    //if err != nil {
    //  panic(err)
    //}
    //
    //for {
    //  msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
    //
    //  if err := publisher.Publish("example_topic", msg); err != nil {
    //      panic(err)
    //  }
    //
    //  time.Sleep(time.Second)
    //}

    select {}
}

func processJSM(messages <-chan *message.Message) {
    for msg := range messages {
        log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))

        // we need to Acknowledge that we received and processed the message,
        // otherwise, it will be resent over and over again.
        msg.Ack()
    }
}
AlexCuse commented 1 year ago

The examples use an embedded server. Since you connect to an out-of-process server, the stream and possibly your consumers are being persisted. With DeliverAll specified, you are explicitly asking for everything in the stream, and NATS is delivering :)
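To make this concrete, here is a toy model written with only the Go standard library. It is not the nats.go or watermill-nats API: `Server`, `Consumer`, `Subscribe`, and `Run` are hypothetical names. It assumes the default limits-based retention (acking does not delete a message from the stream) and that ack progress is stored per consumer. Under those assumptions, a subscriber that gets a brand-new ephemeral consumer on every start re-reads the whole stream with DeliverAll, while one that reuses a durable consumer resumes after its last ack. In nats.go a durable consumer is requested with the `Durable(name)` subscribe option; the model assumes, but the thread does not confirm, that the reporter's setup creates an ephemeral consumer on each run.

```go
package main

import "fmt"

// Toy model only: hypothetical types, NOT the nats.go or watermill API.

// Stream stores messages under (assumed) limits-based retention:
// acking a message never removes it from the stream.
type Stream struct {
	msgs []string
}

// Consumer tracks its own ack progress as the highest acked 1-based sequence.
type Consumer struct {
	ackFloor int
}

// Server keeps durable consumers by name; ephemeral ones are not stored.
type Server struct {
	stream   Stream
	durables map[string]*Consumer
}

func NewServer(msgs ...string) *Server {
	return &Server{stream: Stream{msgs: msgs}, durables: map[string]*Consumer{}}
}

// Subscribe models one program start with a DeliverAll policy. An empty
// durable name yields a fresh ephemeral consumer (starting at sequence 1);
// a non-empty name reuses the stored consumer and its ack progress.
func (s *Server) Subscribe(durable string) *Consumer {
	if durable == "" {
		return &Consumer{}
	}
	c, ok := s.durables[durable]
	if !ok {
		c = &Consumer{}
		s.durables[durable] = c
	}
	return c
}

// Run delivers every message past the consumer's ack floor and acks each one.
func (s *Server) Run(c *Consumer) []string {
	var got []string
	for seq := c.ackFloor + 1; seq <= len(s.stream.msgs); seq++ {
		got = append(got, s.stream.msgs[seq-1])
		c.ackFloor = seq // explicit ack, in order
	}
	return got
}

func main() {
	srv := NewServer("hello")
	for run := 1; run <= 2; run++ {
		fmt.Println("ephemeral run", run, srv.Run(srv.Subscribe("")))
		fmt.Println("durable run", run, srv.Run(srv.Subscribe("example_durable")))
	}
}
```

In this model the ephemeral subscriber receives "hello" on both runs, while the durable one receives it only on the first run.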

dotX12 commented 1 year ago

I ran into the same problem. I ran the publisher once, which wrote a message to NATS, and now every time I start the listener (1, 2, 10 times) I see that message again, even though it was already acknowledged. How can I avoid this? As I understand it, a second run of the subscriber should not see acknowledged messages.

gatspy commented 12 months ago

@AlexCuse This isn't related to the DeliverAll setting: with the same configuration, the previous library, watermill-jetstream, does not deliver duplicate messages to subscribers.

AlexCuse commented 10 months ago

@dotX12 it sounds like you want both subscribers to form a queue group.
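A queue group changes who receives each message. The standard-library toy below (hypothetical `fanOut` and `queueGroup` functions, not the nats.go API) contrasts independent subscribers, each of which gets its own copy of every message, with the members of one queue group, who split the messages so each is handled once. Round-robin assignment is an assumption made for illustration; NATS does not promise a particular order of members.

```go
package main

import "fmt"

// Toy model only: hypothetical functions, NOT the nats.go API.

// fanOut: every independent subscriber receives its own copy of each message.
func fanOut(msgs, subscribers []string) map[string][]string {
	out := map[string][]string{}
	for _, s := range subscribers {
		out[s] = append([]string{}, msgs...)
	}
	return out
}

// queueGroup: members of one group share the messages, so each message goes
// to exactly one member. Round-robin is assumed purely for illustration.
func queueGroup(msgs, members []string) map[string][]string {
	out := map[string][]string{}
	for i, m := range msgs {
		member := members[i%len(members)]
		out[member] = append(out[member], m)
	}
	return out
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4"}
	subs := []string{"sub-a", "sub-b"}
	fmt.Println("fan-out:    ", fanOut(msgs, subs))
	fmt.Println("queue group:", queueGroup(msgs, subs))
}
```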

@gatspy If you can post both as runnable examples somewhere, I will take a look, but I'm pretty confident that is the reason. Let me know which NATS version you are running, too.

AlexCuse commented 10 months ago

I added a queueing example at https://github.com/ThreeDotsLabs/watermill-nats/blob/master/_examples/jetstream_queue.go

Here are some runs of this example to illustrate what I mean about DeliverAll (I made the same change as above to use an externally defined server URL). Notice that on runs 2 and 3, any message that had previously been collected in the stream is redelivered. If you switch to DeliverNew you should see the behavior you expect, but you will not get anything that landed in the stream while you were offline.

[Screenshot: several runs of the jetstream_queue.go example]
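The difference between the two deliver policies comes down to where a newly created consumer starts reading. A minimal standard-library sketch follows; `deliverPolicy`, `startSeq`, and `deliver` are hypothetical names that mirror, but do not call, the `DeliverAll()` and `DeliverNew()` options in nats.go.

```go
package main

import "fmt"

// Toy model only: hypothetical names, NOT the nats.go API.
type deliverPolicy int

const (
	policyAll deliverPolicy = iota // start at the first message still in the stream
	policyNew                      // start after the last message present at creation
)

// startSeq returns the first 1-based sequence a brand-new consumer receives,
// given the stream's last sequence at the moment the consumer is created.
func startSeq(p deliverPolicy, lastSeq int) int {
	if p == policyNew {
		return lastSeq + 1
	}
	return 1
}

// deliver returns what a new consumer sees: messages already stored when it
// was created (including ones an earlier consumer acked, or that arrived while
// the program was offline) followed by messages published afterwards.
func deliver(p deliverPolicy, stored, later []string) []string {
	all := append(append([]string{}, stored...), later...)
	return all[startSeq(p, len(stored))-1:]
}

func main() {
	stored := []string{"sent while offline"}
	later := []string{"sent after restart"}
	fmt.Println("DeliverAll:", deliver(policyAll, stored, later))
	fmt.Println("DeliverNew:", deliver(policyNew, stored, later))
}
```

With the DeliverAll-like policy the new consumer sees both messages; with the DeliverNew-like policy it sees only the one published after it was created, which is the trade-off described above.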

One thing that is important to remember here is that stream autoprovisioning is very weak and mostly exists to support tests. There is a new upstream API that makes it much easier to support well, but in general a real application should either set up its own infrastructure outside watermill or use externally created streams with properly configured retention policies and such. An "interest-based" retention policy would, I think, achieve what you are after in this case.
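An interest-based retention policy works from the stream side: once every consumer with interest in a message has acked it, the message is deleted, so nothing remains for a later consumer to re-read. The standard-library toy below uses a hypothetical `InterestStream` type; in nats.go the real setting is the `InterestPolicy` retention value on a stream config. The model also assumes, per the NATS documentation as I understand it, that a message published while no consumer exists is not retained at all, which matters for subscribers that are offline.

```go
package main

import "fmt"

// Toy model only: hypothetical type, NOT the NATS server implementation.

type entry struct {
	payload string
	pending map[string]bool // consumers that still have to ack this message
}

// InterestStream keeps a message only while some consumer still needs it.
type InterestStream struct {
	consumers []string
	entries   []*entry
}

func (s *InterestStream) AddConsumer(name string) {
	s.consumers = append(s.consumers, name)
}

// Publish stores a message only if at least one consumer is interested
// (assumption: with no consumers, nothing is retained).
func (s *InterestStream) Publish(payload string) {
	if len(s.consumers) == 0 {
		return
	}
	pending := map[string]bool{}
	for _, c := range s.consumers {
		pending[c] = true
	}
	s.entries = append(s.entries, &entry{payload: payload, pending: pending})
}

// AckFor records an ack from consumer on every stored message and drops
// messages that no consumer is still waiting on.
func (s *InterestStream) AckFor(consumer string) {
	kept := s.entries[:0]
	for _, e := range s.entries {
		delete(e.pending, consumer)
		if len(e.pending) > 0 {
			kept = append(kept, e)
		}
	}
	s.entries = kept
}

func (s *InterestStream) Len() int { return len(s.entries) }

func main() {
	s := &InterestStream{}
	s.AddConsumer("worker")
	s.Publish("hello")
	fmt.Println("stored before ack:", s.Len())
	s.AckFor("worker")
	fmt.Println("stored after ack:", s.Len())
}
```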

AlexCuse commented 10 months ago

Finally, we are working on a new package targeting the upstream API mentioned above. It landed in master today and in a future version will show up at pkg/jetstream. It's very minimal for now, but we're open to feedback on how it should work. The old package is in a weird place because of its roots in the deprecated nats-streaming-server, so we're trying to make a fresh start building around the new API.

gatspy commented 9 months ago

@AlexCuse Thanks, I'm still a bit confused. I'm going crazy: I can't find the problem and it's been torturing me for days! Why do I keep receiving messages from the same queue that I have already consumed and acknowledged? Is this a problem with nats.go itself or with watermill-nats? With the https://github.com/ThreeDotsLabs/watermill-jetstream package, I get the same result as with jetstream_new.

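Running _examples/jetstream_new.go does not consume the message again:

[Screenshot: output of _examples/jetstream_new.go]

Running _examples/jetstream_queue.go:

[Screenshot: output of _examples/jetstream_queue.go]
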
gatspy commented 9 months ago

Finally figured out the reason. Take a look at this issue: nats-io/nats.go#1036

[Screenshot]