nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.57k stars 698 forks source link

Connection draining is not waiting for all pending messages (when using SyncSubscribe) #645

Open acroca opened 3 years ago

acroca commented 3 years ago

Hi,

When shutting down our service we call nc.Drain() and using the connection closed callback we wait for it to be closed. The docs for nc.Drain() say it'll drain all subscriptions, and the docs for subs.Drain() say it'll wait until all messages are processed.

In our case it's not waiting for all messages to be processed. Here's a piece of code showing our problem:

package main

import (
    "fmt"
    "sync/atomic"
    "time"

    "github.com/nats-io/nats.go"
)

// main reproduces the reported behavior: it publishes one message, receives it
// on a sync pull subscription, starts a simulated 5-second processing step in a
// goroutine, and drains the connection while that step is still in progress.
// It then waits for the connection-closed callback and prints the counters.
func main() {
    // closed is closed from the ClosedHandler callback; main blocks on it below
    // to wait for the drained connection to be closed.
    closed := make(chan struct{}, 1)
    nc, err := nats.Connect("nats://localhost:4222", nats.ClosedHandler(func(_ *nats.Conn) {
        close(closed)
    }))
    noerr(err)
    defer nc.Close()

    js, err := nc.JetStream()
    noerr(err)

    // Publish the single message that the goroutine below will pick up.
    _, err = js.Publish("ORDERS.received", []byte("hello world"))
    noerr(err)

    // Attach to the "ORDERS" stream and "NEW" consumer; both are expected to
    // exist on the server already.
    subs, err := js.SubscribeSync("ORDERS.received",
        nats.Attach("ORDERS", "NEW"),
        nats.Pull(1))
    noerr(err)

    // received counts messages fetched; processed counts messages whose
    // (simulated) processing has finished.
    var received int32 = 0
    var processed int32 = 0
    go func() {
        msg, err := subs.NextMsg(5 * time.Second)
        noerr(err)
        atomic.AddInt32(&received, 1)
        // Stand-in for slow message processing.
        time.Sleep(5000 * time.Millisecond)
        atomic.AddInt32(&processed, 1)
        msg.Ack()
    }()
    // Presumably long enough for the goroutine to receive the message, so the
    // drain below starts while the message is still being processed.
    time.Sleep(100 * time.Millisecond)
    nc.Drain()

    // Wait for the ClosedHandler to fire.
    <-closed

    // The reported output is "Received: 1, Processed: 0": the connection closes
    // before the goroutine finishes its processing step.
    fmt.Printf("Received: %d, Processed: %d\n", received, processed)
}

// noerr is a fail-fast helper for this example: it panics with err when err is
// non-nil and does nothing otherwise.
func noerr(err error) {
    if err == nil {
        return
    }
    panic(err)
}

The stream and consumer exist already in the server (created with terraform).

This finishes showing Received: 1, Processed: 0, but it should wait for the message to be processed, shouldn't it?

wallyqs commented 3 years ago

Possible fix for this: https://github.com/nats-io/nats.go/pull/650

acroca commented 3 years ago

I've tested running that piece of code using your PR and it still shows Received: 1, Processed: 0. I don't think it's waiting for in-flight messages to be acked.

wallyqs commented 3 years ago

OK, I see now: because it is using a sync pull subscriber, the client (as it is right now) is not able to let a message finish processing before the drain completes. Something that could help is to add another context and call drain on the sub when needed:

package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"

    "github.com/nats-io/nats.go"
)

// main shows a workaround for draining with a sync pull subscriber: the
// goroutine that processes the message drains the subscription itself once it
// is done and cancels a context, and main only drains the connection after
// that context is done (or its 5-second timeout expires).
func main() {
    // closed is closed from the ClosedHandler callback; main blocks on it at
    // the end to wait for the drained connection to be closed.
    closed := make(chan struct{}, 1)
    nc, err := nats.Connect("nats://localhost:4222",
        nats.ClosedHandler(func(_ *nats.Conn) {
            close(closed)
        }),
        nats.DrainTimeout(5*time.Second))
    noerr(err)
    defer nc.Close()

    js, err := nc.JetStream()
    noerr(err)

    _, err = js.AddStream(&nats.StreamConfig{
        Name:     "ORDERS",
        Subjects: []string{"ORDERS.received"},
    })
    noerr(err)

    // Publish the single message that the goroutine below will pick up.
    _, err = js.Publish("ORDERS.received", []byte("hello world"))
    noerr(err)

    sub, err := js.SubscribeSync("ORDERS.received",
        nats.Durable("NEW"),
        nats.Pull(1))
    noerr(err)

    // ctx is done either when the worker calls drain() or after 5 seconds,
    // whichever comes first.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    drain := func() {
        sub.Drain()
        cancel()
    }

    // received counts messages fetched; processed counts messages whose
    // (simulated) processing has finished.
    var received, processed int32
    go func() {
        msg, err := sub.NextMsg(2 * time.Second)
        noerr(err)
        atomic.AddInt32(&received, 1)
        // Stand-in for slow message processing.
        time.Sleep(2 * time.Second)
        atomic.AddInt32(&processed, 1)
        // Fail loudly if the ack is not confirmed instead of dropping the error.
        noerr(msg.AckSync(nats.MaxWait(2 * time.Second)))
        drain()
    }()

    // Wait until goroutine has finished processing.
    <-ctx.Done()

    // Call drain and close the connection.
    nc.Drain()
    <-closed

    // The worker may still be running if ctx timed out, so read the counters
    // atomically to pair with the atomic.AddInt32 calls above.
    fmt.Printf("Received: %d, Processed: %d\n",
        atomic.LoadInt32(&received), atomic.LoadInt32(&processed))
}

// noerr panics with err if it is non-nil; a nil err is ignored. It keeps the
// example free of repetitive error-handling branches.
func noerr(err error) {
    if err == nil {
        return
    }
    panic(err)
}