nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0
15.54k stars 1.39k forks source link

source stream unable to catch up if the source has duplicate msgids #4459

Open HeavyHorst opened 1 year ago

HeavyHorst commented 1 year ago

What version were you using?

v2.9.22-RC.2

What environment was the server running in?

Linux rene-hx90 6.3.13-2-MANJARO #1 SMP PREEMPT_DYNAMIC Sun Jul 16 16:48:53 UTC 2023 x86_64 GNU/Linux AMD Ryzen 9 5900HX with Radeon Graphics

Is this defect reproducible?

Create stream:

nats stream create xx ? Subjects xx ? Storage file ? Replication 1 ? Retention Policy Limits ? Discard Policy Old ? Stream Messages Limit -1 ? Per Subject Messages Limit -1 ? Total Stream Size -1 ? Message TTL -1 ? Max Message Size -1 ? Duplicate tracking time window 2m0s ? Allow message Roll-ups No ? Allow message deletion Yes ? Allow purging subjects or the entire stream Yes

Publish message 1: nats pub xx -H "Nats-Msg-Id:xx" 'one'

wait 2 minutes (or the Duplicate tracking time window)

Publish message 2 with the same msg id: nats pub xx -H "Nats-Msg-Id:xx" 'two'

create source stream: nats s add SOURCE --source xx

The nats server now starts printing:

[60498] 2023/08/31 16:03:57.540068 [WRN] Error processing inbound source "xx" for '$G' > 'SOURCE': msgid is duplicate
[60498] 2023/08/31 16:03:59.550840 [WRN] Error processing inbound source "xx" for '$G' > 'SOURCE': msgid is duplicate
[60498] 2023/08/31 16:04:01.562266 [WRN] Error processing inbound source "xx" for '$G' > 'SOURCE': msgid is duplicate

for 2 minutes (duration of the duplicate tracking time window).

The source stream will not catch up for this period of time:

nats s report
Obtaining Stream stats
╭─────────────────────────────────────────────────────────────────────────────────────────╮
│                                      Stream Report                                      │
├────────┬─────────┬───────────┬───────────┬──────────┬───────┬──────┬─────────┬──────────┤
│ Stream │ Storage │ Placement │ Consumers │ Messages │ Bytes │ Lost │ Deleted │ Replicas │
├────────┼─────────┼───────────┼───────────┼──────────┼───────┼──────┼─────────┼──────────┤
│ SOURCE │ File    │           │ 0         │ 1        │ 94 B  │ 0    │ 0       │          │
│ xx     │ File    │           │ 0         │ 3        │ 206 B │ 0    │ 0       │          │
╰────────┴─────────┴───────────┴───────────┴──────────┴───────┴──────┴─────────┴──────────╯

╭─────────────────────────────────────────────────────────────────────╮
│                         Replication Report                          │
├────────┬────────┬────────────┬───────────────┬────────┬─────┬───────┤
│ Stream │ Kind   │ API Prefix │ Source Stream │ Active │ Lag │ Error │
├────────┼────────┼────────────┼───────────────┼────────┼─────┼───────┤
│ SOURCE │ Source │            │ xx            │ 0.32s  │ 0   │       │
╰────────┴────────┴────────────┴───────────────┴────────┴─────┴───────╯

It could become impossible to source from a large stream with lot of duplicate message ids. Even if the duplicate tracking time window is set to 1s it could take a very long time to process the messages.

Given the capability you are leveraging, describe your expectation?

I expected the source stream to replicate all messages without waiting for the deduplication window.

Given the expectation, what is the defect you are observing?

I have a large stream with millions of messages. I created a source stream that gets a subset of these messages.

The nats-server prints: Error processing inbound source "xx" for '$G' > 'SOURCE': msgid is duplicate all the time and only processes ~2 messages every 2 minutes (deduplication window).

jnmoyne commented 1 year ago

What you are describing is the correct for the current behavior.

