tisonkun / kafka-api

Generic abstractions for Kafka API.
https://crates.io/crates/kafka-api
Apache License 2.0

Unable to fetch with payloads larger than ~32K #10

Closed: aovestdipaperino closed this issue 4 weeks ago

aovestdipaperino commented 2 months ago

This is the Java repro. For such large messages there is also a discrepancy between the actual length of the buffer and the value that comes out of calculateSize.

package org.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;

public class App {
    final static String SERVER = "localhost:3030";
    public static void createTopic() {
        // Kafka AdminClient configuration settings
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);

        // Create an AdminClient instance
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Define the topic configuration
            String topicName = "debug";
            int partitions = 1;
            short replicationFactor = 1;

            // Create the topic
            NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
            System.out.println("Topic created successfully");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    static int message_count = 1;

    public static void sendMessage() throws InterruptedException{
        sendMessage(20);
    }

    public static void sendMessage(int size) throws InterruptedException {
        // Kafka producer configuration settings
        String topicName = "debug";
        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Create a producer record
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "key", "z".repeat(size));

        // Send the record
        producer.send(record);

        Thread.sleep(200);

        // Close the producer
        producer.close();
    }

    public static void poll(int max) {
        // Kafka consumer configuration settings
        String topicName = "debug";
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
//        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"
//             //   +random.nextInt(1000)
//        );
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 3000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition(topicName, 0);

        consumer.assign(Collections.singletonList(partition));
        // Subscribe to the topic
        //consumer.subscribe(Collections.singletonList(topicName));

        // Poll for new data
        try {
            int count = 0;
            while (count < max) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    count++;
                }
            }
        } finally {
            consumer.close();
        }
    }

    static void sendMessages() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            sendMessage();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        createTopic();
        sendMessage(32692);
        poll(1);
        sendMessage(32693); // this will hang because the wire format is not correct and the client waits for more bytes.
        poll(1);
    }
}
aovestdipaperino commented 1 month ago

Found the problem:

impl Encoder<Option<&ReadOnlyRecords>> for NullableRecords {
    fn encode<B: Writable>(&self, buf: &mut B, value: Option<&ReadOnlyRecords>) -> io::Result<()> {
        match value {
            None => {
                if self.0 {
                    VarInt.encode(buf, 0)?
                } else {
                    Int16.encode(buf, -1)? // Is it correct to use INT16? Maybe old versions didn't support more than 64k records?
                }
            }
            Some(r) => {
                let len = r.size() as i16; // PROBLEM: this cast is problematic.
                if self.0 {
                    VarInt.encode(buf, len as i32 + 1)?;
                } else {
                    Int16.encode(buf, len)?;  // See above
                }
                buf.write_records(r)?;
            }
        }
        Ok(())
    }
}

Because calculateSize was doing the right thing, the buffer came out exactly 2 bytes shorter than the advertised size: the encoded length was 0, which takes a single varint byte, while a correct ~32K length takes 3. The poll on the client side therefore never completed.
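For concreteness, here is a minimal standalone sketch of the truncation and of the varint-width arithmetic behind that 2-byte shortfall. The 32_768 batch size is an illustrative value just past i16::MAX, not a number measured from the repro:

fn uvarint_len(mut v: u32) -> usize {
    // Unsigned varints carry 7 payload bits per byte.
    let mut n = 1;
    while v >= 0x80 {
        v >>= 7;
        n += 1;
    }
    n
}

fn main() {
    let size: usize = 32_768;
    println!("as i16: {}", size as i16); // wraps to -32768: the bug
    println!("as i32: {}", size as i32); // stays 32768: the full width

    // A correct compact length prefix (len + 1) around 32K needs 3 varint
    // bytes, while an encoded 0 needs only 1, hence the 2-byte shortfall.
    assert_eq!(uvarint_len(32_694), 3);
    assert_eq!(uvarint_len(0), 1);
}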

tisonkun commented 1 month ago

You are correct. Fixed: the bytes definition uses Int32 for the length.

This is fixed in https://github.com/tisonkun/morax/commit/4ee1579a3a817534720b03a56b3bed27033aad12#diff-e4f0d66a90a44626c385ba5dcf17619d3233b619f2ee9d5141754eda16b082a8 and shipped in kafka-api 0.3.1.

The source is https://kafka.apache.org/protocol.html#protocol_types

BYTES: Represents a raw sequence of bytes. First the length N is given as an INT32. Then N bytes follow.

NULLABLE_BYTES: Represents a raw sequence of bytes or null. For non-null values, first the length N is given as an INT32. Then N bytes follow. A null value is encoded with length of -1 and there are no following bytes.
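Given those definitions, the fix is to keep the length at full width instead of narrowing it to i16. A sketch of what the corrected match could look like, assuming the crate has an Int32 encoder with the same shape as the Int16 one shown above (the actual change is in the commit linked earlier):

match value {
    None => {
        if self.0 {
            // Compact encoding: null is a varint 0.
            VarInt.encode(buf, 0)?
        } else {
            // Classic encoding: null is an INT32 length of -1.
            Int32.encode(buf, -1)?
        }
    }
    Some(r) => {
        // BYTES/NULLABLE_BYTES lengths are INT32; never cast down to i16.
        let len = r.size() as i32;
        if self.0 {
            // Compact encoding: unsigned varint of N + 1.
            VarInt.encode(buf, len + 1)?;
        } else {
            Int32.encode(buf, len)?;
        }
        buf.write_records(r)?;
    }
}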

tisonkun commented 4 weeks ago

Closed, as development has moved to Morax under api/kafka-api.

Explained above.