nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

StreamConfig options are not considered when a stream is created #1566

Closed Issif closed 8 months ago

Issif commented 8 months ago

Observed behavior

Hi,

I'm embedding a NATS JetStream server in my app to use the deduplication feature of JetStream. It works well, but the settings I pass in nats.StreamConfig{} are not applied.

Here's my snippet to create the stream:

func (client *Client) createStream(timeWindow int) error {
    stream, err := client.JetStreamContext.StreamInfo("EVENTS")
    if err != nil {
        if err != nats.ErrStreamNotFound {
            return err
        }
    }
    if stream == nil {
        _, err = client.JetStreamContext.AddStream(&nats.StreamConfig{
            Name:              "EVENTS",
            Subjects:          []string{"EVENTS.*"},
            Duplicates:        5 * time.Second,
            MaxAge:            10 * time.Second,
            MaxMsgsPerSubject: 1,
            Storage:           nats.MemoryStorage,
        })
        if err != nil {
            return err
        }
    }
    return nil
}

But when I check the stream info, I get:

❯ nats stream info EVENTS
Information for Stream EVENTS created 2024-02-22 17:14:05

              Subjects: EVENTS.*
              Replicas: 1
               Storage: File

Options:

             Retention: Limits
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 5.00s
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: unlimited
         Maximum Bytes: unlimited
           Maximum Age: 20.00s
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

Expected behavior

The settings of my stream should be:

❯ nats stream info EVENTS
Information for Stream EVENTS created 2024-02-22 17:14:05

              Subjects: EVENTS.*
              Replicas: 1
               Storage: Memory

Options:

             Retention: Limits
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 5.00s
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: 1
         Maximum Bytes: unlimited
           Maximum Age: 10.00s
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

Server and client version

Versions:

Host environment

No response

Steps to reproduce

No response

piotrpio commented 8 months ago

Hello @Issif, thank you for creating the issue.

Could you provide debug server logs from when AddStream() is called?

Issif commented 8 months ago

@piotrpio

Here's my snippet to enable the logs, but nothing is printed to stdout/stderr:

func StartServer(timeWindow int) (*natsserver.Server, error) {
    ns, err := natsserver.NewServer(
        &natsserver.Options{
            JetStream: true,
            Debug:     true,
        })
    if err != nil {
        return nil, err
    }
    go ns.Start()
    return ns, nil
}

I noticed something interesting, though: if I start an embedded NATS server with the snippet above and use it, the settings in AddStream are not applied, but if I use an external NATS server, it works:

❯ nats stream info EVENTS
Information for Stream EVENTS created 2024-02-26 14:17:00

              Subjects: EVENTS.*
              Replicas: 1
               Storage: Memory

Options:

             Retention: Limits
       Acknowledgments: true
        Discard Policy: Old
      Duplicate Window: 5.00s
     Allows Msg Delete: true
          Allows Purge: true
        Allows Rollups: false

Limits:

      Maximum Messages: unlimited
   Maximum Per Subject: 1
         Maximum Bytes: unlimited
           Maximum Age: 10.00s
  Maximum Message Size: unlimited
     Maximum Consumers: unlimited

State:

              Messages: 1
                 Bytes: 461 B
        First Sequence: 1 @ 2024-02-26 14:17:04 UTC
         Last Sequence: 1 @ 2024-02-26 14:17:04 UTC
      Active Consumers: 1
    Number of Subjects: 1

piotrpio commented 8 months ago

I tried replicating your case with this test:

func TestEmbeddedSServerAddStream(t *testing.T) {
    ns, err := server.NewServer(&server.Options{
        JetStream: true,
    })
    if err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }

    ns.Start()
    defer ns.Shutdown()

    nc, err := nats.Connect(ns.ClientURL())
    if err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }
    defer nc.Close()
    js, err := nc.JetStream()
    if err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }

    stream, err := js.StreamInfo("EVENTS")
    if err != nil {
        if err != nats.ErrStreamNotFound {
            t.Fatalf("Unexpected error: %v", err)
        }
    }
    if stream == nil {
        _, err = js.AddStream(&nats.StreamConfig{
            Name:              "EVENTS",
            Subjects:          []string{"EVENTS.*"},
            Duplicates:        5 * time.Second,
            MaxAge:            10 * time.Second,
            MaxMsgsPerSubject: 1,
            Storage:           nats.MemoryStorage,
        })
        if err != nil {
            t.Fatalf("Unexpected error: %v", err)
        }
    }
    info, err := js.StreamInfo("EVENTS")
    if err != nil {
        t.Fatalf("Unexpected error: %v", err)
    }
    fmt.Printf("Stream info: %+v\n", info)
}

What it does:

  1. Starts a new server with JetStream
  2. Connects to the server
  3. Fetches stream info for "EVENTS" stream
  4. Creates a stream with your config if it does not exist
  5. Gets the stream info and prints it

That seems to work correctly as it returns the correct config values:

=== RUN   TestEmbeddedSServerAddStream
Stream info: &{Config:{Name:EVENTS Description: Subjects:[EVENTS.*] Retention:Limits MaxConsumers:-1 MaxMsgs:-1 MaxBytes:-1 Discard:DiscardOld DiscardNewPerSubject:false MaxAge:10s MaxMsgsPerSubject:1 MaxMsgSize:-1 Storage:Memory Replicas:1 NoAck:false Template: Duplicates:5s Placement:<nil> Mirror:<nil> Sources:[] Sealed:false DenyDelete:false DenyPurge:false AllowRollup:false Compression:None FirstSeq:0 SubjectTransform:<nil> RePublish:<nil> AllowDirect:false MirrorDirect:false ConsumerLimits:{InactiveThreshold:0s MaxAckPending:0} Metadata:map[]} Created:2024-02-26 13:49:46.616838 +0000 UTC State:{Msgs:0 Bytes:0 FirstSeq:0 FirstTime:0001-01-01 00:00:00 +0000 UTC LastSeq:0 LastTime:0001-01-01 00:00:00 +0000 UTC Consumers:0 Deleted:[] NumDeleted:0 NumSubjects:0 Subjects:map[]} Cluster:0x140001a9400 Mirror:<nil> Sources:[] Alternates:[]}
--- PASS: TestEmbeddedSServerAddStream (0.01s)

It would be super useful if you could provide a reproducible example.

Issif commented 8 months ago

Here's the POC I wrote to test the deduplication; it's the code I'm seeing the issue with: https://gist.github.com/Issif/7607f63fcd934149bc00060d47468bb4

And the results I got:

❯ go run .
2024/02/26 15:01:04 Starting...
Stream info: &{Config:{Name:EVENTS Description: Subjects:[EVENTS.*] Retention:Limits MaxConsumers:-1 MaxMsgs:-1 MaxBytes:-1 Discard:DiscardOld DiscardNewPerSubject:false MaxAge:20s MaxMsgsPerSubject:-1 MaxMsgSize:-1 Storage:File Replicas:1 NoAck:false Template: Duplicates:5s Placement:<nil> Mirror:<nil> Sources:[] Sealed:false DenyDelete:false DenyPurge:false AllowRollup:false RePublish:<nil> AllowDirect:false MirrorDirect:false} Created:2024-02-22 16:14:05.874865694 +0000 UTC State:{Msgs:0 Bytes:0 FirstSeq:4611686018427388089 FirstTime:1970-01-01 00:00:00 +0000 UTC LastSeq:4611686018427388088 LastTime:2024-02-26 13:59:50.396556636 +0000 UTC Consumers:2 Deleted:[] NumDeleted:0 NumSubjects:0 Subjects:map[]} Cluster:0xc000204440 Mirror:<nil> Sources:[] Alternates:[]}

piotrpio commented 8 months ago

Thanks! I ran your POC and it still looks good to me:

~/g/nats.go go run .

2024/02/26 15:11:55 Starting...
2024/02/26 15:11:55 Creating stream: EVENTS
Stream info: &{Config:{Name:EVENTS Description: Subjects:[EVENTS.*] Retention:Limits MaxConsumers:-1 MaxMsgs:-1 MaxBytes:-1 Discard:DiscardOld DiscardNewPerSubject:false MaxAge:10s MaxMsgsPerSubject:1 MaxMsgSize:-1 Storage:Memory Replicas:1 NoAck:false Template: Duplicates:5s Placement:<nil> Mirror:<nil> Sources:[] Sealed:false DenyDelete:false DenyPurge:false AllowRollup:false Compression:None FirstSeq:0 SubjectTransform:<nil> RePublish:<nil> AllowDirect:false MirrorDirect:false ConsumerLimits:{InactiveThreshold:0s MaxAckPending:0} Metadata:map[]} Created:2024-02-26 14:11:55.460311 +0000 UTC State:{Msgs:0 Bytes:0 FirstSeq:0 FirstTime:0001-01-01 00:00:00 +0000 UTC LastSeq:0 LastTime:0001-01-01 00:00:00 +0000 UTC Consumers:0 Deleted:[] NumDeleted:0 NumSubjects:0 Subjects:map[]} Cluster:0x140001445c0 Mirror:<nil> Sources:[] Alternates:[]}
2024/02/26 15:11:55 Consumer  =>  Subject: EVENTS.test - Message: test
...

I see that you've added a log line: log.Printf("Creating stream: %s\n", StreamName) where you would call AddStream(), but I don't see it in the log output you provided. That means that your StreamInfo call here returns a stream, so you never actually add a new stream. This is just an EVENTS stream you've already created earlier.

When you embed a server, it looks for the StoreDir option; if none is provided, it uses a tmp dir. This directory stores all the JetStream assets and state, and it looks like you already have an EVENTS stream there. You should either remove that stream (via the API, the CLI, or however you see fit) or use a different StoreDir to get a fresh JetStream.
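The root cause described above (a "create only if missing" check silently keeping an older stream) can be caught by comparing the desired settings with what the server reports. The following is a minimal sketch, not part of nats.go: streamSettings and diffSettings are hypothetical names, and the struct is a trimmed-down stand-in for the nats.StreamConfig fields discussed in this thread. In real code the two values would presumably come from the config passed to AddStream and from the Config field of the StreamInfo result. The sample values are the ones shown earlier in this issue.

package main

import (
    "fmt"
    "time"
)

// streamSettings is a hypothetical stand-in for the nats.StreamConfig
// fields discussed in this thread; it is not a type from nats.go.
type streamSettings struct {
    Storage           string
    MaxAge            time.Duration
    MaxMsgsPerSubject int64
    Duplicates        time.Duration
}

// diffSettings returns one human-readable line per field whose value
// differs between the desired and the actual settings.
func diffSettings(want, got streamSettings) []string {
    var diffs []string
    if want.Storage != got.Storage {
        diffs = append(diffs, fmt.Sprintf("Storage: want %s, got %s", want.Storage, got.Storage))
    }
    if want.MaxAge != got.MaxAge {
        diffs = append(diffs, fmt.Sprintf("MaxAge: want %s, got %s", want.MaxAge, got.MaxAge))
    }
    if want.MaxMsgsPerSubject != got.MaxMsgsPerSubject {
        diffs = append(diffs, fmt.Sprintf("MaxMsgsPerSubject: want %d, got %d", want.MaxMsgsPerSubject, got.MaxMsgsPerSubject))
    }
    if want.Duplicates != got.Duplicates {
        diffs = append(diffs, fmt.Sprintf("Duplicates: want %s, got %s", want.Duplicates, got.Duplicates))
    }
    return diffs
}

func main() {
    // Desired settings, as passed to AddStream in the original snippet.
    want := streamSettings{Storage: "Memory", MaxAge: 10 * time.Second, MaxMsgsPerSubject: 1, Duplicates: 5 * time.Second}
    // Settings reported for the stale EVENTS stream.
    got := streamSettings{Storage: "File", MaxAge: 20 * time.Second, MaxMsgsPerSubject: -1, Duplicates: 5 * time.Second}

    for _, d := range diffSettings(want, got) {
        fmt.Println("config drift:", d)
    }
}

With a check like this in the "stream already exists" branch, the application could at least log a warning instead of silently continuing; whether to then update the stream or delete and recreate it depends on the use case.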

Issif commented 8 months ago

You're right, thank you a lot.

By starting the server with StoreDir: nats.MemoryStorage.String() everything works fine (working in memory is perfect for my use case).

Thanks a lot, I hope this issue and your answer will help others.
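
A note for readers who land here: nats.MemoryStorage.String() appears to evaluate to the string "Memory" (the stream info above prints Storage:Memory), so the StoreDir above would simply be a directory named Memory relative to the working directory. It most likely fixed the problem because it was a fresh location; the in-memory behaviour comes from Storage: nats.MemoryStorage in the stream config, not from the StoreDir value. If the goal is a clean JetStream state on every start, one option is a unique temporary directory. The sketch below uses only the Go standard library; the helper name newStoreDir and the "nats-js-" prefix are illustrative assumptions, not part of any NATS API.

package main

import (
    "fmt"
    "os"
)

// newStoreDir creates a unique, empty directory intended to be used as
// the StoreDir of an embedded server, and returns a cleanup function
// that removes it again. The name and prefix are illustrative.
func newStoreDir() (dir string, cleanup func(), err error) {
    dir, err = os.MkdirTemp("", "nats-js-")
    if err != nil {
        return "", nil, err
    }
    return dir, func() { os.RemoveAll(dir) }, nil
}

func main() {
    dir, cleanup, err := newStoreDir()
    if err != nil {
        fmt.Fprintln(os.Stderr, "cannot create store dir:", err)
        os.Exit(1)
    }
    defer cleanup()

    // dir would then be passed as the StoreDir option when creating the
    // embedded server, next to JetStream: true.
    fmt.Println("JetStream store dir:", dir)
}

Calling cleanup after the server has shut down keeps stale streams from accumulating between runs.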