nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.43k stars 686 forks source link

Consume() timeout in between pulls #1703

Open brettinternet opened 3 weeks ago

brettinternet commented 3 weeks ago

Observed behavior

I'm encountering an issue with the consumer's Consume method where each incoming message is throttled to exactly 30 second waits in between messages.

I'm using a JetStream shared between two accounts. I use default server and client options. I am running an embedded server in Go, and I have a "central account" that publishes to the remote client which imports the service.

// Central account
ac.Exports.Add(
    &jwt.Export{
        Subject: "$JS.API.>",
        Type:    jwt.Service,
    },
    &jwt.Export{
        Subject: "$JS.ACK.>",
        Type:    jwt.Service,
    },
)

go func() {
    for {
        js.PublishAsync("my.subject", []byte("hello, I'm the central server account"))
        time.Sleep(5 * time.Second)
    }
}()
// Remote account
ac.Imports.Add(
    &jwt.Import{
        Name:    "server",
        Type:    jwt.Service,
        Account: serverAcctPubKey,
        Subject: "$JS.API.>",
    },
    &jwt.Import{
        Type:    jwt.Service,
        Account: serverAcctPubKey,
        Subject: "$JS.ACK.>", // not sure if this is necessary for single ack, but it appears to be otherwise messages are redelivered
    },
)

// ...

consumer, err := c.js.CreateOrUpdateConsumer(ctx, "mystream", jetstream.ConsumerConfig{
  Name:          "myconsumer",
  Durable:       "myconsumer",
  FilterSubject: "my.subject"
})

cons.Consume(func(m *nats.Msg) {
  fmt.Println(time.Now().Format(time.RFC3339))
    m.Ack()
})

While the consumer slowly processes them in 30 second intervals, it appears the messages are stuck in ack pending (info.NumAckPending == info.NumRedelivered) and go down with each consumer activity.

The only error I see in ConsumeErrHandler is nats: no heartbeat received. I'm not sure if this is related to the issue I'm facing. I'm using the latest version of the NATS server and the Go client.

This issue occurs except when I add the option jetstream.PullMaxMessages(1) to the Consume() function.

I also have no issues with the alternative method to pull messages:

go func() {
  for {
    msg, err := consumer.Next()
    if err != nil {
      fmt.Println("err", err)
    } else {
      fmt.Println("msg", string(msg.Data()))
      msg.Ack()
    }
  }
}()

Could there be something wrong with consumer.Consume()?

Expected behavior

I'm expecting the callback in consumer.Consume to be called as soon as each message is published by the other account.

Server and client version

github.com/nats-io/jwt/v2 v2.5.8
github.com/nats-io/nats-server/v2 v2.10.18
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nkeys v0.4.7

Host environment

Darwin 23.6.0 arm64

Steps to reproduce

No response

Jarema commented 3 weeks ago

Hey.

I think you are missing the _INBOX.> import, as that is where the client subscribes for messages. You can customize it with CustomInboxPrefix().

Example of custom inboxes: https://natsbyexample.com/examples/auth/private-inbox/cli

brettinternet commented 3 weeks ago

Hi @Jarema, thank you for your reply.

I'm struggling to find an example in the test files for nats-server or nats.go that demonstrates this. I'm not seeing _INBOX.> imported anywhere. As for permissions, I'm running into this issue even when I provide blanket permissions to everything in both accounts in my example above:

accountClaims.DefaultPermissions.Pub.Allow.Add(">")
accountClaims.DefaultPermissions.Sub.Allow.Add(">")
brettinternet commented 3 weeks ago

I've created a reproduction of the issue here: https://github.com/brettinternet/nats-consumer-account-jetstream-export

The first account exports jetstream subjects: https://github.com/brettinternet/nats-consumer-account-jetstream-export/blob/f5bc7b7ff7a7337e3912e5c9440c25b2f7003b39/setup.go#L60-L67

The second account imports them: https://github.com/brettinternet/nats-consumer-account-jetstream-export/blob/f5bc7b7ff7a7337e3912e5c9440c25b2f7003b39/setup.go#L81-L92

The consumer does reach every published message, but only in 30 second intervals: https://github.com/brettinternet/nats-consumer-account-jetstream-export/blob/f5bc7b7ff7a7337e3912e5c9440c25b2f7003b39/main.go#L104-L113

I've added the stdout to the README: https://github.com/brettinternet/nats-consumer-account-jetstream-export/blob/761d21a116552b9a20b8421d0e75ab5b2c89face/README.md