sclasen / akka-kafka


Implementing topic offset #17

Closed tnolet closed 9 years ago

tnolet commented 10 years ago

Hi,

I would love to be able to set the initial offset in time, i.e. either read from the start or only read new messages from the current moment onward. I'm implementing some Kafka readers that are not interested in any backed-up messages, just the messages that arrive from the moment the readers attach. Any tips on how this could be done?

Thanks!

sclasen commented 10 years ago

Hi @tnolet

akka-kafka just uses the standard high-level consumer connector from the kafka distribution under the covers, so you would have to do the same kind of thing here as you would there.

Three ways I can think of:

One: never commit offsets. I think if you do this, there won't be any offsets in zookeeper, and every time you restart you will start at the largest offset. Use CommitConfig(commitInterval = None, commitAfterMessageCount = None) when configuring akka-kafka, don't commit manually, and you may be good (see the sketch after these three approaches).

Two: if approach one does not work, use a zookeeper client and delete the /<kafka chroot, if any>/consumers/<your consumer id> znode before restarting your app; then there won't be any commits to start from, and the kafka consumer will start from the beginning.

Three: use a new consumer group id every time you restart. This is bad only because you will eventually want to go into zookeeper and clean up the old consumer znodes.
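For concreteness, here is a minimal sketch of approach one, based on the consumer setup shown in the project README. The topic, group, and zookeeper address are placeholders, and whether AkkaConsumerProps.forSystem takes a commitConfig argument under that exact name should be checked against the akka-kafka version you are on:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import com.sclasen.akka.kafka.{AkkaConsumer, AkkaConsumerProps, CommitConfig, StreamFSM}
import kafka.serializer.DefaultDecoder

// Receiver that handles each message and acks it back to the stream FSM.
class Printer extends Actor {
  def receive = {
    case msg: Any =>
      println(msg)
      sender ! StreamFSM.Processed
  }
}

object NeverCommitExample extends App {
  val system  = ActorSystem("example")
  val printer = system.actorOf(Props[Printer])

  // Never commit: no offsets land in zookeeper, so each restart begins at
  // the largest offset (assuming the default auto.offset.reset behaviour).
  val noCommit = CommitConfig(commitInterval = None, commitAfterMessageCount = None)

  val props = AkkaConsumerProps.forSystem(
    system       = system,
    zkConnect    = "localhost:2181",        // placeholder
    topic        = "your-topic",            // placeholder
    group        = "your-consumer-group",   // placeholder
    streams      = 4,
    keyDecoder   = new DefaultDecoder(),
    msgDecoder   = new DefaultDecoder(),
    receiver     = printer,
    commitConfig = noCommit
  )

  new AkkaConsumer(props).start()
}
```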

Let me know if approach one works for you, please; we can add something to the docs if it does.

Cheers!

tnolet commented 10 years ago

Thanks for the response! But maybe I didn't express myself too well. I would like the Akka/Kafka connector to disregard any messages it hasn't committed yet and instead start at the top of the "message pile", i.e. at the most current message. This is basically kafka.api.OffsetRequest.LatestTime() in the Kafka API, I guess... I'm somewhat new to this.

sclasen commented 10 years ago

Hey @tnolet

I think we are on the same page, at least with approach 1, if I am correct about how things work.

If you never commit any offsets, there aren't any offsets in zk, and by default when you restart you will start from the newest message on each partition.

In your case you never need to commit messages, right?

tnolet commented 10 years ago

@sclasen

I did some experimenting:

Approach 1 doesn't work: when you set the commit interval and message count to None, no messages get read, ever. Not new ones, not old ones.

Approach 2, deleting the consumer group in ZK, does work: it starts at the top and disregards any prior messages. But as you mentioned, I would have to clear out this znode at each start of the program. No biggie, but not very elegant. Any other ideas?
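For anyone scripting approach two later, here is a rough sketch using the I0Itec ZkClient that the 0.8.x Kafka consumer already depends on; the connect string and group name are placeholders, and it assumes no chroot:

```scala
import org.I0Itec.zkclient.ZkClient

object ResetConsumerGroup extends App {
  // Placeholder values; adjust for your zookeeper ensemble, chroot and group.
  val zkConnect     = "localhost:2181"
  val consumerGroup = "your-consumer-group"

  val zk = new ZkClient(zkConnect)
  try {
    // Recursively remove the group's znode so no committed offsets remain;
    // the next start of the consumer then has nothing to resume from.
    zk.deleteRecursive(s"/consumers/$consumerGroup")
  } finally {
    zk.close()
  }
}
```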

sclasen commented 10 years ago

Thanks for trying it out! I'll take a look at making approach 1 work correctly.

Is this the batch consumer or the non-batch one? Now that I think about it, the batch one won't work, but I think it's fixable.

tnolet commented 10 years ago

@sclasen I'm not using batch consumers (yet). It would be great if we could just have a Boolean StartFromTop flag or something similar that overrides all the other commit settings in the config. A typical use case is real-time monitoring or other checks on the data stream coming out of Kafka. In those cases you are probably not interested in the backlog, just in what happens from now on.

sclasen commented 10 years ago

@tnolet just hacked up this spec which uses CommitConfig(None,None) with the non-batch consumer and it appears to work.

https://github.com/sclasen/akka-kafka/blob/master/src/it/scala/com/sclasen/akka/kafka/NonCommitingConsumerSpec.scala

How does this compare to the code you were trying?

sclasen commented 9 years ago

@tnolet gonna close this one, feel free to reopen if this is still an issue

carneles commented 8 years ago

Hi @sclasen,

I found this post while searching for an answer to my problem. This is the situation I face: I have many akka-kafka consumers in the system, and most of them read messages from the beginning (auto.offset.reset = smallest in the configuration file). However, one of them should only read the latest messages and ignore the old ones.

Because the AkkaConsumer class always takes auto.offset.reset from the configuration file, I created a subclass named ConfigurableAkkaConsumer (extending AkkaConsumer), overrode kafkaConsumerProps() and injected auto.offset.reset = largest there. When I printed/debugged the configuration of that consumer, it did show auto.offset.reset = largest, which is what I expected. However, when I run it, it still reads old messages. Then I found this post and tried your suggestion 1 above, but just like @tnolet, I receive no messages at all when I set CommitConfig(None, None).

Really hope you can give me some enlightenment.

sclasen commented 8 years ago

Have you tried creating a separate actor system with a separate config? Then you could configure it as described here, https://github.com/sclasen/akka-kafka#configure, but pass a different conf file in.
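A rough sketch of what that separate-actor-system approach could look like; the resource name is hypothetical, and the exact keys for the consumer properties (e.g. where auto.offset.reset = largest goes) should follow the Configure section linked above:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object LatestOnlyConsumerSystem {
  // Hypothetical resource name; put the overriding consumer settings
  // (e.g. auto.offset.reset = largest, per the project's Configure docs)
  // into latest-only-consumer.conf on the classpath.
  val latestOnlyConfig = ConfigFactory.load("latest-only-consumer")

  // A dedicated actor system, so this one consumer picks up the overriding
  // config while the other akka-kafka consumers keep their own system/config.
  val latestOnlySystem = ActorSystem("latest-only-consumer", latestOnlyConfig)

  // Build AkkaConsumerProps against latestOnlySystem as usual and start the
  // consumer from there.
}
```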

How is your code different from the test here? https://github.com/sclasen/akka-kafka/blob/master/src/it/scala/com/sclasen/akka/kafka/NonCommitingConsumerSpec.scala

I never got feedback from tnolet on that, but it appears to work as expected in the test.