nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.44k stars 686 forks

Sequence number mismatch in NATS client after reconnection #1535

Open meldumm opened 7 months ago

meldumm commented 7 months ago

Observed behavior

After the NATS client reconnects, the sequence number does not match the expected sequence. The log shows a mismatch between the stored sequence number and the sequence number received from NATS. Here is an example from the logs:

2024/01/26 00:51:54 stored sequence: 34702749
2024/01/26 00:51:54 sequence from nats: 34702750

2024/01/26 00:51:54 NATS disconnected due to: read tcp 10.143.*->135.148.*: read: connection reset by peer, will try reconnecting
2024/01/26 00:51:54 NATS reconnected [nats://*.nats.backend:4222]
2024/01/26 00:51:55 stored sequence: 34702750
2024/01/26 00:51:55 sequence from nats: 34703172
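
To put a number on the jump in the log above, here is a minimal sketch that counts how many stream sequences lie between the stored value and the one just received. The helper name `missingBetween` is hypothetical and not part of nats.go. It assumes stream sequences are `uint64` values, as `meta.Sequence.Stream` is in the snippet further down.

```go
package main

import "fmt"

// missingBetween is a hypothetical helper (not part of nats.go). It returns
// how many stream sequences lie strictly between the last stored sequence
// and the one just received. It returns 0 when the received sequence is the
// direct successor of the stored one, or is not ahead of it at all.
func missingBetween(stored, received uint64) uint64 {
	if received <= stored {
		return 0
	}
	return received - stored - 1
}

func main() {
	// Values copied from the log excerpt in this issue.
	fmt.Println(missingBetween(34702749, 34702750)) // before the disconnect: 0
	fmt.Println(missingBetween(34702750, 34703172)) // after the reconnect: 421
}
```

Going by these log lines alone, the callback saw sequence 34703172 while 421 earlier sequences had not yet been observed by it.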

Expected behavior

The sequence number should remain consistent and accurate across a reconnection. The NATS client should properly track and update the sequence number through disconnections and reconnections.

Server and client version

v1.32.0

Host environment

No response

Steps to reproduce

  1. Connect to NATS using the provided code snippet.
  2. Simulate a NATS disconnection (e.g., by restarting the NATS server or interrupting the network connection).
  3. Observe the sequence number before and after the reconnection.

func main() {
    nc, err := connectNats(&config)
    if err != nil {
        log.Printf("connectNats error: %s", err)
        return
    }

    defer nc.Drain()
    js, err := nc.Conn().JetStream()
    if err != nil {
        log.Printf("JetStream error: %s", err)
        return
    }

    startSeq := GetCurrentSequence(pgpool, config.From)

    cb := func(m *nats.Msg) {
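        // Metadata fails for messages that did not come from JetStream.
        meta, err := m.Metadata()
        if err != nil {
            log.Printf("Metadata error: %s", err)
            return
        }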
        log.Printf("stored sequence: %d", startSeq)
        log.Printf("sequence from nats: %d", meta.Sequence.Stream)

        startSeq = meta.Sequence.Stream

        fmt.Println()
    }

    _, err = js.Subscribe(
        config.NatsSubject,
        cb,
        nats.StartSequence(startSeq),
        nats.ReplayInstant(),
        nats.BindStream(config.NatsJetStream),
    )
    if err != nil {
        log.Printf("JetStream Subscribe error: %s", err)
        return
    }

    defer nc.Conn().Close()
    select {}
}

cesarvspr commented 7 months ago

What is config.From, and what does GetCurrentSequence do?

Jarema commented 7 months ago

Please post consumer and stream info.

However, that is just to confirm the situation you see: the server sent the missing messages while the client was disconnected. Those missing messages will be redelivered as soon as their AckWait duration expires.
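
If the skipped messages do arrive later as redeliveries, one way to keep a stored resume point safe is to advance it only while no earlier sequence is still outstanding. The sketch below is a hypothetical illustration, not part of nats.go: the `contiguousTracker` type and its methods are invented names. It assumes the consumer is expected to see every stream sequence, which does not hold if the subscription filters subjects or if messages have been deleted from the stream.

```go
package main

import "fmt"

// contiguousTracker is a hypothetical helper (not part of nats.go).
// It records which stream sequences have been seen and exposes the highest
// sequence below which nothing is missing.
type contiguousTracker struct {
	safe    uint64              // every sequence <= safe has been seen
	pending map[uint64]struct{} // sequences seen that are above safe
}

func newContiguousTracker(start uint64) *contiguousTracker {
	return &contiguousTracker{safe: start, pending: make(map[uint64]struct{})}
}

// observe records seq and returns the updated safe resume point.
func (c *contiguousTracker) observe(seq uint64) uint64 {
	if seq <= c.safe {
		return c.safe // duplicate or redelivery of an already counted sequence
	}
	c.pending[seq] = struct{}{}
	// Advance while the direct successor of safe has already been seen.
	for {
		if _, ok := c.pending[c.safe+1]; !ok {
			break
		}
		delete(c.pending, c.safe+1)
		c.safe++
	}
	return c.safe
}

func main() {
	tr := newContiguousTracker(10)
	fmt.Println(tr.observe(11)) // 11
	fmt.Println(tr.observe(14)) // 11: sequences 12 and 13 are still missing
	fmt.Println(tr.observe(13)) // 11: sequence 12 is still missing
	fmt.Println(tr.observe(12)) // 14: the gap is closed
}
```

In the callback from the original snippet, the value returned by `observe(meta.Sequence.Stream)` could be persisted instead of the raw sequence, so a restart with `nats.StartSequence` would not skip messages that were still awaiting redelivery. The `pending` map grows while a gap stays open, so a real implementation would likely want a bound on it.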