vert-x3 / vertx-kafka-client

Reactive Kafka Client for Vert.x
Apache License 2.0
84 stars 83 forks source link

Consumer went to infinite loop when SerializationException throws #220

Open cyhii opened 2 years ago

cyhii commented 2 years ago

Questions

I write a Kafka consumer to consume JSON messages, so I use JsonObjectDeserializer, my configuration is:

        val kafkaConfig: Map<String, String> = mapOf(
            "bootstrap.servers" to config.getString("kafka.bootstrap.servers"),
            "key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer" to "io.vertx.kafka.client.serialization.JsonObjectDeserializer",
            "group.id" to "my_group",
            "auto.offset.reset" to "latest",
            "enable.auto.commit" to "true",
        )

but sometimes when a Non-JSON message produced in Kafka, this consumer went to infinite loop at the position of that message.

I read the source code and found these lines in io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.java:

    private void pollRecords(Handler<ConsumerRecords<K, V>> handler) {
      if(this.polling.compareAndSet(false, true)){
          this.worker.submit(() -> {
             boolean submitted = false;
             try {
                if (!this.closed.get()) {
                  try {
                    ConsumerRecords<K, V> records = this.consumer.poll(pollTimeout);
                    if (records != null && records.count() > 0) {
                      submitted = true; // sets false only when the iterator is overwritten
                      this.context.runOnContext(v -> {
                          this.polling.set(false);
                          handler.handle(records);
                      });
                    }
                  } catch (WakeupException ignore) {
                  } catch (Exception e) {
                    if (exceptionHandler != null) {
                      exceptionHandler.handle(e);
                    }
                  }
                }
             } finally {
                 if(!submitted){
                     this.context.runOnContext(v -> {
                         this.polling.set(false);
                         schedule(0);
                     });
                 }
             }
          });
      }
  }

It throws SerializationException when call this.consumer.poll(), and then it calls exceptionHandler in the catch block. The bugged record is not skipped, so in the next time it causes SerializationException again. I think that's the infinite loop.

And, There is no enough messages in the exception, so I cannot do seek or some other actions in the exceptionHandler.

Maybe it should auto-skip the record and write some logs? or re-new a more make-sense Exception so caller can do something in the exceptionHandler?

Need help, thanks.

Version

4.2.6

vietj commented 2 years ago

can you provide a reproducer ?

cyhii commented 2 years ago

can you provide a reproducer ?

Sure, I've made a commit to my demo project to reproduce this issue

Steps:

  1. Start a Kafka server on localhost(follow the quickstart)
  2. Start the application above
  3. Try to send some non-JSON message, it happens.
$ bin/kafka-console-producer.sh --topic my-topic --bootstrap-server 127.0.0.1:9092
>send a plaintext message

Uncomment the code in exceptionHandler to let the log show...

    consumer.exceptionHandler {
      // WARN: uncomment this line below then you will get log storm
      // log.warn("failed to consume message : {}", it.message, it)
    }

It will print tons of logs like this

12:14:16.099 [vert.x-kafka-consumer-thread-0] WARN  com.example.starter.MyKafkaConsumer - failed to consume message : Error deserializing key/value for partition my-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition my-topic-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: io.vertx.core.json.DecodeException: Failed to decode:Unrecognized token 'send': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 6]
    at io.vertx.core.json.jackson.DatabindCodec.fromParser(DatabindCodec.java:129)
    at io.vertx.core.json.jackson.DatabindCodec.fromBuffer(DatabindCodec.java:99)
    at io.vertx.core.json.JsonObject.fromBuffer(JsonObject.java:948)
    at io.vertx.core.json.JsonObject.<init>(JsonObject.java:85)
    at io.vertx.core.buffer.impl.BufferImpl.toJsonObject(BufferImpl.java:110)
    at io.vertx.kafka.client.serialization.JsonObjectDeserializer.deserialize(JsonObjectDeserializer.java:39)
    at io.vertx.kafka.client.serialization.JsonObjectDeserializer.deserialize(JsonObjectDeserializer.java:28)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1365)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:130)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1596)
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1283)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl.lambda$pollRecords$6(KafkaReadStreamImpl.java:154)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'send': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 6]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3588)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2683)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:865)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4664)
    at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:4484)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2730)
    at io.vertx.core.json.jackson.DatabindCodec.fromParser(DatabindCodec.java:126)
    ... 22 common frames omitted
vietj commented 2 years ago

@ppatierno can you have a look

ppatierno commented 2 years ago

Not sure something I can take a look right now, @cyhii any chance for a contribution I can review and help with?

aesteve commented 1 year ago

What you have here @cyhii is what is called a "Poison Pill" a corrupted (or just invalid) record that is blocking consumption.

This would also happen with the default Kafka consumer.

I think what we are missing here an equivalent of: ErrorHandlingDeserializer from Spring or DeserializationExceptionHandler from Kafka Streams.

Meaning: a class, or callback, or any mechanism we could configure to appropriate behaviour:

aesteve commented 1 year ago

Hello again @cyhii .

I was trying to give this issue a try to see how to improve error management and started with a test.

It may take a while before having a more elaborated design (as in Spring) but if you want to workaround the issue and deal with the poison pill, you can do this: https://github.com/aesteve/vertx-kafka-client/blob/handle-serialization-exceptions/src/test/java/io/vertx/kafka/client/tests/ConsumerTestBase.java#L1378-L1396

So, correcting myself:

but change the exception type to get more information (for instance the records offset so that a user can seek)

This is actually not necessary, only a cast to RecordDeserializationException is required (after an instanceof check). And this is the way the standard Kafka Consumer works, too.

And, There is no enough messages in the exception, so I cannot do seek or some other actions in the exceptionHandler.

With such a cast, you should be able to do seek where needed 🙂

Still, having more elaborated Exception handlers could be interesting, but with this you'd deal with the poison pill the same way you would with the standard Kafka consumer client.

Hope this helps.