Closed StephenLReed closed 5 years ago
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
For a single partition, Idempotent producer sends remove the possibility of duplicate messages due to producer or broker errors. To turn on this feature and get exactly-once semantics per partition—meaning no duplicates, no data loss, and in-order semantics—configure your producer to set “enable.idempotence=true”.
From stackoverflow https://stackoverflow.com/questions/39574328/kafka-multiple-partition-ordering?rq=1
I'm not using Kafka streams - but it is possible to do this with the normal Consumer.
First sort the partitions - this assumes you've already seeked to the offset in each you want or used Consumer Group to do it.
private List<List<ConsumerRecord<String, String>>> orderPartitions(ConsumerRecords<String, String> events) {
Set<TopicPartition> pollPartitions = events.partitions();
List<List<ConsumerRecord<String, String>>> orderEvents = new ArrayList<>();
for (TopicPartition tp : pollPartitions) {
orderEvents.add(events.records(tp));
}
// order the list by the first event, each list is ordered internally also
orderEvents.sort(new PartitionEventListComparator());
return orderEvents;
}
/**
Used to sort the topic partition event lists so we get them in order */ private class PartitionEventListComparator implements Comparator<List<ConsumerRecord<String, String>>> {
@Override public int compare(List<ConsumerRecord<String, String>> list1, List<ConsumerRecord<String, String>> list2) { long c1 = list1.get(0).timestamp(); long c2 = list2.get(0).timestamp(); if (c1 < c2) { return -1; } else if (c1 > c2) { return 1; }
return 0;
}
} Then just round robin the partitions to get the events in order - in practice I've found this to work.
ConsumerRecords<String, String> events = consumer.poll(500);
int totalEvents = events.count();
log.debug("Polling topic - recieved " + totalEvents + " events");
if (totalEvents == 0) {
break; // no more events
}
List<List<ConsumerRecord<String, String>>> orderEvents = orderPartitions(events);
int cnt = 0;
// Each list is removed when it is no longer needed
while (!orderEvents.isEmpty() && sent < max) {
for (int j = 0; j < orderEvents.size(); j++) {
List<ConsumerRecord<String, String>> subList = orderEvents.get(j);
// The list contains no more events, or none in our time range, remove it
if (subList.size() < cnt + 1) {
orderEvents.remove(j);
log.debug("exhausted partition - removed");
j--;
continue;
}
ConsumerRecord<String, String> event = subList.get(cnt);
cnt++
}
Updated benchmark to use 3 partitions.
Enforce exactly once semantics with a single consumer group and stateful offset management.