nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.54k stars 696 forks source link

JetStream ConsumerContext `Drain` function not waiting for in flight requests to be done processing #1672

Open thomas-maurice opened 3 months ago

thomas-maurice commented 3 months ago

Observed behavior

I am observing that the ConsumerContext.Drain function does not wait for the processing of in flight requests to finish before returning.

Expected behavior

I would expect the Drain function to wait for the processing of the current in flight message to finish before returning, allowing to cleanly exit the consumer.

This is especially odd because the documentation states the following:

Drain unsubscribes from the stream and cancels subscription. All messages that are already in the buffer will be processed in callback function.

So I would expect that the Drain function would block until the buffer is empty, then return. I am observing the same behaviour with the Stop function (though I would expect it to only wait for the current Consume run to finish)

It would be really neat if either Drain & Stop functions could block until the buffer/current inflight are processed (or after a timeout has passed if it is taking too long), or if we could have a method in the Consumer or ConsumerContext that could provide us with the number of requests being processed or still waiting for processing so we could determine when it is safe to shutdown the program, if I am not mistaking right now there are none

Server and client version

nats-server: v2.10.17
nats: 0.1.4
nats go lib: github.com/nats-io/nats.go v1.36.0

Host environment

OSX 14.4.1 on M1 Max Reproduced on a framework 13 AMD, running Arch with the latest linux kernel

Steps to reproduce

You can use the following code to validate the behaviour. This creates a stream and a consumer listening on the subjects hello.>, the Consume function takes an artificially long amount of time to process the requests (5s).

By running this program, sending a message through one of the listened to subjects like so nats pub hello.foo hello and killing the program right after the wait started, we notice that it exits before the Consume run has finished.

package main

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
    "github.com/sirupsen/logrus"
)

const (
    NATS_URL = "nats://localhost:4222"
)

func mustGetNats(url string) *nats.Conn {
    conn, err := nats.Connect(url)
    if err != nil {
        logrus.WithError(err).Panic("could not connect to NATS")
    }

    return conn
}

func main() {
    consumerConn := mustGetNats(NATS_URL)

    js, err := jetstream.New(consumerConn)
    if err != nil {
        logrus.WithError(err).Fatal("could not open jetstream")
    }

    _, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
        Name:        "programatic-stream",
        Description: "Some description",
        Subjects:    []string{"hello.>"},
        Retention:   jetstream.LimitsPolicy,
        MaxBytes:    2 * 1024 * 1024 * 1024,
        MaxMsgs:     10000,
        Discard:     jetstream.DiscardOld,
        MaxAge:      time.Hour * 24 * 14,
        Storage:     jetstream.FileStorage,
        Replicas:    1,
    })

    if err != nil {
        logrus.WithError(err).Fatal("could not create stream")
    }

    _, err = js.CreateOrUpdateConsumer(context.Background(), "programatic-stream", jetstream.ConsumerConfig{
        Durable:       "programatic-consumer",
        Name:          "programatic-consumer",
        AckPolicy:     jetstream.AckExplicitPolicy,
        MaxDeliver:    10,
        DeliverPolicy: jetstream.DeliverNewPolicy,
    })

    if err != nil {
        logrus.WithError(err).Fatal("could not create consumer")
    }

    consumer, err := js.Consumer(context.Background(), "programatic-stream", "programatic-consumer")
    if err != nil {
        logrus.WithError(err).Fatal("could not get the consumer")
    }

    consumerContext, err := consumer.Consume(func(msg jetstream.Msg) {
        logrus.Info("artificially wait 5s")
        time.Sleep(time.Second * 5)
        logrus.Info("waited 5s")
        logrus.Infof("got a message %s\n", string(msg.Data()))
        msg.Ack()
    })

    if err != nil {
        logrus.WithError(err).Fatal("could not create consumer")
    }

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
    <-sigs
    logrus.Info("got interrupted")
    consumerContext.Drain()
    logrus.Info("drained consumer")
}

The console output I get is the following

INFO[0019] artificially wait 5s                         
^CINFO[0022] got interrupted                              
INFO[0022] drained consumer     

When I would have expected something like this

INFO[0019] artificially wait 5s                         
^CINFO[0020] got interrupted                              
INFO[0024] waited 5s
INFO[0024] got a message: hello
INFO[0024] drained consumer     

Am I doing something wrong or is it actually a bug ?

JakubSchneller commented 3 months ago

