dashbitco / broadway_kafka

A Broadway connector for Kafka
233 stars 53 forks source link

Feature: Add option to set the starting offset for new consumer #119

Closed MitchellJeppson closed 2 years ago

MitchellJeppson commented 2 years ago

I have a use case where I would like to start up a kafka consumer that only processes new messages. Is there a config value that could be implemented to support this? I found begin_offset in brod. I'd be happy to take on the MR if I could get a little direction. Thanks!

josevalim commented 2 years ago

A PR to add support for such is indeed welcome.

yordis commented 2 years ago

Made a PR, not sure if it is the right spot

danmarcab commented 1 year ago

We have a similar use case, where we would like to always start from the :earliest offset. We've been testing the added begin_offset option and it doesn't seem to work. That option seems to be only used in :brod_consumers and BroadwayKafka is not using those (the produced is implementing the brod_group_member behaviour.

What do people think about adding a top-level config option:

@typep begin_offset :: :assigned | :reset | non_neg_integer()

Where:

Now the last one is a bit problematic, since offsets are partition specific, so one integer might not cut it, might be better to use a timestamp and resolve offsets using brod... it gets complex

It seems that the main use cases are either to begin from the start or from the end so we might not need that last option. To simplify things, we could just keep :reset and :assigned

offset_reset_policy begin_offset outcome
:earliest :reset always read from start
:latest :reset always read from end
:earliest :assigned read from committed offset, or from start if not defined or expired (current behaviour)
:latest :assigned read from committed offset, or from end if not defined or expired (current behaviour)

Happy to open a new issue and open a PR for this if it makes sense

josevalim commented 1 year ago

@yordis are you using your patch? Is it working as expected?

It seems that the main use cases are either to begin from the start or from the end so we might not need that last option. To simplify things, we could just keep :reset and :assigned

This sounds good to me. A PR is welcome :)

yordis commented 1 year ago

That was long ago, but I remember we tested back then. I'm sorry I can't help you any more.

josevalim commented 1 year ago

Np, thanks! following the code path, it seems the option indeed is not used, so we can revert it. :)