nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

Inconsistent stream messages consumption ordering #6114

Closed (Wertual08 closed 1 week ago)

Wertual08 commented 1 week ago

Observed behavior

As far as I'm concerned, a stream consumer should process messages in the order they were sent. But we faced a situation where the order is somehow broken. If a consumer fetched a batch of messages, no ack was sent to the server and the consumer died, the following happens after a new consumer is started: if the new consumer boots up before AckWait expires, messages are received in the expected order. If the new consumer boots up after AckWait has passed, the first message becomes the last one...

I'm sorry if I did something wrong, but this looks to me like an issue with nats-server.

Expected behavior

Consumers are expected to process messages in order.

Server and client version

nats-server: v2.10.22
github.com/nats-io/nats.go: v1.37.0

Host environment

NATS container: docker.io/nats:2.10.22
Go version: 1.21.6

Steps to reproduce

Go code

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {
    cluster, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatalln(err)
    }

    js, err := jetstream.New(cluster)
    if err != nil {
        log.Fatalln(err)
    }

    _ = js.DeleteStream(context.Background(), "test")
    stream, err := js.CreateStream(context.Background(), jetstream.StreamConfig{
        Name: "test",
        Subjects: []string{
            "test.>",
        },
        Retention: jetstream.LimitsPolicy,
        Storage: jetstream.FileStorage,
        MaxMsgs: 100,
    })
    if err != nil {
        log.Fatalln(err)
    }

    for i := 0; i < 5; i += 1 {
        subject := fmt.Sprintf("test.%d", i % 2)
        payload := fmt.Sprintf("msg: %d", i)
        _, err := js.Publish(context.Background(), subject, []byte(payload))
        if err != nil {
            log.Fatalln(err)
        }
    }

    log.Println("First consume")
    consume(stream)

    log.Println("Consume before timeout")
    consume(stream)

    time.Sleep(2 * time.Second)

    log.Println("Consume after timeout")
    consume(stream)

    log.Println("DONE")
}

func consume(stream jetstream.Stream) {
    consumer, err := stream.CreateConsumer(context.Background(), jetstream.ConsumerConfig{
        Durable: "test-eater",
        DeliverPolicy: jetstream.DeliverAllPolicy,
        AckPolicy: jetstream.AckExplicitPolicy,
        AckWait: 1 * time.Second,
    })
    if err != nil {
        log.Fatalln(err)
    }

    messages, err := consumer.Messages()
    if err != nil {
        log.Fatalln(err)
    }
    defer messages.Stop()

    // Read up to five messages without acking them; once AckWait expires
    // they become eligible for redelivery.
    for i := 0; i < 5; i += 1 {
        msg, err := messages.Next()
        if err != nil {
            log.Println(err)
            return
        }

        log.Println(msg.Subject(), string(msg.Data()))
    }
}

docker-compose.yml

services:
  nats1:
    image: docker.io/nats:2.10.22
    ports:
      - "4222:4222"
      - "8222:8222"      
    command:
      - "--name=nats1"
      - "--cluster_name=c1"
      - "--cluster=nats://nats1:6222"
      - "--routes=nats-route://nats1:6222,nats-route://nats2:6222,nats-route://nats3:6222"
      - "--http_port=8222"
      - "--js"
      - "--sd=/data"

  nats2:
    image: docker.io/nats:2.10.22
    ports:
      - "4223:4222"
      - "8223:8222"
    command:
      - "--name=nats2"
      - "--cluster_name=c1"
      - "--cluster=nats://nats2:6222"
      - "--routes=nats-route://nats1:6222,nats-route://nats2:6222,nats-route://nats3:6222"
      - "--http_port=8222"
      - "--js"
      - "--sd=/data"

  nats3:
    image: docker.io/nats:2.10.22
    ports:
      - "4224:4222"
      - "8224:8222"
    command:
      - "--name=nats3"
      - "--cluster_name=c1"
      - "--cluster=nats://nats3:6222"
      - "--routes=nats-route://nats1:6222,nats-route://nats2:6222,nats-route://nats3:6222"
      - "--http_port=8222"
      - "--js"
      - "--sd=/data"

Output:

2024/11/12 23:00:07 First consume
2024/11/12 23:00:07 test.0 msg: 0
2024/11/12 23:00:07 test.1 msg: 1
2024/11/12 23:00:07 test.0 msg: 2
2024/11/12 23:00:07 test.1 msg: 3
2024/11/12 23:00:07 test.0 msg: 4
2024/11/12 23:00:07 Consume before timeout
2024/11/12 23:00:08 test.0 msg: 0
2024/11/12 23:00:08 test.1 msg: 1
2024/11/12 23:00:08 test.0 msg: 2
2024/11/12 23:00:08 test.1 msg: 3
2024/11/12 23:00:08 test.0 msg: 4
2024/11/12 23:00:10 Consume after timeout
2024/11/12 23:00:10 test.1 msg: 1
2024/11/12 23:00:10 test.0 msg: 2
2024/11/12 23:00:10 test.1 msg: 3
2024/11/12 23:00:10 test.0 msg: 4
2024/11/12 23:00:10 test.0 msg: 0
2024/11/12 23:00:10 DONE
Jarema commented 1 week ago

It's expected behaviour. That is how redeliveries work: you can keep processing messages and get redeliveries of missed/unacked messages once their AckWait threshold is reached.

To guarantee order, you need to use MaxAckPending = 1, or an OrderedConsumer if you want to quickly whip through the stream and discard the consumer (it uses client-side, no-ack logic with ephemeral consumers to ensure ordering).
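
A minimal sketch of both options, using the nats.go jetstream API from the reproduction above (the function names and durable name here are illustrative, not from the thread):

package example

import (
    "context"
    "time"

    "github.com/nats-io/nats.go/jetstream"
)

// Option 1: a durable consumer with MaxAckPending = 1. The server will not
// deliver the next message until the previous one is acked, so ordering is
// preserved even across redeliveries (at the cost of throughput).
func strictOrderConsumer(ctx context.Context, stream jetstream.Stream) (jetstream.Consumer, error) {
    return stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
        Durable:       "test-eater-strict", // hypothetical durable name
        AckPolicy:     jetstream.AckExplicitPolicy,
        AckWait:       1 * time.Second,
        MaxAckPending: 1,
    })
}

// Option 2: an ordered consumer: ephemeral, client-managed, no acks, meant
// for reading through the stream in order and then discarding it.
func readInOrder(ctx context.Context, stream jetstream.Stream) (jetstream.Consumer, error) {
    return stream.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{
        DeliverPolicy: jetstream.DeliverAllPolicy,
    })
}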

Wertual08 commented 1 week ago

@Jarema oh, okay, thank you very much for your answer. One more question: why does the order then depend on the time that passes between the death of the old consumer and the boot of the new one?

Jarema commented 1 week ago

Because messages are redelivered after AckWait passes. Also, to be accurate, it's not a new consumer, just the client binding to it again. A consumer is a server-side construct whose state is held on the server/cluster.
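
A minimal sketch of binding to the existing durable by name with the same API (the helper function name is illustrative; the durable name is the one from the reproduction):

package example

import (
    "context"

    "github.com/nats-io/nats.go/jetstream"
)

// The consumer's state (delivered and unacked messages, ack timers) lives on
// the server/cluster; any client that binds to the durable by name sees the
// same pending/redelivery state.
func bindExisting(ctx context.Context, stream jetstream.Stream) (jetstream.Consumer, error) {
    return stream.Consumer(ctx, "test-eater")
}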

Wertual08 commented 1 week ago

Thanks again! Sorry for taking your time; closing this issue.