Open itatdcer opened 6 months ago
Using the option `mp.messaging.incoming.$channel.batch=true` always results in duplicate records, in multiples of the `max.poll.records` configuration.
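For context, a minimal configuration sketch that would put a channel into this mode might look like the following. The channel name `inbound-channel` is the one used in this report's processing method; the connector name `smallrye-kafka` and the value `500` are assumptions for illustration, not settings confirmed by the report.

```properties
# Assumed connector; the report does not state which one is in use.
mp.messaging.incoming.inbound-channel.connector=smallrye-kafka
# Deliver each poll result as one Message<List<...>> instead of one message per record.
mp.messaging.incoming.inbound-channel.batch=true
# Illustrative value only; this caps how many records a single poll (and so a single batch) can hold.
mp.messaging.incoming.inbound-channel.max.poll.records=500
```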
In a processing "pipeline" like
```java
@Incoming("inbound-channel")
@Outgoing("outbound-channel")
public Multi<Message<byte[]>> process(Message<List<byte[]>> msg) {
    return Multi.createFrom()
            .items(() -> {
                return service.apply(msg).stream()
                        .map(result -> {
                            msg.ack().toCompletableFuture().complete(null);
                            if (result != null) {
                                return result;
                            }
                            return null;
                        });
            })
            .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
            .onCompletion().ifEmpty().continueWith(() -> {
                msg.ack().toCompletableFuture().complete(null);
                return List.of();
            });
}
```
The output topic always ends up containing the number of input messages plus n x `max.poll.records` records.
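To make the reported count concrete, here is a small sketch of the arithmetic. All numbers are hypothetical and chosen only for illustration; the class and method names are not part of any library, and `n` stands for the unspecified multiplier from the report.

```java
public class DuplicateCount {

    // Output size as described in the report: inputs + n * max.poll.records.
    static long observedOutputCount(long inputMessages, int n, int maxPollRecords) {
        return inputMessages + (long) n * maxPollRecords;
    }

    public static void main(String[] args) {
        long inputMessages = 1_000; // hypothetical number of records on the input topic
        int maxPollRecords = 500;   // hypothetical max.poll.records value
        int n = 2;                  // hypothetical multiplier

        // 1000 + 2 * 500
        System.out.println(observedOutputCount(inputMessages, n, maxPollRecords)); // prints 2000
    }
}
```

With these assumed values, 1000 of the 2000 output records would be duplicates, that is, exactly two full batches.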
@itatdcer, I recently debugged duplicates in non-batch message processing. Let me know if you are still troubleshooting this; I can help you with it.
Do you have a reproducer?