1: you can avoid the situation using a number of options: a: set the deduplication window on the sourcing stream (SOURCE in your example) to 0s: let the deduplication happen at the origin stream, whatever gets deduplicated (or not) in that stream ends up in the stream doing the sourcing, no delay simply 'what is in xx is in SOURCE' aa: increase the size of the deduplication window in the origin steam (xx in your example) ab: do deduplication through the 'infinite deduplication window' method rather than relying on the deduplication window (which is ultimately not perfect as it has a time limit for it's efficiency) see https://nats.io/blog/new-per-subject-discard-policy/ b: don't end up with messages with duplicated message ids in the first place through. If there are no duplicated messages recorded in the origin stream then the sourcing stream will never have to deal with sourcing duplicated messages that break its own deduplication window. ba: increase the size of the deduplication window on the origin stream xx bb: do the deduplication using the 'infinite deduplication window' method mentioned above

As for the behavior in itself, right now it's just trying over and over to source all of the messages in the origin stream and store them in the sourcing stream, since in your example 2 messages are duplicated it ends up continuing to try and source those messages.

Generalizing the use case which is that a stream that sources from another stream receives messages from that origin stream that are duplicated and caught in the sourcing stream by it's deduplication window should it:

1: do what it does right now which is to retry until the second (duplicated) message is not caught by the deduplication window? 2: do not store the message from the source (as its deemed a duplicate message) and just move on and do not retry to source that same duplicated message?

IMHO the stream doing the sourcing should not have a deduplication window set (i.e. set it to 0s) and let the deduplication happen at the origin stream level. Also IMHO I would always prefer using the 'infinite deduplication' over the time-based deduplication window because the time-based window is in the end always has a limited effect while the 'infinite deduplication' is well ... infinite.

HeavyHorst commented 1 year ago

IMHO the stream doing the sourcing should not have a deduplication window set (i.e. set it to 0s) and let the deduplication happen at the origin stream level.

Thank you for the answer.

I am pretty sure i tried this but the problem was the same. I will try again later.

Why does a source Stream uses the deduplication Windows to begin with? I expected it to just copy the Messages from the source. Waiting long enough (unless the Windows is indefinite) it copies all Messages anyway so its just slowing down artificially. If the window is indefinite it will never finish. None of this is what i expect as a user i think.

HeavyHorst commented 1 year ago

I tried setting the deduplication Window to 0s.

[rene@rene-hx90 ~]$ nats stream rm SOURCE
? Really delete Stream SOURCE Yes
[rene@rene-hx90 ~]$ nats s add SOURCE --source xx
? Storage file
? Replication 1
? Retention Policy Limits
? Discard Policy Old
? Stream Messages Limit -1
? Total Stream Size -1
? Message TTL -1
? Max Message Size -1
? Duplicate tracking time window 0s
? Allow message Roll-ups No
? Allow message deletion Yes
? Allow purging subjects or the entire stream Yes
? Adjust source "xx" start No
? xx Source Filter source by subject 
? Import "xx" from a different JetStream domain No
? Import "xx" from a different account No
Stream SOURCE was created

Information for Stream SOURCE created 2023-09-01 16:35:10

             Replicas: 1
              Storage: File

Options:

            Retention: Limits
     Acknowledgements: true
       Discard Policy: Old
     Duplicate Window: 2m0s
    Allows Msg Delete: true
         Allows Purge: true
       Allows Rollups: false

Limits:

     Maximum Messages: unlimited
  Maximum Per Subject: unlimited
        Maximum Bytes: unlimited
          Maximum Age: unlimited
 Maximum Message Size: unlimited
    Maximum Consumers: unlimited

Replication:

              Sources: xx

State:

             Messages: 0
                Bytes: 0 B
             FirstSeq: 0
              LastSeq: 0
     Active Consumers: 

The actual window ended up to be "2m0s" again. Not sure if that is a problem with the nats cli or the server.

HeavyHorst commented 1 year ago

In server/stream.go -> checkStreamCfg is this check:

    if cfg.Duplicates == 0 && cfg.Mirror == nil {
        maxWindow := StreamDefaultDuplicatesWindow
        if lim.Duplicates > 0 && maxWindow > lim.Duplicates {
            maxWindow = lim.Duplicates
        }
        if cfg.MaxAge != 0 && cfg.MaxAge < maxWindow {
            cfg.Duplicates = cfg.MaxAge
        } else {
            cfg.Duplicates = maxWindow
        }
    }

This will set the duplication window back to 2 minutes if set to 0 and we are not a mirror i guess.

HeavyHorst commented 1 year ago

I can confirm that when i change this code to:

    if cfg.Duplicates == 0 && cfg.Mirror == nil && cfg.Sources == nil {
        maxWindow := StreamDefaultDuplicatesWindow
        if lim.Duplicates > 0 && maxWindow > lim.Duplicates {
            maxWindow = lim.Duplicates
        }
        if cfg.MaxAge != 0 && cfg.MaxAge < maxWindow {
            cfg.Duplicates = cfg.MaxAge
        } else {
            cfg.Duplicates = maxWindow
        }
    }

i am able to set the duplication window to 0s. I am then able to source all messages successfully.

derekcollison commented 1 year ago

Suggest len(cfg.Sources) > 0

derekcollison commented 1 year ago

I think @jnmoyne is working on PR that we can include in 2.9.22

Thanks again for the report.

wallyqs commented 1 year ago

Thanks for raising the we will revisit the implementation approach for this one in the v2.10 release (revert PR: https://github.com/nats-io/nats-server/pull/4495)

jnmoyne commented 1 year ago

In the mean time a workaround is to set the deduplication window size to 100 ms (the minimum window)