vert-x3 / vertx-kafka-client

Reactive Kafka Client for Vert.x
Apache License 2.0

NullPointerException if kafka producer has partitions that have no leader #114

Open mouse256 opened 5 years ago

mouse256 commented 5 years ago

If Kafka is in an unhealthy state and has partitions that currently have no leader, the KafkaProducer crashes with a NullPointerException. I believe those partitions should be ignored in that case.

Stack trace that happens in that case:

ERROR i.v.c.i.ContextImpl - Unhandled exception {}
java.lang.NullPointerException: null
    at io.vertx.kafka.client.common.impl.Helper.from(Helper.java:87) ~[vertx-kafka-client-3.5.2.jar:3.5.2]
    at io.vertx.kafka.client.producer.impl.KafkaProducerImpl.lambda$partitionsFor$7(KafkaProducerImpl.java:165) ~[vertx-kafka-client-3.5.2.jar:3.5.2]
    at io.vertx.core.impl.FutureImpl.setHandler(FutureImpl.java:79) ~[vertx-core-3.5.3.jar:3.5.3]
    at io.vertx.core.impl.ContextImpl.lambda$null$0(ContextImpl.java:289) ~[vertx-core-3.5.3.jar:3.5.3]
    at io.vertx.core.impl.ContextImpl.lambda$wrapTask$2(ContextImpl.java:339) ~[vertx-core-3.5.3.jar:3.5.3]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.19.Final.jar:4.1.19.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-common-4.1.19.Final.jar:4.1.19.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) [netty-transport-4.1.19.Final.jar:4.1.19.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) [netty-common-4.1.19.Final.jar:4.1.19.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.19.Final.jar:4.1.19.Final]

ppatierno commented 5 years ago

Thanks for reporting, I'll look into this.

anhldbk commented 5 years ago

@ppatierno Following this thread, I think we should add null checks in this loop:

if (done.succeeded()) {

    List<PartitionInfo> partitions = new ArrayList<>();

    for (org.apache.kafka.common.PartitionInfo kafkaPartitionInfo : done.result()) {

        PartitionInfo partitionInfo = new PartitionInfo();

        partitionInfo
            .setInSyncReplicas(
                Stream.of(kafkaPartitionInfo.inSyncReplicas()).map(Helper::from).collect(Collectors.toList()))
            .setLeader(Helper.from(kafkaPartitionInfo.leader())) // <-- we may need to check whether this value is null
            .setPartition(kafkaPartitionInfo.partition())
            .setReplicas(
                Stream.of(kafkaPartitionInfo.replicas()).map(Helper::from).collect(Collectors.toList()))
            .setTopic(kafkaPartitionInfo.topic());

        partitions.add(partitionInfo);
    }

    handler.handle(Future.succeededFuture(partitions));
} else {
    handler.handle(Future.failedFuture(done.cause()));
}

We also need to fix that logic in KafkaConsumerImpl.

Should I make a PR with the approach above?
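Sketched in plain Java, the guard could look like the following. The types below (KafkaNode, VertxNode, KafkaPartition, PartitionInfo) are hypothetical stand-ins for the Kafka and Vert.x classes, only there to make the null-leader case runnable; this is not the library's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class LeaderGuard {

    // Hypothetical stand-ins for org.apache.kafka.common.Node and the
    // corresponding Vert.x node/partition types, simplified for illustration
    record KafkaNode(int id) {}
    record VertxNode(int id) {}
    record KafkaPartition(String topic, int partition, KafkaNode leader) {}
    record PartitionInfo(String topic, int partition, VertxNode leader) {}

    // Mirrors the shape of Helper.from: it dereferences its argument,
    // so a null leader reaching it reproduces the reported NPE
    static VertxNode from(KafkaNode node) {
        return new VertxNode(node.id());
    }

    // The mapping loop with the guard @anhldbk suggests: only map the
    // leader when it is present, instead of calling from(null)
    static List<PartitionInfo> toPartitionInfos(List<KafkaPartition> raw) {
        List<PartitionInfo> partitions = new ArrayList<>();
        for (KafkaPartition p : raw) {
            // A leaderless partition yields leader == null; keep the
            // partition but leave the leader unset rather than crashing
            VertxNode leader = p.leader() == null ? null : from(p.leader());
            partitions.add(new PartitionInfo(p.topic(), p.partition(), leader));
        }
        return partitions;
    }
}
```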

barbarosalp commented 5 years ago

In my case, done.result() throws a NullPointerException when I call rxPartitions for a non-existing topic with auto.create.topics.enable=false.

I think the exception should be caught and the handler failed with Future.failedFuture(ex.getCause()).
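That suggestion could be sketched as follows. Outcome, Succeeded, and Failed are hypothetical stand-ins for Vert.x's AsyncResult/Future types, used only to show turning a null result into a reported failure instead of an unhandled NPE:

```java
import java.util.List;

public class NullResultGuard {

    // Minimal, hypothetical stand-in for a Vert.x-style async outcome
    sealed interface Outcome<T> permits Succeeded, Failed {}
    record Succeeded<T>(T value) implements Outcome<T> {}
    record Failed<T>(Throwable cause) implements Outcome<T> {}

    // Treat a null partition list as a failure, so the caller's handler
    // is notified instead of the loop throwing on the event loop context
    static Outcome<List<String>> complete(List<String> result, Throwable cause) {
        if (result == null) {
            Throwable c = (cause != null) ? cause
                : new IllegalStateException("partitionsFor returned null");
            return new Failed<>(c);
        }
        return new Succeeded<>(result);
    }
}
```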

b1zzu commented 2 years ago

Hi, I'm having the same issue, and I also came to the conclusion that done.result() returns null, so the loop throws a java.lang.NullPointerException and the handler never gets notified of the error.

My stack trace:

2022-02-15 14:14:11 ERROR [ContextImpl:] Unhandled exception
java.lang.NullPointerException: null
    at io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl.lambda$partitionsFor$8(KafkaConsumerImpl.java:466) ~[vertx-kafka-client-4.2.4.jar:4.2.4]
    at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$null$1(KafkaReadStreamImpl.java:130) ~[vertx-kafka-client-4.2.4.jar:4.2.4]
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:100) ~[vertx-core-4.2.4.jar:4.2.4]
    at io.vertx.core.impl.AbstractContext.dispatch(AbstractContext.java:63) ~[vertx-core-4.2.4.jar:4.2.4]
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:38) ~[vertx-core-4.2.4.jar:4.2.4]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [netty-common-4.1.73.Final.jar:4.1.73.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) [netty-common-4.1.73.Final.jar:4.1.73.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) [netty-transport-4.1.73.Final.jar:4.1.73.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [netty-common-4.1.73.Final.jar:4.1.73.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.73.Final.jar:4.1.73.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.73.Final.jar:4.1.73.Final]
    at java.lang.Thread.run(Thread.java:829) [?:?]

vietj commented 2 years ago

ping @ppatierno

ppatierno commented 2 years ago

I have to find the time to finally take a look at this issue :-(

aesteve commented 1 year ago

Has anyone managed to reproduce this issue?

I tried a few things, like producer.write(...) to a topic that does not exist with auto.create.topics.enable set to false. All I get is a failed future (so the .result() is null, but that's expected) and unfortunately no crash.

I understand the fix proposed by @anhldbk (and it looks like the right one), and we could implement it safely, but I'd like to reproduce the issue first.

In fact, even when the infamous "metadata is not present after ..." error happens, it seems to be handled well.

And I can't find a way to make the standard Kafka client return a null List. It seems it either returns an empty List or throws an exception, even when the cluster is in an unhealthy state.

Which makes me think that yes, we could potentially add a band-aid to check for nullity, but the problem might be buried a bit deeper.
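For what it's worth, the nullity band-aid mentioned here could be as small as normalizing the list before iterating. orEmpty below is a hypothetical helper sketch, not part of the library:

```java
import java.util.List;
import java.util.Objects;

public class PartitionListBandAid {

    // Normalize a possibly-null partition list to an empty one before
    // the mapping loop runs, so iteration can never throw an NPE
    static <T> List<T> orEmpty(List<T> partitions) {
        return Objects.requireNonNullElse(partitions, List.of());
    }
}
```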