vert-x3 / vertx-kafka-client

Reactive Kafka Client for Vert.x
Apache License 2.0
84 stars 82 forks source link

NullPointerException when using consumer batchHandler #264

Open afloarea opened 5 months ago

afloarea commented 5 months ago

Version

4.5.7

Context

I saw there were exceptions thrown when using the kafka consumer with a batch handler.

Do you have a reproducer?


@Testcontainers
@ExtendWith(VertxExtension.class)
class KafkaBatchConsumerTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaBatchConsumerTest.class);
    private static final int MESSAGE_COUNT = 3;

    @Container
    private KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @BeforeEach
    void prepare(Vertx vertx, VertxTestContext testContext) {
        Properties config = new Properties();
        config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);

        IntStream.range(0, MESSAGE_COUNT)
                .mapToObj(index ->
                        KafkaProducerRecord.<String, String>create("test.topic", null, "Message " + index, 0))
                .map(producer::write)
                .collect(Collectors.collectingAndThen(Collectors.toList(), Future::all))
                .onComplete(testContext.succeedingThenComplete());
    }

    @Test
    void testBatchConsumer(Vertx vertx, VertxTestContext testContext) {

        Properties config = new Properties();
        config.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "test.group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);

        consumer.batchHandler(kafkaRecords -> {
            LOG.info("Got {} records", kafkaRecords.size());
            IntStream.range(0, kafkaRecords.size()).mapToObj(kafkaRecords::recordAt).forEach(kafkaRecord -> LOG.info("Got record: {}", kafkaRecord));
            testContext.verify(() -> {
                Assertions.assertEquals(MESSAGE_COUNT, kafkaRecords.size());
                testContext.completeNow();
            });
        });

        consumer.subscribe("test.topic");

    }

}

The test passes and consuming works but in the logs I can see NullPointerExceptions, one for each message:

java.lang.NullPointerException: Cannot invoke "io.vertx.core.Handler.handle(Object)" because the return value of "io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.tracedHandler(io.vertx.core.Context, io.vertx.core.Handler)" is null
    at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$run$10(KafkaReadStreamImpl.java:240)
    at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:328)
    at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:166)
    at io.vertx.core.impl.ContextInternal.emit(ContextInternal.java:209)
    at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.run(KafkaReadStreamImpl.java:240)
    at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$schedule$8(KafkaReadStreamImpl.java:194)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
    at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:1583)

Steps to reproduce

Create a kafka consumer with a batch handler and subscribe to a topic.