I'm dealing with the same exact issue.

jamm3e3333 commented 3 months ago

me too

krizacekcz commented 3 months ago

We are using the new API too and you are having the same issue. But off-topic one: why do you call CreateOrUpdateConsumer, emit the result and then you call Consumer again, when u can receive it in the first call ?

_, err = js.CreateOrUpdateConsumer(context.Background(), "programatic-stream", jetstream.ConsumerConfig{
        Durable:       "programatic-consumer",
        Name:          "programatic-consumer",
        AckPolicy:     jetstream.AckExplicitPolicy,
        MaxDeliver:    10,
        DeliverPolicy: jetstream.DeliverNewPolicy,
    })

    if err != nil {
        logrus.WithError(err).Fatal("could not create consumer")
    }

    consumer, err := js.Consumer(context.Background(), "programatic-stream", "programatic-consumer")
    if err != nil {
        logrus.WithError(err).Fatal("could not get the consumer")
    }
piotrpio commented 3 months ago

Please see my comment on the other issue: https://github.com/nats-io/nats.go/issues/1673#issuecomment-222819737

thomas-maurice commented 3 months ago

We are using the new API too and you are having the same issue. But off-topic one: why do you call CreateOrUpdateConsumer, emit the result and then you call Consumer again, when u can receive it in the first call ?

That's an oversight of my part i could just get the consumer from that line, I put it together in a hurry for the example

@piotrpio nice, I'll check it out thanks !

thomas-maurice commented 3 months ago

I would like to add a bit more input to that, I tried as suggested by @typecampo on Slack calling Drain on the Nats connection does what I intend, as opposed to calling it on the consumer directly. This is odd that it works on the connection and not the consumer, but that's a workaround!

As an example here is the code I used to test it:

package main

import (
    "context"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
    "github.com/sirupsen/logrus"
)

const (
    NATS_URL = "nats://localhost:4222"
)

func main() {
    closedWg := &sync.WaitGroup{}
    closedWg.Add(1)
    consumerConn, err := nats.Connect(NATS_URL, nats.ClosedHandler(func(c *nats.Conn) {
        closedWg.Done()
    }))
    if err != nil {
        logrus.WithError(err).Panic("could not connect to NATS")
    }

    js, err := jetstream.New(consumerConn)
    if err != nil {
        logrus.WithError(err).Fatal("could not open jetstream")
    }

    _, err = js.CreateOrUpdateStream(context.Background(), jetstream.StreamConfig{
        Name:        "programatic-stream",
        Description: "Some description",
        Subjects:    []string{"hello.>"},
        Retention:   jetstream.LimitsPolicy,
        MaxBytes:    2 * 1024 * 1024 * 1024,
        MaxMsgs:     10000,
        Discard:     jetstream.DiscardOld,
        MaxAge:      time.Hour * 24 * 14,
        Storage:     jetstream.FileStorage,
        Replicas:    1,
    })

    if err != nil {
        logrus.WithError(err).Fatal("could not create stream")
    }

    consumer, err := js.CreateOrUpdateConsumer(context.Background(), "programatic-stream", jetstream.ConsumerConfig{
        Durable:       "programatic-consumer",
        Name:          "programatic-consumer",
        AckPolicy:     jetstream.AckExplicitPolicy,
        MaxDeliver:    10,
        DeliverPolicy: jetstream.DeliverNewPolicy,
    })

    if err != nil {
        logrus.WithError(err).Fatal("could not create consumer")
    }

    _, err = consumer.Consume(func(msg jetstream.Msg) {
        logrus.Info("artificially wait 5s")
        time.Sleep(time.Second * 5)
        logrus.Info("waited 5s")
        logrus.Infof("got a message %s\n", string(msg.Data()))
        msg.Ack()
    })

    if err != nil {
        logrus.WithError(err).Fatal("could not create consumer")
    }

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
    <-sigs
    logrus.Info("got interrupted")
    consumerConn.Drain()
    closedWg.Wait()
    logrus.Info("drained consumer")
}

By sending two consecutive messages to the subject here is the output I get

INFO[0003] artificially wait 5s                         
^CINFO[0004] got interrupted                              
INFO[0008] waited 5s                                    
INFO[0008] got a message test                           
INFO[0008] artificially wait 5s                         
INFO[0013] waited 5s                                    
INFO[0013] got a message test                           
INFO[0013] drained consumer 

Hope it helps!