streamnative / kop

Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
https://streamnative.io/docs/kop
Apache License 2.0

[BUG] Offset in send callback is not continuous when batching is enabled #332

Closed BewareMyPower closed 3 years ago

BewareMyPower commented 3 years ago

Describe the bug

#296 implemented continuous offsets, but in the producer's send callback the offset is not continuous when batching is enabled.

To Reproduce

Modify KafkaRequestHandlerTest#testProduceCallback to the following code and rerun.

    public void testProduceCallback() throws Exception {
        final String topic = "test-produce-callback";
        final int numMessages = 20;
        final String messagePrefix = "msg-";

        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getKafkaBrokerPort());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "100"); // avoid all messages being in the same batch

        @Cleanup
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(props);

        Map<Integer, Long> indexToOffset = new ConcurrentHashMap<>();
        final CountDownLatch latch = new CountDownLatch(numMessages);
        for (int i = 0; i < numMessages; i++) {
            final int index = i;
            producer.send(new ProducerRecord<>(topic, i, messagePrefix + i), (recordMetadata, e) -> {
                if (e != null) {
                    log.error("Failed to send {}: {}", index, e);
                    fail("Failed to send " + index + ": " + e.getMessage());
                }
                assertEquals(recordMetadata.topic(), topic);
                assertEquals(recordMetadata.partition(), 0);
                indexToOffset.put(index, recordMetadata.offset());
                latch.countDown();
            }); // asynchronous send
        }
        latch.await();
        indexToOffset.forEach((index, offset) -> {
            log.info("{} => {} (delta = {})", index, offset, offset - index);
        });
    }

The output:

1 => 10 (delta = 9)
2 => 9 (delta = 7)
3 => 10 (delta = 7)
4 => 9 (delta = 5)
5 => 10 (delta = 5)
6 => 9 (delta = 3)
7 => 10 (delta = 3)
8 => 9 (delta = 1)
9 => 10 (delta = 1)
10 => 19 (delta = 9)
11 => 20 (delta = 9)
12 => 19 (delta = 7)
13 => 20 (delta = 7)
14 => 19 (delta = 5)
15 => 20 (delta = 5)
16 => 19 (delta = 3)
17 => 20 (delta = 3)
18 => 19 (delta = 1)
19 => 20 (delta = 1)

We can see the offsets are not continuous, not even monotonically increasing. And the first offset is not zero.

However, if we use synchronous send, i.e. producer.send(/* ... */).get(), the first offset will be 0 and the offsets will be continuous.
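For comparison, here is a minimal synchronous variant of the send loop above (a sketch reusing the producer, topic, messagePrefix and numMessages from the test; assertEquals is the same TestNG assertion used there):

    // Synchronous send: block on each future before sending the next record,
    // so at most one batch is in flight at a time.
    for (int i = 0; i < numMessages; i++) {
        final RecordMetadata metadata =
                producer.send(new ProducerRecord<>(topic, i, messagePrefix + i)).get();
        // With synchronous sends the offsets start at 0 and increase by 1.
        assertEquals(metadata.offset(), i);
    }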

Expected behavior

The offset in the producer's send callback is continuous.

jiazhai commented 3 years ago

This is a little strange. How many entries (batched messages) are there in BookKeeper? It would be helpful to get the offset metadata from the entry.
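One way to look at that (a sketch, not something from this issue) is to read the managed ledger stats of the backing Pulsar partition through the admin API; the service URL and partition name below are assumptions:

    import org.apache.pulsar.client.admin.PulsarAdmin;
    import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;

    public class CheckEntries {
        public static void main(String[] args) throws Exception {
            // Assumed broker admin URL; adjust to the broker under test.
            try (PulsarAdmin admin = PulsarAdmin.builder()
                    .serviceHttpUrl("http://localhost:8080")
                    .build()) {
                // Assumed Pulsar partition name backing the Kafka topic in the test.
                String topic = "persistent://public/default/test-produce-callback-partition-0";
                PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
                // numberOfEntries is the number of entries (i.e. batches) written to BookKeeper.
                System.out.println("entries = " + stats.numberOfEntries);
            }
        }
    }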

BewareMyPower commented 3 years ago

It looks like there are two bugs:

  1. The design of PendingProduceQueue has some problems: when I remove the queue and send synchronously (see the diff below), the offsets become continuous.

    diff --git a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
    index 57ca22e..c7f63f8 100644
    --- a/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
    +++ b/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java
    @@ -48,6 +48,7 @@ import java.util.Optional;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    @@ -602,10 +603,22 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
             String fullPartitionName = KopTopic.toString(topicPartition);
             PendingProduce pendingProduce = new PendingProduce(partitionResponse, topicManager, fullPartitionName,
                     entryFormatter, records, executor);
    +            CountDownLatch latch = new CountDownLatch(1);
    +            pendingProduce.whenComplete(() -> {
    +                pendingProduce.publishMessages();
    +                latch.countDown();
    +            });
    +            try {
    +                latch.await();
    +            } catch (InterruptedException ignored) {
    +                // TODO:
    +            }
    +            /*
             PendingProduceQueue queue =
                     pendingProduceQueueMap.computeIfAbsent(topicPartition, ignored -> new PendingProduceQueue());
             queue.add(pendingProduce);
             pendingProduce.whenComplete(queue::sendCompletedProduces);
    +             */
         }
    
         CompletableFuture.allOf(responsesFutures.values().toArray(new CompletableFuture<?>[responsesSize]))
  2. However, the base offset is still wrong: the first batch's base offset is N-1, where N is the number of messages in the batch (see the sketch below).
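For the second bug, the arithmetic is straightforward: a batch's base offset should be the offset of its last record minus (N - 1). A minimal sketch of that relationship (illustrative names, not KoP's actual code):

    // Illustrative helper: for a batch whose last record was assigned lastOffset and
    // which contains numMessagesInBatch records, the base offset reported to the
    // Kafka client should be:
    static long expectedBaseOffset(long lastOffset, int numMessagesInBatch) {
        return lastOffset - (numMessagesInBatch - 1);
    }

    // e.g. a first batch of 10 records persisted at offsets 0..9 has lastOffset = 9,
    // so expectedBaseOffset(9, 10) == 0, not 9 (the N-1 value observed above).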
BewareMyPower commented 3 years ago

Regarding what I mentioned before, the second bug is easy to fix, but the first bug needs some refactoring because it's caused by a race condition.

Assume there are two messages/records, 0 and 1, and each record contains only one message. Here Persist[i] means message i is persisted (the interceptor updates its index) and Complete[i] means the send callback of message i runs.

There are two possible timelines (the current offset is the offset of the latest message, i.e. LEO - 1 in Kafka):

| Event | interceptor's index (current offset) | base offset |
| --- | --- | --- |
| Persist[0] | 0 | |
| Persist[1] | 1 | |
| Complete[0] | 1 | 1 |
| Complete[1] | 1 | 1 |

or

| Event | interceptor's index (current offset) | base offset |
| --- | --- | --- |
| Persist[0] | 0 | |
| Complete[0] | 0 | 0 |
| Persist[1] | 1 | |
| Complete[1] | 1 | 1 |

We can see that in the first case, the callback of message 0 retrieves the wrong current offset, which belongs to message 1, because the interceptor has already been updated twice before the callback runs.
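A standalone sketch of that first timeline (plain Java, not KoP code; the shared AtomicLong stands in for the interceptor's index):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicLong;

    public class OffsetRaceSketch {
        // Shared "current offset", standing in for the interceptor's index.
        private static final AtomicLong currentOffset = new AtomicLong(-1);

        public static void main(String[] args) {
            // Persist[0] then Persist[1]: both persist completions update the shared index
            // before any send callback gets a chance to read it.
            CompletableFuture<Void> persist0 = CompletableFuture.runAsync(() -> currentOffset.set(0));
            CompletableFuture<Void> persist1 = persist0.thenRun(() -> currentOffset.set(1));
            persist1.join();

            // Complete[0]: the callback of message 0 reads the shared index afterwards
            // and observes 1, the offset that actually belongs to message 1.
            System.out.println("callback of message 0 sees offset " + currentOffset.get());
        }
    }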

BewareMyPower commented 3 years ago

Another fix from the Pulsar side is WIP, see https://github.com/apache/pulsar/pull/9257

BewareMyPower commented 3 years ago

The fix for the Kafka client's send callback needs:

BewareMyPower commented 3 years ago

KafkaRequestHandlerTest#testProduceCallback is flaky because this task is not completed, but the issue was accidentally closed before.

INFO  io.streamnative.pulsar.handlers.kop.KafkaRequestHandlerTest - Actual offsets: [5, 1, 2, 3, 4, 5, 6, 7, 8, 9]

From the error log, we can see the first message's offset is 5. It's caused by the race condition I mentioned before.
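If the test is re-enabled, the continuity check could be asserted explicitly instead of only logging deltas, e.g. on top of the indexToOffset map collected in the test above (a sketch, assuming the same TestNG assertions):

    // The offsets should start at 0 and increase by exactly 1 per record.
    final long firstOffset = indexToOffset.get(0);
    assertEquals(firstOffset, 0L);
    for (int i = 0; i < numMessages; i++) {
        assertEquals(indexToOffset.get(i).longValue(), firstOffset + i);
    }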