sclasen / akka-kafka

how do I configure the offset? #11

Closed xelax closed 10 years ago

xelax commented 10 years ago

How do I tell the consumer whether to start from the oldest available message or from the newest available offset?

sclasen commented 10 years ago

Hi @xelax, the Kafka consumer default for auto.offset.reset is largest, and that is how the akka-kafka consumer behaves.

To override this behavior, add the following to the application.conf that configures the actor system you use with your AkkaConsumer:

kafka.consumer.auto.offset.reset = "smallest"
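
For tests it can be handy to apply the same override in code instead of editing application.conf. The following is a minimal sketch, assuming the Typesafe Config library (which Akka uses for its configuration) is on the classpath; the object name `OffsetResetConfig` is hypothetical and not part of akka-kafka.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper, not part of akka-kafka.
object OffsetResetConfig {
  // Same setting as the application.conf line above, expressed inline.
  val overrides: Config = ConfigFactory.parseString(
    """
      |kafka.consumer.auto.offset.reset = "smallest"
      |""".stripMargin)

  // The overrides take precedence; everything else falls back to the
  // normally loaded configuration (application.conf, reference.conf).
  val config: Config = overrides.withFallback(ConfigFactory.load())
}
```

The resulting `config` could then be passed to `ActorSystem("name", OffsetResetConfig.config)`, on the assumption that the consumer reads its settings from the actor system's configuration as described above.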

xelax commented 10 years ago

Thanks for the quick answer! I noticed that ZooKeeper remembers the offset for my consumer group, so the setting only takes effect when no offset is stored in ZooKeeper for that group. How do I "restart", meaning tell Kafka to ignore the stored offset and start anew from the newest message? That would be useful, particularly for testing purposes.

sclasen commented 10 years ago

So the way kafka's console consumer works is that if you do --from-beginning it will just delete the consumer offsets from zookeeper before starting.

https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala#L181

So for testing you can do something similar in a before method, like:

import kafka.utils.ZkUtils
val props: AkkaConsumerProps = ??? // your consumer props
ZkUtils.maybeDeletePath(props.zkConnect, "/consumers/" + props.group)

sclasen commented 10 years ago

Also, ZkUtils here is kafka.utils.ZkUtils, which should already be on your classpath if akka-kafka is.
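
The reset step can also be wrapped in a small test helper. This is only a sketch under stated assumptions: the names `OffsetReset`, `consumerGroupPath` and `resetGroup` are hypothetical, and the delete function is passed in as a parameter so the helper compiles without Kafka on the classpath. Once the stored offsets are gone, auto.offset.reset decides where the group starts, so with the default of largest the consumer begins at the newest message.

```scala
// Hypothetical test helper, not part of akka-kafka or Kafka.
object OffsetReset {
  // ZooKeeper path used in the snippet above for a consumer group's state.
  def consumerGroupPath(group: String): String = "/consumers/" + group

  // Deletes the stored state for `group` by calling the supplied
  // `deletePath(zkConnect, path)` function.
  def resetGroup(zkConnect: String, group: String)
                (deletePath: (String, String) => Unit): Unit = {
    // An empty group would resolve to "/consumers/" and affect every group.
    require(group.nonEmpty, "group must not be empty")
    deletePath(zkConnect, consumerGroupPath(group))
  }
}
```

In a before method this might be called as `OffsetReset.resetGroup(props.zkConnect, props.group)((zk, path) => ZkUtils.maybeDeletePath(zk, path))`, before the consumer is started.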