exasol / kafka-connector-extension

Exasol Kafka Extension for accessing Apache Kafka
MIT License

Error when trying to consume a topic with NULL values #29

Closed jwarlander closed 3 years ago

jwarlander commented 3 years ago

When running a local test in my dev schema after upgrading the Kafka Connector to 1.0.0, I ran into the following issue:

[main] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-EXASOL_KAFKA_UDFS_CONSUMERS-1, groupId=EXASOL_KAFKA_UDFS_CONSUMERS] Fetch position FetchPosition{offset=117013, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>:<PORT> (id: 22 rack: null)], epoch=0}} is out of range for partition public.pro-seller.offer-0, resetting offset
[main] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-EXASOL_KAFKA_UDFS_CONSUMERS-1, groupId=EXASOL_KAFKA_UDFS_CONSUMERS] Resetting offset for partition public.pro-seller.offer-0 to position FetchPosition{offset=139566, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[<IP>:<PORT> (id: 22 rack: null)], epoch=0}}.
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[main] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[main] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[main] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-EXASOL_KAFKA_UDFS_CONSUMERS-1 unregistered
F-UDF-CL-SL-JAVA-1080: Exception during run
com.exasol.cloudetl.kafka.KafkaConnectorException: Error consuming Kafka topic 'public.pro-seller.offer' data. It occurs for partition '0' in node '0' and vm '140681706548736' Cause: null
com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.run(KafkaTopicDataImporter.scala:99)
com.exasol.cloudetl.kafka.KafkaTopicDataImporter.run(KafkaTopicDataImporter.scala)
com.exasol.ExaWrapper.run(ExaWrapper.java:196)
Caused by: java.lang.NullPointerException
com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.$anonfun$run$1(KafkaTopicDataImporter.scala:84)
com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.$anonfun$run$1$adapted(KafkaTopicDataImporter.scala:71)
scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
scala.collection.AbstractIterable.foreach(Iterable.scala:919)
com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.run(KafkaTopicDataImporter.scala:71)
    ... 6 more

W-UDF.CL.SL.JAVA-1082: Skipping init, because cleanup method cannot be found.

This seems to relate to the following line in KafkaTopicDataImporter.scala:

          val exasolRow: Seq[Any] = recordValue ++ metadata

Is it possible that the above construct fails when recordValue is null? That happens quite frequently in the topic I'm reading here, since a null value is how a record is marked for deletion.
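For what it's worth, a null-safe guard along these lines would avoid the NPE. This is only a minimal sketch, not the connector's actual code; the buildRow helper and the "tombstone becomes an empty value part" policy are my own assumptions:

```scala
object TombstoneDemo {
  // Hypothetical helper (not in the connector): treat a null record
  // value, i.e. a Kafka tombstone, as an empty value part so that
  // appending the metadata columns never dereferences a null Seq.
  def buildRow(recordValue: Seq[Any], metadata: Seq[Any]): Seq[Any] =
    Option(recordValue).getOrElse(Seq.empty) ++ metadata
}
```

With this guard a tombstone record would produce a row containing only the metadata columns instead of throwing.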

Previously, that particular line looked like:

          val exasolRow: Seq[Object] = getAvroRow(singleColumnJson, recordValue) ++ metadata

...where getAvroRow() was defined as:

  private def getAvroRow(singleJson: Boolean, recordValue: GenericData.Record): Seq[Object] =
    if (singleJson) {
      Seq(s"$recordValue")
    } else {
      AvroRow(recordValue).getValues().map(_.asInstanceOf[AnyRef])
    }

Apparently a null value would pass through just fine as part of the string interpolation above (I'm extracting the value as JSON).
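That matches how Scala's s-interpolator behaves: it concatenates its arguments with String-valueOf semantics, so a null reference is rendered as the literal string "null" rather than throwing. A minimal sketch of that behaviour (the render helper is mine, purely for illustration):

```scala
object InterpolationDemo {
  // Illustrative helper: the s-interpolator concatenates its arguments
  // like String.valueOf, so a null reference becomes the four-character
  // string "null" instead of raising a NullPointerException.
  def render(recordValue: AnyRef): String = s"$recordValue"
}
```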

redcatbear commented 3 years ago

@jwarlander, thanks for reporting this issue. @morazow is available again next Monday and will look into the matter then.

jwarlander commented 3 years ago

Great, thanks @redcatbear! Sadly, I guess I'll have to roll back to the previous version in the meantime. I did have a look myself, but wasn't able to set up a proper test for pushing in a null value; Scala isn't a language I use often, unfortunately :sweat_smile:

jwarlander commented 3 years ago

As it turns out, I was able to implement at least a workaround; see #31. So far it's only tested with our use case, though: single-column JSON extraction from Avro-encoded messages.

jwarlander commented 3 years ago

@morazow, happy for any feedback here, especially around how to test this! :sweat_smile:

morazow commented 3 years ago

Hello @jwarlander,

Thanks for the feedback!

I am going to review your pull request; maybe we can ship the fix in a new minor release.