confluentinc / parallel-consumer

Parallel Apache Kafka client wrapper with per message ACK, client side queueing, a simpler consumer/producer API with key concurrency and extendable non-blocking IO processing.
https://confluent.io/confluent-accelerators/#parallel-consumer
Apache License 2.0

Feature suggestion: Minimum batch size + batch max wait time #560

Open Ehud-Lev-Forter opened 1 year ago

Ehud-Lev-Forter commented 1 year ago

Hi Team, does it make sense to support a minimumBatchSize together with a maxTimeToWait in the Parallel Consumer? The idea is that I want to use the PC as a micro-batch consumer, but since my consumers are faster than my stream I get 1 event most of the time; I do get larger batches, but only when the system is in lag. It seems I am not the only one who has considered this: see . What do you think about this idea? Is this something you don't want to support, or is it just not the focus right now?
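For illustration, here is a rough sketch of how this could look from the caller's side, based on my understanding of the current options API - `batchSize`, `ordering` and the batch-style `poll` exist today, while `minBatchSize` and `batchMaxWaitTime` below are purely hypothetical names for the proposed settings:

```java
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;

import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;

public class MicroBatchSketch {

    static ParallelStreamProcessor<String, String> start(Consumer<String, String> kafkaConsumer) {
        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .consumer(kafkaConsumer)
                .ordering(KEY)
                .maxConcurrency(16)
                .batchSize(100)                               // existing option: upper bound on records per poll
                // .minBatchSize(20)                          // hypothetical: proposed lower bound
                // .batchMaxWaitTime(Duration.ofMillis(500))  // hypothetical: proposed max wait / linger
                .build();

        ParallelStreamProcessor<String, String> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(Collections.singletonList("input-topic"));

        // With batchSize set, each poll invocation receives up to batchSize records,
        // but when the consumer keeps up with the topic the "batch" is usually a single record.
        processor.poll(context -> {
            List<ConsumerRecord<String, String>> batch = context.getConsumerRecordsFlattened();
            // micro-batch processing goes here (e.g. one bulk write per batch)
        });
        return processor;
    }
}
```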

astubbs commented 1 year ago

Sounds like a worthy feature to have! :)

rkolesnev commented 1 year ago

I am working through the PR review on it - but I will wait until @astubbs has had a look as well.

I am wondering about the concept a little bit though - what is the benefit of enforcing a minimum batch size? Is it purely a slight optimization for the processing function / downstream resource use?

Generally batching is self-balancing - if downstream processing is faster than the consumption / flow rate from Kafka, batching does not add any advantage from a purely processing-speed point of view; once the flow rate exceeds the time taken to process individual messages, batching starts to kick in. If processing explicitly depends on messages being batched, probably the best bet is to add a small KStream / KSQL job on the incoming topic to aggregate messages into batches (windowed etc.).
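To make that suggestion concrete, here is a minimal Kafka Streams sketch of the kind of pre-batching job I mean, assuming String keys/values - the topic names, window size and the naive string-concatenation "batching" are illustrative only:

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class PreBatchingJob {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey()
                // 5-second tumbling window per key; window size is illustrative
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
                // naive "batching": concatenate values; a real job would use a proper container + serde
                .reduce((a, b) -> a + "\n" + b)
                // emit only the final, closed-window result so downstream sees complete batches
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .map((windowedKey, batch) -> KeyValue.pair(windowedKey.key(), batch))
                .to("batched-topic", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pre-batching-job");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}
```

The Parallel Consumer (or any downstream consumer) would then read `batched-topic` and receive one pre-aggregated record per key per window.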

From the implementation side - does a minimum batch size add any value over just Linger? That would be similar to the Kafka producer behaviour - i.e. keep the current batch size and add linger (i.e. a timeout for waiting to fill the batch). I think that would make it slightly more familiar, a bit easier to reason about, and a slightly less complex implementation.
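For reference, the producer analogy I am drawing - in the plain Kafka producer, `batch.size` caps a batch while `linger.ms` bounds how long the producer waits to fill it before sending (the values below are illustrative):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class LingerAnalogy {

    static KafkaProducer<String, String> producer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024); // upper bound on a batch, in bytes
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);         // wait up to 50 ms to fill a batch before sending
        return new KafkaProducer<>(props);
    }
}
```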

Ehud-Lev-Forter commented 1 year ago

Hi @rkolesnev, can you elaborate on how I can "keep current batch size and add linger (i.e. timeout for waiting to fill the batch)"? I thought linger exists only on the producer side?

As for the best bet being to add a KStream / KSQL job, if I understand correctly that means adding another hop through Kafka, which is more expensive.

I totally agree that adding a minimum batch size makes the library more complex and might harm performance.

rkolesnev commented 1 year ago

@Ehud-Lev-Forter

a) The point on KStream / KSQL is about guarantees - the batching mechanism that exists in the Parallel Consumer, and this proposed change, are more about best-effort optimisation. KStream / KSQL pre-processing to batch the data gives better control and more guarantees if downstream processing depends on data being batched - things like late-arriving data, re-ordering etc. would be better taken care of with KStreams / KSQL and windowing - but that is more of a flow-architecture discussion.

b) On the Linger point - yes, it is a concept in the Kafka Producer, where it controls data batching for Produce requests - which sounds similar to what this change is doing (granted, it's batching for the processing function instead of for produce).

What I am asking is - would adding Linger (in other words, a batch timeout) without adding minBatchSize be enough? What benefit does adding minBatchSize give over using just the batch size?

So on work acquisition it would do 2 checks: 1) is there enough to fill at least 1 batch, or 2) has the batching timeout (Linger) been exceeded - roughly as in the sketch below.
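A minimal sketch of that release condition, purely to illustrate the idea - this is not the actual PR code, and `minBatchSize` / `maxWaitTime` are the hypothetical settings discussed above:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the two-check batch-release condition - not the actual PR implementation.
// oldestBufferedAt is when the first record of the pending batch was buffered.
class BatchReleaseCheck {

    static boolean shouldReleaseBatch(int bufferedRecords,
                                      int minBatchSize,
                                      Instant oldestBufferedAt,
                                      Duration maxWaitTime,
                                      Instant now) {
        boolean batchFull = bufferedRecords >= minBatchSize;                                          // check 1
        boolean lingerExpired = Duration.between(oldestBufferedAt, now).compareTo(maxWaitTime) >= 0;  // check 2
        return batchFull || lingerExpired;
    }
}
```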

c) I guess batching only makes sense for Unordered and Key-ordered processing - for Partition ordering it would only make sense with a high number of partitions processed by a single PC instance - do you agree? Should a check be added to warn if this behaviour is enabled for Partition-ordered processing, or to compare how many partitions are subscribed to against the min batch size?

Ehud-Lev-Forter commented 1 year ago

Hi @rkolesnev,

As for a) - yes, it is best effort, similar to some of the behaviour of Kafka Connect using partition.duration.ms, if I remember correctly.

As for b) - it is only relevant when the Parallel Consumer is meant to write back to Kafka, but in many cases IMO it is meant to write to something else, so batching via the producer is not an option.

As for c) - I think most of the interesting use cases are for Key or Unordered processing.

Just to give an example, we are using this library to write both to S3 and RDS, and later to send a notification back to Kafka. We would like to have some kind of batching when writing to S3. Now, you may say that I could use Kafka Connect instead with 2 writes; the problem with that is the latency and the pricing. So the PC becomes very helpful, but I cannot have real batching if I want to. As for adding a warning when using both min batch size and Partition ordering, that is easy, but the real questions are:

  1. Do you guys want to support this feature at all, regardless of the PR I shared? 
  2. Assuming you do, do you want it to be part of the main code, or some kind of if/else for a special use case, maybe with a log marking it as an experimental feature?

rkolesnev commented 1 year ago

Hi @Ehud-Lev-Forter,

1) I think that is a useful feature - it would be beneficial to have this option.

2) Being part of the main code is fine, I think - especially as it is controlled by a flag / settings and can be off by default.

I have left a review comment on the PR - the general batching flow logic looks OK to me, but I would really like to see more tests covering the different usage modes - transactional consume + produce, the different ordering guarantees, non-reactor usage of PC. The change touches logic quite deep in the code and has the potential for edge-case issues - the additional tests should help surface them and establish regression protection for the new batching feature going forward.