nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.42k stars 685 forks source link

Failed to create OrderedConsumer #1679

Open niondir opened 1 month ago

niondir commented 1 month ago

Observed behavior

When Creating an OrderedConsumer an error is returend: nats: API error: code=400 err_code=10084 description=consumer in pull mode requires ack policy

Expected behavior

It should be possible to create an OrderedConsumer

Server and client version

Client Tested with: v1.34.0 and v1.36.0 Server: v2.10.12 and v2.10.18

Host environment

Windows amd64

Steps to reproduce

Stream:

streamCfg := jetstream.StreamConfig{
            Description: "uplinks from devices waiting to be parsed",
            Name:        "parse",
            Subjects:    []string{"test-subject.>"},
            Storage:     jetstream.FileStorage,
            Retention:   jetstream.WorkQueuePolicy,
        }

Consumer:

parseStream, err := js.Stream(context.Background(), "parse")

orderedCons, err := parseStream.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{
        FilterSubjects:    []string{"test-subject.>"},
        DeliverPolicy:     jetstream.DeliverAllPolicy,
        OptStartSeq:       0,
        OptStartTime:      nil,
        ReplayPolicy:      jetstream.ReplayInstantPolicy,
        InactiveThreshold: 0,
        HeadersOnly:       false,
        MaxResetAttempts:  0,
    })

// err = "consumer in pull mode requires ack policy"

Or is it not possible to use the "WorkQueuePolicy" but then the error message is at least missleading.

niondir commented 1 month ago

Due to the public Doc an "Ordered Consumer" is Push based.

Ordered consumers are the convenient default type of push consumers designed for applications that want to efficiently consume a stream for data inspection or analysis.

See: https://docs.nats.io/nats-concepts/jetstream/consumers

Due to the Source Documentation the OrderedConsumer I'm trying to use is Pull bases:

// OrderedConsumer returns an OrderedConsumer instance. OrderedConsumer
// are managed by the library and provide a simple way to consume
// messages from a stream. Ordered consumers are ephemeral in-memory
// pull consumers and are resilient to deletes and restarts.
OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error)

See: https://github.com/nats-io/nats.go/blob/97e6a525e9d861338e89d31e81063a726a718941/jetstream/stream.go#L93

This is a little bit confusing and might be part of my issue to find the right solution for my problem.

piotrpio commented 1 month ago

Hello @niondir, thanks for reporting the issue. As to the first point, your assumption was correct, it is not possible to create a pull-based ordered consumer on a workqueue stream as ordered consumers have AckNone policy set by default.

I created a PR to nats-server to improve the error message: https://github.com/nats-io/nats-server/pull/5678

To your other point, I agree that the documentation is not up to date - it was true until the new pull-based JetStream API was introduced in the clients. In this new API we lean heavily on pull consumers, so the ordered consumer functionality is also implemented on top of a pull consumer. I'll bring it up and we'll improve the docs.

somratdutta commented 2 weeks ago

hi @piotrpio , I wanted to contribute to improved Nats Documentation for the new pull-based JetStream API, let me know if I can be of any help?

niondir commented 2 weeks ago

Okay, so it's correct that you support pull and push based ordered consumers now but both are AckNone. Makes sense.

I'm thinking a lot about implementing a pull based consumer with at least once (or even exactly once) semantics. But it's challenging.

At least once could work when the stream would re-publish already pulled but not yet acked messages on fetch and the consumer implements some deduplication and sorting based on message IDs for the batch. Since a consumer already pulls when 50% of the fetched messages are handled, it would also be beneficial to tell the stream to skip N messages during pull.

But I guess that needs some of the mentioned features on stream side.

jnmoyne commented 2 weeks ago

Ordered consumers have their own 'exactly once delivery' protocol that is implemented in the client library such that it delivers all of the messages without gaps or duplicates and in order (hence the name) to your message handler callback exactly once: meaning if your callback code can process the message, there is no way to signal that (e.g. using a negative or term or in-process acknowledgement) such that you can get the message re-delivered later. I.e. it does exactly once delivery of the messages, not exactly once processing.

That means that when using an ordered consumer you can't ack messages in any way but on the other hand that makes the ordered consumer considerably faster (higher throughput) than an explicitly asked consumer.

Work queue and interest streams are meant for 'consumption' (rather than merely reading) of the messages therefore you must have the consuming code explicitly ack each message therefore you can not (and would not want to) use an ordered consumer on a working queue or interest stream.

@somratdutta if you want to improve the documentation text inside nats.go (or the documentation web site text) you can always create a fork of the repo, commit your edits and create a PR.

niondir commented 1 week ago

So you basically I can not have guaranteed consumption (work queue) and guaranteed order (ordered consumer) at the same time?

But when I think about some critical systems you might need that - what's the solution then? Or do I miss something?