astubbs opened 2 years ago
Hi @astubbs
Thanks very much for the great work in this library!
I have a scenario that is exactly like the one you describe and I was thinking of using parallel-consumer to implement it.
My scenario:
My understanding (but it looks like I am wrong) is that when setting a batchSize, my user function (executing the SQL inserts) will be called with a list of records that all share the same key. So the parallel consumer should already be distributing same-key batches of messages across the different threads running the user function?
But your message suggests that this is not the case. What did I get wrong here?
Also, do you have an idea of a potential release date for 0.5.0.0, where this feature will be available?
Thanks very much
Tim
Yes, atm this doesn't happen. You /can/ get this with PARTITION or UNORDERED mode if you're lucky, but it's not guaranteed. If using KEY ordering mode, it's guaranteed NOT to happen currently.
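For context, the ordering mode is selected via the ProcessingOrder enum nested in ParallelConsumerOptions. A minimal sketch (getKafkaConsumer() is a placeholder for your own consumer factory):

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;

// UNORDERED: max parallelism, no ordering guarantees
// PARTITION: records of a partition are processed in order, one in flight per partition
// KEY: never two records of the same key in flight at once
var options = ParallelConsumerOptions.<String, String>builder()
        .consumer(getKafkaConsumer())
        .ordering(KEY) // or UNORDERED / PARTITION
        .build();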
0.5 is about to be released as is (won't have this feature).
I'll need to have a look to see how easy this will be to implement before giving an estimate - also depends on demand. But I think this is a useful feature to have.
Thanks for participating! 🤗
Thanks for the prompt answer! Excellent news about 0.5. Is it a matter of hours or a few days?
Just to confirm my understanding about the batching.
If I use the below (KEY ordering + batching), then I shall get 5 threads running my 'processSQLBatch' function, with each 'payload' containing only messages of the same key?
ParallelStreamProcessor<String, String> parallelConsumer =
        ParallelStreamProcessor.createEosStreamProcessor(ParallelConsumerOptions.<String, String>builder()
                .consumer(getKafkaConsumer())
                .ordering(KEY)
                .maxConcurrency(5)
                .batchSize(1000)
                .build());

parallelConsumer.poll(context -> {
    List<String> payload = context.stream()
            .map(this::preparePayload)
            .collect(Collectors.toList());
    // process the entire batch payload at once
    processSQLBatch(payload);
});
Days. Was going to be this week, but Kafka Summit and covid got in the way.
Regarding your question (good question) I’m not sure if you’re referring to the current behaviour, or the new behaviour described in this ticket.
The current behaviour with KEY ordering is that you are guaranteed that, among ALL batches in flight, there will only be one message of a given key. But a batch may contain messages of different keys.
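In the meantime, if you need per-key SQL batches with today's behaviour, you can group the mixed-key batch yourself inside the handler. A rough sketch, reusing the preparePayload / processSQLBatch names from your snippet, and assuming java.util.Map and java.util.stream.Collectors are imported:

parallelConsumer.poll(context -> {
    // under current KEY ordering a batch may mix keys, so group manually
    Map<String, List<String>> payloadsByKey = context.stream()
            .collect(Collectors.groupingBy(
                    RecordContext::key,
                    Collectors.mapping(this::preparePayload, Collectors.toList())));
    // one SQL batch per key
    payloadsByKey.values().forEach(this::processSQLBatch);
});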
With this issue (the proposed new OPTIONAL behaviour), with KEY ordering and the new option enabled, you would get what you describe: UP TO 1000 messages of the SAME key in a batch, IF that many messages with that key are available in the buffered data from the input topic.
Is that more clear? 🤗
Regarding 0.5 btw, at this point I'm not aware of anything substantial that is going to change from the existing released 0.5 snapshot.
Also, regarding the new option that I'm referring to, I would really appreciate your feedback on what you think the API should look like.
For example: to represent this new feature, should it be a new ordering mode, or an option that you enable when using KEY ordering mode? In the latter case, wouldn't enabling this option while using a mode other than KEY cause an error?
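To make the question concrete, the two shapes might look something like this (both are purely hypothetical, neither exists in the current API):

// (a) as a new ordering mode:
ParallelConsumerOptions.<String, String>builder()
        .ordering(KEY_BATCHED) // hypothetical enum value
        .batchSize(1000)
        .build();

// (b) as an option layered on top of KEY ordering:
ParallelConsumerOptions.<String, String>builder()
        .ordering(KEY)
        .batchSize(1000)
        .homogeneousKeyBatches(true) // hypothetical option; would have to fail validation with non-KEY orderings
        .build();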
+1 This is a common use case for my project too.
+1 This feature is really necessary.
Currently the batches are just a subset of non-batched processing. I.e.:
Imagine a scenario where you have a topic where the key is the account.