nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

Stream Info response received for duplicate Stream create #2658

Closed ripienaar closed 2 years ago

ripienaar commented 2 years ago

Given config.json

{
  "name": "monitoring",
  "subjects": [
    "mon.*"
  ],
  "retention": "limits",
  "max_consumers": 128,
  "max_msgs_per_subject": 128,
  "max_msgs": 1024,
  "max_bytes": 100000,
  "max_age": 0,
  "max_msg_size": 1024,
  "storage": "file",
  "discard": "old",
  "num_replicas": 1,
  "duplicate_window": 120000000000
}

Creating the stream the first time works fine:

$ nats s add BEN --config config.json
Stream BEN was created
....

But if we run exactly the same command again immediately afterwards, we should get either a duplicate error or, if the configuration is identical, a create response, so that this API is idempotent when called with identical arguments:

$ nats s add BEN --config config.json --trace
11:32:25 >>> $JS.API.STREAM.CREATE.BEN
{"name":"BEN","subjects":["mon.*"],"retention":"limits","max_consumers":128,"max_msgs_per_subject":128,"max_msgs":1024,"max_bytes":100000,"max_age":0,"max_msg_size":1024,"storage":"file","discard":"old","num_replicas":1,"duplicate_window":120000000000,"sealed":false,"deny_delete":false,"deny_purge":false,"allow_rollup_hdrs":false}

11:32:25 <<< $JS.API.STREAM.CREATE.BEN
{"type":"io.nats.jetstream.api.v1.stream_info_response","config":{"name":"BEN","subjects":["mon.*"],"retention":"limits","max_consumers":128,"max_msgs":1024,"max_bytes":100000,"max_age":0,"max_msgs_per_subject":128,"max_msg_size":1024,"discard":"old","storage":"file","num_replicas":1,"duplicate_window":120000000000,"sealed":false,"deny_delete":false,"deny_purge":false,"allow_rollup_hdrs":false},"created":"2021-11-01T11:31:29.590690386Z","state":{"messages":0,"bytes":0,"first_seq":0,"first_ts":"0001-01-01T00:00:00Z","last_seq":0,"last_ts":"0001-01-01T00:00:00Z","consumer_count":0},"domain":"hub","cluster":{"name":"lon","leader":"n2-lon"}}

Note that in this trace the create request we send is fine, but the response is a stream INFO response, which is not a valid reply to a create request.

This appears to be an effort to make stream create idempotent, but instead it produces an invalid response:

https://github.com/nats-io/nats-server/blob/530ea6a5c371e944fc3141a59dc7d3a69f2ab132/server/jetstream_cluster.go#L3580-L3588
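To show why this matters to a client, here is a minimal sketch of a client-side check that validates the `type` field of a reply to `$JS.API.STREAM.CREATE.*`. The type string is copied from the traces in this issue; `apiResponse` and `checkCreateResponse` are hypothetical names used only for illustration and are not taken from any NATS client library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Type string copied from the single-server trace in this issue.
const streamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response"

// apiResponse decodes only the "type" field carried by the replies in the traces.
type apiResponse struct {
	Type string `json:"type"`
}

// checkCreateResponse is a hypothetical helper: it returns an error when a
// reply to a stream create request is not typed as a stream create response.
func checkCreateResponse(data []byte) error {
	var resp apiResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return fmt.Errorf("decoding response: %w", err)
	}
	if resp.Type != streamCreateResponseType {
		return fmt.Errorf("invalid response type %q, expected %q", resp.Type, streamCreateResponseType)
	}
	return nil
}

func main() {
	// Abbreviated versions of the two replies shown in this issue.
	clustered := []byte(`{"type":"io.nats.jetstream.api.v1.stream_info_response","config":{"name":"BEN"}}`)
	single := []byte(`{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"BEN"}}`)

	fmt.Println("clustered:", checkCreateResponse(clustered))
	fmt.Println("single:", checkCreateResponse(single))
}
```

A strict client along these lines would reject the clustered reply and accept the single-server one, even though both carry the same stream configuration.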

In single-server mode the response is a create response:

12:36:23 >>> $JS.API.STREAM.CREATE.BEN
{"name":"BEN","subjects":["mon.*"],"retention":"limits","max_consumers":128,"max_msgs_per_subject":128,"max_msgs":1024,"max_bytes":100000,"max_age":0,"max_msg_size":1024,"storage":"file","discard":"old","num_replicas":1,"duplicate_window":120000000000,"sealed":false,"deny_delete":false,"deny_purge":false,"allow_rollup_hdrs":false}

12:36:23 <<< $JS.API.STREAM.CREATE.BEN
{"type":"io.nats.jetstream.api.v1.stream_create_response","config":{"name":"BEN","subjects":["mon.*"],"retention":"limits","max_consumers":128,"max_msgs":1024,"max_bytes":100000,"max_age":0,"max_msgs_per_subject":128,"max_msg_size":1024,"discard":"old","storage":"file","num_replicas":1,"duplicate_window":120000000000},"created":"2021-11-01T11:36:18.494935319Z","state":{"messages":0,"bytes":0,"first_seq":0,"first_ts":"0001-01-01T00:00:00Z","last_seq":0,"last_ts":"0001-01-01T00:00:00Z","consumer_count":0},"did_create":true}

Introduced in https://github.com/nats-io/nats-server/commit/cfbc69b12c1a1b0708d623d50420179206f54b10

Tests for this:

diff --git a/server/jetstream_cluster_test.go b/server/jetstream_cluster_test.go
index cb5eeca6..e7c7c152 100644
--- a/server/jetstream_cluster_test.go
+++ b/server/jetstream_cluster_test.go
@@ -9017,6 +9017,9 @@ func addStream(t *testing.T, nc *nats.Conn, cfg *StreamConfig) *StreamInfo {
    var resp JSApiStreamCreateResponse
    err = json.Unmarshal(rmsg.Data, &resp)
    require_NoError(t, err)
+   if resp.Type != JSApiStreamCreateResponseType {
+       t.Fatalf("Invalid response type %s expected %s", resp.Type, JSApiStreamCreateResponseType)
+   }
    if resp.Error != nil {
        t.Fatalf("Unexpected error: %+v", resp.Error)
    }
@@ -9167,6 +9170,25 @@ func TestJetStreamRollupSubjectAndWatchers(t *testing.T) {
    expectUpdate("age", "50", 6)
 }

+func TestJetStreamClusteredStreamCreateIdempotent(t *testing.T) {
+   c := createJetStreamClusterExplicit(t, "JSC", 3)
+   defer c.shutdown()
+
+   nc, _ := jsClientConnect(t, c.randomServer())
+   defer nc.Close()
+
+   cfg := &StreamConfig{
+       Name:       "AUDIT",
+       Storage:    MemoryStorage,
+       Subjects:   []string{"foo"},
+       Replicas:   3,
+       DenyDelete: true,
+       DenyPurge:  true,
+   }
+   addStream(t, nc, cfg)
+   addStream(t, nc, cfg)
+}
+
 func TestJetStreamAppendOnly(t *testing.T) {
    c := createJetStreamClusterExplicit(t, "JSC", 3)
    defer c.shutdown()

ripienaar commented 2 years ago

reported by @bwerthmann