rabbitmq / rabbitmq-stream-go-client

A client library for RabbitMQ streams
MIT License

More granular offset tracking needed for super stream consumer #322

Closed. jsearles closed this issue 4 months ago

jsearles commented 4 months ago

Is your feature request related to a problem? Please describe.

Currently, with automatic offset tracking on super streams, the stored offset can end up ahead of where a client of my application actually left off. This happens when the client closes or loses its connection while messages are still being delivered to it: the offset is recorded for messages the client never handled, so the client misses them when it resumes.
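To make the failure mode concrete, here is a minimal, self-contained simulation. It does not use the client library; the function name `simulate` and the "-1 means nothing stored" convention are assumptions made for illustration only. It compares an offset that advances on delivery with one that advances only after processing.

```go
package main

import "fmt"

// simulate models a consumer that is handed offsets 0..delivered-1 but only
// finishes processing the first `processed` of them before it disconnects.
// It returns the offset each tracking strategy would have stored.
// This is a hypothetical model, not the library's implementation.
func simulate(delivered, processed int64) (deliveryStored, processingStored int64) {
	deliveryStored, processingStored = -1, -1 // -1 means "nothing stored yet"
	for off := int64(0); off < delivered; off++ {
		// Delivery-based tracking advances as soon as the message is handed over.
		deliveryStored = off
		if off < processed {
			// Processing-based tracking advances only once the work is done.
			processingStored = off
		}
	}
	return deliveryStored, processingStored
}

func main() {
	byDelivery, byProcessing := simulate(10, 7)
	// A consumer that resumes after the stored offset skips the gap between the two.
	fmt.Printf("delivery-based=%d processing-based=%d skipped on restart=%d\n",
		byDelivery, byProcessing, byDelivery-byProcessing)
}
```

With 10 messages delivered and 7 processed, the delivery-based offset sits three messages ahead, which is exactly the set of messages the client would miss after reconnecting.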

Describe the solution you'd like

I need more granularity in how the offsets are updated. Possible approaches I can think of are:

First option seems more desirable to me, though there would definitely be complexity with managing the offsets for multiple partitions.

Describe alternatives you've considered

This would be a lot simpler with a single stream instead of a super stream, though the ability to spread the load across multiple consumers is very desirable for us.

I also looked into writing my own super stream consumer and managing the partitioned consumers myself, but the superStream field on the SingleActiveConsumer struct is private and cannot be set, which prevents this. Exposing that field would be another potential solution for my use case.

Additional context

No response

Gsantomaggio commented 4 months ago

Hi @jsearles, it is already possible to store the offset for each partition: https://github.com/rabbitmq/rabbitmq-stream-go-client/blob/main/examples/single_active_consumer/single_active_consumer.go#L52

See the full example: https://github.com/rabbitmq/rabbitmq-stream-go-client/tree/main/examples/single_active_consumer

jrsearles commented 4 months ago

Maybe I'm missing something, but the super stream consumer is only set to use automatic offset tracking. The stored offset can get ahead of my consumer because it reflects messages delivered to my application, not necessarily to my consumer. I can manually update the offset to a higher value, but not to a lower one.

I'll poke around with this and see if it works, but it seems like it would be stepping on the automatic offset tracking, and once that got ahead of my consumer there'd be no way for me to set it to a lower offset.

Gsantomaggio commented 4 months ago

@jsearles With this PR, https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/323, you can set manual commit. It was already the default; the PR only makes it more explicit.

With manual tracking set, you can do what you like with the API. With err := consumerContext.Consumer.StoreOffset() you can store the current offset.

With consumerContext.Consumer.StoreCustomOffset(xxx) you can store the offset you want.

These calls give you everything you need to identify the partition:

    consumerContext.Consumer.GetName()
    consumerContext.Consumer.GetOffset()
    consumerContext.Consumer.GetStreamName()

It should be enough to handle the offset-tracking in all situations.

This example shows how you can restart from the offset you chose: https://github.com/rabbitmq/rabbitmq-stream-go-client/blob/main/examples/super_stream/super_stream_sac.go#L61

jsearles commented 4 months ago

Perfect - thanks a lot!