exasol / kafka-connector-extension

Exasol Kafka Extension for accessing Apache Kafka
MIT License

Allow to use TIMESTAMP as output column type for Kafka message timestamp #50

Closed by millin 3 years ago

millin commented 3 years ago

This adds the ability to use TIMESTAMP as the output column type for any field containing a millisecond Unix epoch timestamp (including the Kafka message timestamp).

It was previously impossible to use TIMESTAMP as an output column type. The following import

CREATE SCHEMA RND;

CREATE OR REPLACE TABLE RND.TEST_KAFKA_TIMESTAMP (
  DTTM TIMESTAMP,
  JSON VARCHAR(2000000),
  KAFKA_PARTITION DECIMAL(18, 0),
  KAFKA_OFFSET    DECIMAL(36, 0)
);

IMPORT INTO RND.TEST_KAFKA_TIMESTAMP (DTTM, JSON, KAFKA_PARTITION, KAFKA_OFFSET)
FROM SCRIPT KAFKA_EXTENSION.KAFKA_CONSUMER
WITH
 BOOTSTRAP_SERVERS = 'kafka-01:9092'
 TOPIC_NAME = 'test_topic'
 RECORD_FIELDS = 'timestamp,value'
 TABLE_NAME = 'RND.TEST_KAFKA_TIMESTAMP'
 RECORD_VALUE_FORMAT = 'string'
 GROUP_ID = 'exasol-kafka-test'
;

results in this error:

[2021-08-25 16:21:15] [22002] VM error: F-UDF-CL-LIB-1127: F-UDF-CL-SL-JAVA-1002: F-UDF-CL-SL-JAVA-1013:
[2021-08-25 16:21:15] com.exasol.ExaUDFException: F-UDF-CL-SL-JAVA-1080: Exception during run
[2021-08-25 16:21:15] com.exasol.cloudetl.kafka.KafkaConnectorException: Error consuming Kafka topic 'test_topic' data. It occurs for partition '1' in node '0' and vm '139980029142784' Cause: E-UDF-CL-SL-JAVA-1109: emit column 'DTTM' is of type TIMESTAMP but data given have type java.lang.Long
[2021-08-25 16:21:15] com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer.emit(KafkaRecordConsumer.scala:64)
[2021-08-25 16:21:15] com.exasol.cloudetl.kafka.KafkaTopicDataImporter$.run(KafkaTopicDataImporter.scala:43)
[2021-08-25 16:21:15] com.exasol.cloudetl.kafka.KafkaTopicDataImporter.run(KafkaTopicDataImporter.scala)
[2021-08-25 16:21:15] com.exasol.ExaWrapper.run(ExaWrapper.java:196)
[2021-08-25 16:21:15] Caused by: com.exasol.ExaDataTypeException: E-UDF-CL-SL-JAVA-1109: emit column 'DTTM' is of type TIMESTAMP but data given have type java.lang.Long
[2021-08-25 16:21:15] com.exasol.ExaIteratorImpl.emit(ExaIteratorImpl.java:124)
[2021-08-25 16:21:15] com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer.$anonfun$emitRecords$1(KafkaRecordConsumer.scala:110)
[2021-08-25 16:21:15] com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer.$anonfun$emitRecords$1$adapted(KafkaRecordConsumer.scala:101)
[2021-08-25 16:21:15] scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
[2021-08-25 16:21:15] scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
[2021-08-25 16:21:15] scala.collection.AbstractIterable.foreach(Iterable.scala:919)
[2021-08-25 16:21:15] com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer.emitRecords(KafkaRecordConsumer.scala:101)
[2021-08-25 16:21:15] com.exasol.cloudetl.kafka.consumer.KafkaRecordConsumer.emit(KafkaRecordConsumer.scala:52)
[2021-08-25 16:21:15]   ... 7 more
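
For context, the change boils down to converting a millisecond Unix epoch value (such as the Kafka record timestamp) into a java.sql.Timestamp before it is emitted, so the UDF framework receives the type the TIMESTAMP column expects. Below is a minimal standalone sketch of that conversion, not the actual connector code; the record and emit plumbing are left out.

import java.sql.Timestamp
import java.time.Instant

object EpochMillisToTimestamp {

  // Convert a millisecond Unix epoch value, e.g. a Kafka record timestamp,
  // into the java.sql.Timestamp an Exasol TIMESTAMP column expects.
  def toSqlTimestamp(epochMillis: Long): Timestamp =
    Timestamp.from(Instant.ofEpochMilli(epochMillis))

  def main(args: Array[String]): Unit = {
    val recordTimestamp = 1629901275000L // hypothetical record.timestamp() value
    println(toSqlTimestamp(recordTimestamp))
  }
}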
morazow commented 3 years ago

Hello @millin,

Thanks for the PR! It looks good!

morazow commented 3 years ago

Hey @baunz,

Thanks for reviewing, it is much appreciated!

morazow commented 3 years ago

Hello @millin,

I have refactored and added an integration test for your changes. Please have a look and feel free to change them (the setup needed updates to include an Avro record producer).

Please also address @baunz's suggestions; there seem to be some test failures (at first glance related to empty rows and out-of-bounds errors). Unfortunately, CI does not run for external pull requests, but you can run scripts/ci.sh locally before pushing; it is the same as CI, excluding only the Sonar checks.

Thanks for your contributions!

morazow commented 3 years ago

Fixes #51

millin commented 3 years ago

Hello, I have applied the suggestions. I'm working on the test errors now.

millin commented 3 years ago

@morazow I had two options to fix the tests: ignore the absence of the metadata or mock it. I decided it is better to mock it. I also corrected the new timestamp test, since it did not pass for me due to time zone differences.
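
For reference, the mocking is along these lines. This is only a sketch, assuming the tests use Mockito; the exact ExaMetadata getters shown (getVmId(), getNodeId()) and the returned values are illustrative, not a claim about which ones the consumer actually reads.

import com.exasol.ExaMetadata
import org.mockito.Mockito.{mock, when}

object MetadataMockSketch {
  // Build a mocked ExaMetadata so the test does not depend on a real UDF runtime.
  def mockedMetadata(): ExaMetadata = {
    val metadata = mock(classOf[ExaMetadata])
    when(metadata.getVmId()).thenReturn("139980029142784") // arbitrary value
    when(metadata.getNodeId()).thenReturn(0L)
    metadata
  }
}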

Locally I have only one failing test, "run throws if it cannot create KafkaConsumer", but it does not look like it fails because of my changes:

[pool-1-thread-1-ScalaTest-running-KafkaTopicMetadataReaderIT] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'schema.registry.url' was supplied but isn't a known config.
[pool-1-thread-1-ScalaTest-running-KafkaTopicMetadataReaderIT] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 6.2.0-ccs
[pool-1-thread-1-ScalaTest-running-KafkaTopicMetadataReaderIT] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 1a5755cf9401c84f
[pool-1-thread-1-ScalaTest-running-KafkaTopicMetadataReaderIT] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1630394054721
[pool-1-thread-1-ScalaTest-running-KafkaTopicMetadataReaderIT] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-EXASOL_KAFKA_UDFS_CONSUMERS-5, groupId=EXASOL_KAFKA_UDFS_CONSUMERS] Connection to node -1 (kafka01.internal/127.0.0.1:9092) could not be established. Broker may not be available.
[pool-1-thread-1-ScalaTest-running-KafkaTopicMetadataReaderIT] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-EXASOL_KAFKA_UDFS_CONSUMERS-5, groupId=EXASOL_KAFKA_UDFS_CONSUMERS] Bootstrap broker kafka01.internal:9092 (id: -1 rack: null) disconnected
[... many identical warning lines as above and below omitted ...]
[pool-1-thread-1-ScalaTest-running-KafkaTopicMetadataReaderIT] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-EXASOL_KAFKA_UDFS_CONSUMERS-5, groupId=EXASOL_KAFKA_UDFS_CONSUMERS] Connection to node -1 (kafka01.internal/127.0.0.1:9092) could not be established. Broker may not be available.
[pool-1-thread-1-ScalaTest-running-KafkaTopicMetadataReaderIT] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-EXASOL_KAFKA_UDFS_CONSUMERS-5, groupId=EXASOL_KAFKA_UDFS_CONSUMERS] Bootstrap broker kafka01.internal:9092 (id: -1 rack: null) disconnected
[info] - run throws if it cannot create KafkaConsumer *** FAILED ***
[info]   Expected exception com.exasol.cloudetl.kafka.KafkaConnectorException to be thrown, but org.apache.kafka.common.errors.TimeoutException was thrown (KafkaTopicMetadataReaderIT.scala:70)
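
For context on that assertion: the test presumably expects the connector's own exception via ScalaTest's intercept, along the lines of the sketch below, so a raw Kafka TimeoutException escaping instead makes it fail. This is only an illustrative reconstruction; runImporterAgainstUnreachableBroker is a hypothetical stand-in for the real call.

import org.scalatest.funsuite.AnyFunSuite
import org.apache.kafka.common.errors.TimeoutException
import com.exasol.cloudetl.kafka.KafkaConnectorException

class ConsumerCreationSketch extends AnyFunSuite {
  test("run throws if it cannot create KafkaConsumer") {
    // Passes only if KafkaConnectorException is thrown; any other exception,
    // such as Kafka's TimeoutException, fails with the message seen above.
    intercept[KafkaConnectorException] {
      runImporterAgainstUnreachableBroker()
    }
  }

  // Hypothetical stand-in: the real test runs the importer against a broker
  // address that cannot be reached.
  private def runImporterAgainstUnreachableBroker(): Unit =
    throw new TimeoutException("broker unreachable")
}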
morazow commented 3 years ago

Hey @millin,

Thanks for the changes! I am going to check them soon.

millin commented 3 years ago

Hello @morazow,

Thanks for the fix, the code looks better :+1: Let me know if I need to fix anything else.