nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

jetstream.Consumer.Consume() does not respect ConsumerConfig #1588

Open wood-jp opened 5 months ago

wood-jp commented 5 months ago

Observed behavior

The Consume func accepts several options, including PullMaxMessages, which defaults to 500 when it is not provided. Separately, the config used to create the consumer allows setting MaxRequestBatch. If MaxRequestBatch is less than PullMaxMessages, Consume will always fail with the error nats: Exceeded MaxRequestBatch of X. Additionally, this error is only surfaced if the optional jetstream.ConsumeErrHandler is passed; otherwise the consumer silently fails to consume anything.

Expected behavior

  1. The Consume func should inherit default values (such as PullMaxMessages) from the consumer config, or otherwise not allow this situation to begin with.
  2. The Consume func should return a non-nil error if the options make consuming impossible (such as a consumer with MaxRequestBatch less than the set PullMaxMessages, if that is still an option after fixing (1)).

Server and client version

github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.33.1

Host environment

No response

Steps to reproduce

A full example requires some fiddling, but here's the short version:

consumer, _ := js.CreateOrUpdateConsumer(
    context.Background(),
    "streamname",
    jetstream.ConsumerConfig{
        // lower than the PullMaxMessages default of 500
        MaxRequestBatch: 1,
    },
)

cc, err := consumer.Consume(
    func(msg jetstream.Msg) {
       fmt.Println("handled a message")
    },
    // handle consumer errors
    jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, handlerErr error) {
       fmt.Println(handlerErr)
    }),
)
// err == nil here but should not be

This results in no handled messages and outputs the error nats: Exceeded MaxRequestBatch of 1.

piotrpio commented 5 months ago

Hey @wood-jp, thanks for reporting the issue. You're right, we should be taking the consumer config into account in Consume. I think adjusting the default is reasonable; the only risk is that the consumer may be updated asynchronously between when you create or fetch the consumer info and when you call Consume (an update may change e.g. MaxRequestBatch).