karafka / rdkafka-ruby

Modern and performant Kafka client library for Ruby based on librdkafka
https://karafka.io

Consumer defaults encourage bad design #112

Closed: fritzblue closed this issue 3 years ago

fritzblue commented 4 years ago

While the producer defaults and examples correctly encourage batching out of the gate, the current consumer defaults (and all readily accessible examples) encourage a non-batch design. These defaults and examples should be updated to support the 99% use case, which is batching.
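
For contrast, here is roughly the batch-first shape the producer documentation already leads with (a sketch in the spirit of the README; the broker address and topic name are illustrative):

producer = Rdkafka::Config.new(:"bootstrap.servers" => "localhost:9092").producer

handles = 100.times.map do |i|
  producer.produce(topic: "events", payload: "event #{i}")
end

# produce returns a delivery handle immediately; waiting on them at the
# end lets librdkafka batch the actual network writes behind the scenes.
handles.each(&:wait)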

Encouraging handling messages one at a time, even if the polling is batched under the hood, in turn encourages secondary side effects (writing to a database, writing to another Kafka topic, etc.) to be performed one at a time rather than in batches, which is a bad design choice for users of this library.
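
A hypothetical illustration of the two shapes this pushes applications toward (db, :events, and the insert methods stand in for whatever storage layer is in use):

# Per-message handling invites one write per message:
consumer.each do |message|
  db.insert(:events, payload: message.payload) # N round trips for N messages
end

# Batch-oriented handling makes a bulk write the natural choice:
consumer.each_slice(500) do |messages|
  db.multi_insert(:events, messages.map { |m| { payload: m.payload } })
end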

I propose rethinking the defaults / examples of the consumer to be batch-based, like those of the producer.

thijsc commented 4 years ago

I completely agree with you that batching is almost always preferred when consuming too.

We had some discussion about this in #99. I wrote up a batched example in this comment: https://github.com/appsignal/rdkafka-ruby/issues/99#issuecomment-553787214. I think it makes sense to put this example in the readme.

I still feel that the API and usage of Enumerable, while not encouraging batching, do provide a very nice and Ruby-esque way of doing batching. Do you have ideas for a better API than this?
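
For readers without the link handy, the Enumerable-based batching discussed there boils down to roughly this shape (a sketch, not the verbatim linked comment; the topic name is illustrative):

consumer.subscribe("events")

consumer.each_slice(1000) do |messages|
  # Handle 1000 messages at a time; each_slice comes for free because
  # the consumer exposes each and includes Enumerable.
end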

fritzblue commented 4 years ago

Fortunately there's quite a bit of "prior art" on this subject we can refer to. Here is an example from the official Java consumer:

     import java.time.Duration;
     import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.List;
     import java.util.Properties;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;

     Properties props = new Properties();
     props.setProperty("bootstrap.servers", "localhost:9092");
     props.setProperty("group.id", "test");
     props.setProperty("enable.auto.commit", "false");
     props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer); // application-defined bulk write
             consumer.commitSync();
             buffer.clear();
         }
     }

Even the simple example above is lower-level than the API one could imagine for your library, though. Here's another example, from the ruby-kafka gem:

# A mock search index that we'll be keeping up to date with new Kafka messages.
index = SearchIndex.new

consumer.subscribe("posts")

consumer.each_batch do |batch|
  puts "Received batch: #{batch.topic}/#{batch.partition}"

  transaction = index.transaction

  batch.messages.each do |message|
    # Let's assume that adding a document is idempotent.
    transaction.add(id: message.key, body: message.value)
  end

  # Once this method returns, the messages have been successfully written to the
  # search index. The consumer will only checkpoint a batch *after* the block
  # has completed without an exception.
  transaction.commit!
end

Here all the ceremony goes away and each_batch does all the heavy lifting.

I propose a higher level API than the first example, possibly as high level as the second example.

In addition to updating the API itself, documenting batch examples front-and-center (as with the examples above) would also go a long way in encouraging proper application design by users.

thijsc commented 4 years ago

It looks to me like the example you posted is almost the same as the one here: https://github.com/appsignal/rdkafka-ruby/issues/99#issuecomment-553787214

This works by leveraging Enumerable, so this API is really already present on the consumer. Do you think there is a significant difference?

fritzblue commented 4 years ago

each_slice against your Enumerable does not have the same behavior as each_batch in the library above: each_slice blocks until a full slice has accumulated, while each_batch yields whatever the underlying fetch returned, so latency stays bounded. To get similar behavior here, one would have to drop down to poll and write their own logic, similar to the Java example above. Users of this library would be encouraged toward better design by making batching the happy path.

thijsc commented 4 years ago

I struggle to see what the big difference is between https://github.com/appsignal/rdkafka-ruby/issues/99#issuecomment-553787214 and your example. I see that each_batch in your example groups messages by partition. Do you feel that's the essential difference? Or something else?

mgrosso commented 4 years ago

I know at least one user (cough) of the library who will want to do batching but feels the need to limit latency. Many producers are bursty: if 1001 items are sent, the .each_slice(1000) example leaves the 1001st item waiting indefinitely for 999 more messages to fill the next slice.

I totally get the reluctance you expressed in #99 to add your own polish on top of the librdkafka API. It might seem like a bit much to roll out a wrapper gem just to implement each_batch(max_items: 100, max_latency_ms: 250), but maybe it's the right thing to do. I don't think it would amount to much more than this though:

class MySuperConsumer < Rdkafka::Consumer
  def each_batch(max_items: 100, max_latency_ms: 250)
    slice = []
    end_time = Time.now.to_f + max_latency_ms / 1000.0
    loop do
      # Never wait past the end of the current batch window.
      max_wait_ms = [((end_time - Time.now.to_f) * 1000).floor, 0].max
      # Rdkafka::Consumer#poll takes its timeout in milliseconds.
      message = poll(max_wait_ms)
      slice << message if message
      if slice.size >= max_items || Time.now.to_f >= end_time
        yield slice.dup unless slice.empty?
        slice.clear
        end_time = Time.now.to_f + max_latency_ms / 1000.0
      end
      break if some_shutdown_condition? # placeholder for the app's shutdown signal
    end
  end
end

This example doesn't group by partition. I might add a separate each_batch_by_partition(...). I can definitely see it being handy for cases where batching is only allowable, or more sensible, when done in some grouping that follows the grain of the partitions, e.g. a destination data store shard.

At around twenty lines it may be longer than you'd like for a code example, although it also seems a bit small to justify a dependent gem. I'd also be happy to submit something like it as a PR if you change your mind about adding it directly.
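
For completeness, here is one hypothetical way to wire the sketch up. Since Rdkafka::Config#consumer returns an Rdkafka::Consumer rather than a subclass, extracting the helper into a module and extending the instance may be more practical than subclassing (names and settings below are illustrative):

module BatchedPolling
  # each_batch(max_items:, max_latency_ms:) exactly as sketched above
end

consumer = Rdkafka::Config.new(
  :"bootstrap.servers" => "localhost:9092",
  :"group.id" => "batch-example"
).consumer
consumer.extend(BatchedPolling)
consumer.subscribe("events")

consumer.each_batch(max_items: 500, max_latency_ms: 1000) do |messages|
  # Up to 500 messages, or whatever arrived within the last second.
end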

thijsc commented 4 years ago

I'm happy to consider more complex behavior if there are important use cases that would be made possible. It's not clear to me yet if there are.

What use case(s) are you making possible with this helper?

thijsc commented 4 years ago

I just read your message a bit better and did some research.

The main missing feature you see is being able to batch with a time limit, right? So either get a maximum number of messages, or the ones that were received within the time limit?

Using Enumerable one could implement timed batching roughly like this:

slice_time = Time.now.to_f

# slice_when splits between consecutive elements, so its block takes two
# arguments; note it is only evaluated when the next message arrives, which
# means an idle topic still delays the flush.
consumer.slice_when { |_before, _after| Time.now.to_f > slice_time + 5.0 }.each do |messages|
  # Do something with the messages, then reset the slice time (5.0 = 5 seconds)
  slice_time = Time.now.to_f
end

To give another example, something like this would enable partition grouping without any specific code added in this gem:

consumer.each_slice(1000) do |messages|
  # group_by returns a hash keyed by partition, so each yields a pair.
  messages.group_by(&:partition).each do |partition, partition_messages|
    # Do something with this partition's messages
  end
end

My message here is: we should embrace Enumerable instead of adding Enumerable-like features to this gem.

That being said, it might be very useful to have a hybrid helper in the consumer that slices on both a maximum item count and a time limit. If we add that, we should see if we can come up with a way to integrate it with the existing Enumerable setup.
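
One way such a helper could integrate with the Enumerable setup is the standard Ruby idiom of returning an Enumerator when no block is given, so the batches themselves become chainable (a sketch; the accumulation logic is elided):

def each_batch(max_items: 100, max_latency_ms: 250)
  # With no block, hand back an Enumerator so callers can chain
  # Enumerable methods over whole batches, e.g. consumer.each_batch.lazy.
  return enum_for(:each_batch, max_items: max_items, max_latency_ms: max_latency_ms) unless block_given?
  # ... accumulate messages via poll and yield each batch, as sketched above ...
end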

mgrosso commented 4 years ago

I like your design principle of leaning on Enumerable as much as possible. I'll try to create a PR that implements the each_batch above, perhaps renamed to each_slice_with_timeout? I'll respect that design principle by avoiding any partition grouping, since that seems reasonable to do with Enumerable alone.

The main thing I don't expect to ever show up in Ruby's Enumerable is any sort of timing, parallelism, or latency control, so the change should focus on that.

I've implemented something meeting the same requirement at work, but it's not the right approach for a PR here.

thijsc commented 4 years ago

Looking forward to your pull. I think this hybrid of waiting for a maximum number of messages with a time limit is a useful addition. I think each_batch is a pretty good name actually.

thijsc commented 3 years ago

Closed by merging #124