oleksiyk / kafka

Apache Kafka 0.9 client for Node
MIT License
297 stars 85 forks source link

GroupConsumer startingOffset should be set to what? #160

Closed matthewchngshopback closed 7 years ago

matthewchngshopback commented 7 years ago

Say I have a topic with 10 partitions and have a total of about 3M messages and I want to start at around 2.5M, what do I set the startingOffset to?

Docs mentions something about the value being a time. I'm not quite sure how to set this. Getting error from Long.low() when trying with offset as the index (i.e. Number(2500000)). When I try to set the value as a time value (new Date).getTime(), the final starting either resets to 0 or sets itself to the end of the queue.

I also can't find any Kafka docs regarding a time based offset anywhere. Any help will be great. Thanks.

matthewchngshopback commented 7 years ago

Does anyone have any ideas or is startingOffset option not supported in GroupConsumer?

memelet commented 7 years ago

I'm running into similar issues. Even setting startingOffset to a fixed and valid offset, the group consumer never starts at that number.

oleksiyk commented 7 years ago

GroupConsumers always start and continue from a last committed offset. If there is no committed offset for topic/partition then it starts from startingOffset.

memelet commented 7 years ago

Ah, only if no committed offset. And yes indeed the docs state that clearly.

Must this be so? It would sure be nice to be able to use the group consumer yet start at a specified offset.

oleksiyk commented 7 years ago

Must this be so?

GroupConsumers are meant to be used in (almost) fully automatic way. Starting from committed offset is the natural way to do so. I can't imagine a real life use case with multiple group consumers running on a cluster (re)starting from a hard coded offsets. Looks like nightmare to manage. If I ever need to rollback and re-process some messages I think I would make a script that starts single GroupConsumer with empty handler, commits desired offsets to all topics and partitions and exits. Then I restart the consumers cluster. Not sure if there is a console tool bundled with Kafka that can commit offsets.

memelet commented 7 years ago

Interesting, the code I was just refactoring did exactly what you said -- created a dummy consumer, set the offsets, then created the real consumer. I thought that was some kind of mistake.

Our use case is this: We have a stream processor working over date from manufacturing date (IIoT if you will). Some of that data is sampled, but some is stateful. The processor need to be primed with the stateful data before going forward. So when the processor restarts it needs to go back just a bit in stream from where it left off to get the previously emitted stateful data. It can't just start where it left off because it will then be missing the non-sampled/stateful data.

All the stateful data is emitted to topic at fixed periods. So we would always like the consumer to go back to just that point and start from there.

But in this case, there is really no "group" of consumers -- just a single consumer processing a single topic in a single node app. But there are multiple are multiple partitions for the topic, so the high level consumer is much more convenient.

Does the above make sense?

oleksiyk commented 7 years ago

there is really no "group" of consumers -- just a single consumer

Why don't use SimpleConsumer then?

memelet commented 7 years ago

there is really no "group" of consumers -- just a single consumer

Why don't use SimpleConsumer then?

Unless I misunderstand, with simple I have to manage partitions and rebalancing manually, no?

oleksiyk commented 7 years ago

There is no rebalancing if there is a single consumer. You just subscribe to specified topics and partitions and receive data. Its just a perfect match for your scenario.

memelet commented 7 years ago

Bad wording. What I meant by rebalancing was if partitions are added to topic. Have not looked closely at the admin interface. I assume I must register for events like that and add any new partitions to the subscription.

oleksiyk commented 7 years ago

partitions are added to topic

Thats quite a rare case. And if you just want to follow all partitions you can just subscribe only with a topic name parameter and no-kafka will select all topic partitions. It means that if you add new topics you will just have to restart the consumer without any re-configuration.

memelet commented 7 years ago

Yes adding partitions is rare for us, but we do it enough for it be handled. Our consumer apps are stateful stream processors so restarting is always a last resort. But if there is no way to detect new partitions internally (not topics because we run one process per topic for node apps) we can have the ansible playbook ping it via an API.

I didn't open this issue, but I personally am quite satisfied to have it closed. Thanks much.