Allow manual offsets commit for a consumer #304

piavka commented 7 years ago

Currently library consumer offests commit is automatic according to configuration or then each_batch block is processed. We need to be able to manually commit offsets but @offset_manager is not exposed by the Consumer class nor there is option to disable the automatic offset commits.

I can work on PR if such feature would be accepted ?

dasch commented 7 years ago

What is your use case – why are the automatic commits not sufficient?

piavka commented 7 years ago

I have multiple, for example we batch data from kafka into files which are every defined period of time closed and moved for uploads to s3, only once files are moved for uploads offsets are commited.

dasch commented 7 years ago

I would recommend trying to align the files with fetched batches and using the batch API, e.g.

consumer.each_batch do |batch|
  filename = "files/#{batch.topic}-#{batch.partition}-#{batch.first_offset}"

  File.open(filename, "w") do |file|
    batch.messages.each do |message|
      file << message.value

  # Once this block succeeds, the entire batch is marked as consumed.

You can specify min_bytes and max_wait_time to tune the batch sizes. I would recommend starting there rather than implementing your own solution.

piavka commented 7 years ago

I can't really align with batch size since we are taking about minutes intervals and during this time period the data needs to be processed before it written to the file and file finally closed and moved to uploads. This is something i've been using mannual commit offsets all the time i just need to migrate from poseidon_cluster to kafka_cluster gem. I don't see what is wrong with giving manual commit offset control to library user? The change itself seems to be minimal to support this with current code.

dasch commented 7 years ago

I'm simply trying to keep non-core features out – maintaining features is a lot more work than adding them.

What kind of API are you proposing?

dimas commented 7 years ago

I would say that at least NOT committing offsets automatically is a core feature. At least Java Kafka client allows you doing that. I cannot see a way of doing it with ruby-kafka. Even if I set offset_commit_interval=>0, offset_commit_threshold=>0 to avoid automatic commits, still, the consumer loop will invoke

      # In order to quickly have the consumer group re-balance itself, it's
      # important that members explicitly tell Kafka when they're leaving.
      @offset_manager.commit_offsets rescue nil

committing offsets and there seem to be no way of overriding that without some dirty hacks.

And use case - is consuming __consumer_offsets topic itself. You always want to consume it in full from the very beginning and you do not need to track your progress really so no point in committing at all.

Should I raise it as a separate issue?

dasch commented 7 years ago

@dimas committing offsets is a requirement for the distributed consumer groups to be able to function – otherwise any hiccup in the group would cause partition processing to start over.

Do you need to distribute your workload? If not, you can simply use the non-distributed consumer API:

kafka.each_message(topic: "__consumer_offsets") do |message|
  puts message.offset, message.key, message.value
piavka commented 7 years ago

what if i need to commit offsets only after some processing has happened - a processing of big data chunk, a chunk that cannot fit into a single kafka.each_message(...) {|m| ...} iteration ?

dasch commented 7 years ago

@piavka do you have an API proposal for ruby-kafka for dealing with manual offset commits?

piavka commented 7 years ago

@dasch simply add auto_commit_offset param to consumer.each_batch(...) that would stop calling @offset_manager.commit_offsets on each iteration completion in case it's set to false and expose commit_offsets method in the client within each each_batch iteration we have the offsets and thus user can save them, once the relevant processing (outside of each_batch(...)) is done user would commit those saved offsets

dasch commented 7 years ago

Hmm. I'll think a bit about it.

dimas commented 7 years ago

@dasch, oh. You are right, that is probably what I should use. My bad. No I do not need distributing workload.

I think I have kind of weird (or lets say special) use case - trying to do something similar to KafkaOffsetMonitor but in Ruby so my client needs some bits that normal clients should not care about. So I raised another one about that #311, hope you do not mind.

sidbits commented 7 years ago

We have Samza tasks which reads messages from Kafka Output stream but if there is any retryable failure while processing the message then i would want my Samza task to read the same message again and reprocess it. And after successfully processing the message acknowledge it for checkpointing instead of auto commit.

Is there a way to manually control the checkpoint(just like what Kafka Consumer provides "Manual Offset Control" by setting enable.auto.commit to false : https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html )

I came across this doc https://samza.apache.org/learn/documentation/0.13/jobs/reprocessing.html which talks about reprocessing previously processed data but it is not offering any acknowledgement based checkpoint control.

dasch commented 7 years ago

@sidbits I think you're in the wrong repo – this has nothing to do with Samza :)

dasch commented 7 years ago

