trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)

Kafka Connector INSERTs fail if no key provided in table definition #5788

Open hashhar opened 3 years ago

hashhar commented 3 years ago

Creating a table definition without key.dataFormat causes INSERTs to fail in the Kafka Connector.

This can be observed with the following test case (place it into TestKafkaAvroSmokeTest):

    @Test(groups = {KAFKA, PROFILE_SPECIFIC_TESTS})
    @Requires(AllDataTypesAvroTable.class)
    public void testInsertPrimitiveDataType()
    {
        assertThat(query(format(
                "insert into %s.%s values " +
                        "('foobar', 127, 234.567, true)",
                KAFKA_CATALOG,
                ALL_DATATYPES_AVRO_TABLE_NAME
        ))).updatedRowsCountIsEqualTo(1);

        assertThat(query(format(
                "select * from %s.%s",
                KAFKA_CATALOG,
                ALL_DATATYPES_AVRO_TABLE_NAME
        ))).containsOnly(
                row("foobar", 127, 234.567, true));
    }

Adding the following to the table definition is sufficient to make the tests pass:

  "key": {
    "dataFormat": "raw",
    "fields": []
  }

This means that either the documentation is misleading (see https://prestosql.io/docs/current/connector/kafka.html#table-definition-files) or our implementation doesn't match our expectations.

The offending code is at https://github.com/prestosql/presto/blob/a82b00ba9c64134c14f94e82069a793dc6aa8e33/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaPageSinkProvider.java#L80.

Stack trace of failure:

2020-11-03 14:24:43 INFO: FAILURE     /    io.prestosql.tests.kafka.TestKafkaAvroWritesSmokeTest.testInsertPrimitiveDataType (Groups: profile_specific_tests, kafka) took 1.0 seconds
2020-11-03 14:24:43 SEVERE: Failure cause:
io.prestosql.tempto.query.QueryExecutionException: java.sql.SQLException: Query failed (#20201103_083943_00001_sbif4): unknown data format 'dummy'
    at io.prestosql.tempto.query.JdbcQueryExecutor.execute(JdbcQueryExecutor.java:114)
    at io.prestosql.tempto.query.JdbcQueryExecutor.executeQuery(JdbcQueryExecutor.java:82)
    at io.prestosql.tempto.query.QueryExecutor.query(QueryExecutor.java:57)
    at io.prestosql.tests.kafka.TestKafkaAvroWritesSmokeTest.testInsertPrimitiveDataType(TestKafkaAvroWritesSmokeTest.java:70)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:104)
    at org.testng.internal.Invoker.invokeMethod(Invoker.java:645)
    at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:851)
    at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1177)
    at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
    at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: Query failed (#20201103_083943_00001_sbif4): unknown data format 'dummy'
    at io.prestosql.jdbc.AbstractPrestoResultSet.resultsException(AbstractPrestoResultSet.java:1778)
    at io.prestosql.jdbc.PrestoResultSet$ResultsPageIterator.computeNext(PrestoResultSet.java:218)
    at io.prestosql.jdbc.PrestoResultSet$ResultsPageIterator.computeNext(PrestoResultSet.java:178)
    at io.prestosql.jdbc.$internal.guava.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:141)
    at io.prestosql.jdbc.$internal.guava.collect.AbstractIterator.hasNext(AbstractIterator.java:136)
    at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811)
    at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:294)
    at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
    at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
    at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:300)
    at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    at io.prestosql.jdbc.PrestoResultSet$AsyncIterator.lambda$new$0(PrestoResultSet.java:124)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: unknown data format 'dummy'
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:217)
    at io.prestosql.plugin.kafka.encoder.DispatchingRowEncoderFactory.create(DispatchingRowEncoderFactory.java:40)
    at io.prestosql.plugin.kafka.KafkaPageSinkProvider.createPageSink(KafkaPageSinkProvider.java:78)
    at io.prestosql.plugin.base.classloader.ClassLoaderSafeConnectorPageSinkProvider.createPageSink(ClassLoaderSafeConnectorPageSinkProvider.java:53)
    at io.prestosql.split.PageSinkManager.createPageSink(PageSinkManager.java:61)
    at io.prestosql.operator.TableWriterOperator$TableWriterOperatorFactory.createPageSink(TableWriterOperator.java:122)
    at io.prestosql.operator.TableWriterOperator$TableWriterOperatorFactory.createOperator(TableWriterOperator.java:113)
    at io.prestosql.operator.DriverFactory.createDriver(DriverFactory.java:114)
    at io.prestosql.execution.SqlTaskExecution$DriverSplitRunnerFactory.createDriver(SqlTaskExecution.java:942)
    at io.prestosql.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1070)
    at io.prestosql.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:163)
    at io.prestosql.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:484)
    at io.prestosql.$gen.Presto_345_79_gc367874____20201103_083901_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

cc: @findepi

hashhar commented 3 years ago

The reason for the failure is that the DecoderModule (presto-row-decoders) installs a DecoderFactory with the name "dummy", while the EncoderModule doesn't do this.
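
The comment above refers to how decoder factories get registered by format name; presumably that is a Guice multibinding (MapBinder) keyed by the format string. The sketch below is only an illustration of that shape, using stand-in types and a hypothetical DummyRowEncoderFactory rather than the actual module sources; it shows the kind of binding the encoder side is missing for the "dummy" name (assumes Guice on the classpath).

    import com.google.inject.Binder;
    import com.google.inject.Module;
    import com.google.inject.Scopes;
    import com.google.inject.multibindings.MapBinder;

    public class EncoderModuleSketch
            implements Module
    {
        // stand-ins for the plugin's encoder SPI; names are hypothetical
        public interface RowEncoderFactory {}

        public static class DummyRowEncoderFactory
                implements RowEncoderFactory {}

        @Override
        public void configure(Binder binder)
        {
            MapBinder<String, RowEncoderFactory> factories =
                    MapBinder.newMapBinder(binder, String.class, RowEncoderFactory.class);
            // the missing piece: a factory registered under the "dummy" name,
            // mirroring what the decoder module does for its dummy decoder
            factories.addBinding("dummy").to(DummyRowEncoderFactory.class).in(Scopes.SINGLETON);
        }
    }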

Then, in KafkaMetadata, we assign "dummy" as the dataFormat if it's missing: https://github.com/prestosql/presto/blob/master/presto-kafka/src/main/java/io/prestosql/plugin/kafka/KafkaMetadata.java#L105.

Now, when the DispatchingRowEncoderFactory tries to find a factory with the name "dummy", it can't find one, and hence the precondition check fails at https://github.com/prestosql/presto/blob/master/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/DispatchingRowEncoderFactory.java#L40.
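
To make the chain of events concrete, here is a minimal, self-contained sketch of the failure mode. It uses stand-in types and assumes Guava on the classpath; it is not the actual Trino/Presto source. Encoder factories are looked up by data-format name, the key format silently defaults to "dummy" when the table definition has no key section, and only the decoder side knows that name.

    import static com.google.common.base.Preconditions.checkArgument;

    import java.util.Map;

    public class EncoderDispatchSketch
    {
        // stand-in for the plugin's RowEncoderFactory type
        interface RowEncoderFactory {}

        public static void main(String[] args)
        {
            // a few illustrative formats registered on the encoder side; "dummy" is not among them
            Map<String, RowEncoderFactory> factories = Map.of(
                    "raw", new RowEncoderFactory() {},
                    "json", new RowEncoderFactory() {},
                    "avro", new RowEncoderFactory() {});

            // what the metadata layer falls back to when key.dataFormat is absent
            String keyDataFormat = "dummy";

            // same shape as the failing precondition in DispatchingRowEncoderFactory.create,
            // which yields: IllegalArgumentException: unknown data format 'dummy'
            checkArgument(factories.containsKey(keyDataFormat), "unknown data format '%s'", keyDataFormat);
        }
    }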

One option I see is to also add a DUMMY row encoder that writes null bytes. But null bytes can have a different meaning for each data format. (I've tried this and it works, but it can leave people confused about why the inserted data is all NULLs.)
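
For illustration only, a dummy encoder along those lines might look roughly like the sketch below. The RowEncoder interface here is a stand-in, not the plugin's real SPI; the point is that such an encoder drops every value, which is exactly why the key columns would read back as NULL and confuse users.

    import java.io.Closeable;

    public class DummyEncoderSketch
    {
        // stand-in interface, not the plugin's actual RowEncoder SPI
        interface RowEncoder
                extends Closeable
        {
            void appendColumnValue(Object value);

            byte[] toByteArray();
        }

        static class DummyRowEncoder
                implements RowEncoder
        {
            @Override
            public void appendColumnValue(Object value)
            {
                // intentionally drop the value: there is no key format to encode into
            }

            @Override
            public byte[] toByteArray()
            {
                // empty payload; on the read side the key columns would come back as NULL
                return new byte[0];
            }

            @Override
            public void close() {}
        }
    }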

A simpler and more proper fix would be to just improve the exception message in the precondition check at https://github.com/prestosql/presto/blob/master/presto-kafka/src/main/java/io/prestosql/plugin/kafka/encoder/DispatchingRowEncoderFactory.java#L40 and to move the check earlier (ideally to connector startup).
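
As a rough sketch of that direction (hypothetical helper name and message wording, not an actual patch; assumes Guava on the classpath), the check could name the offending table definition field and list the supported formats, and could be run when the connector loads the topic description files rather than at INSERT time:

    import static com.google.common.base.Preconditions.checkArgument;

    import java.util.Map;

    final class EncoderFormatValidation
    {
        private EncoderFormatValidation() {}

        // hypothetical helper: fail with an actionable message instead of "unknown data format 'dummy'"
        static void checkSupportedFormat(Map<String, ?> encoderFactories, String fieldGroup, String dataFormat)
        {
            checkArgument(
                    encoderFactories.containsKey(dataFormat),
                    "Unsupported %s.dataFormat '%s' in the topic description file; supported formats: %s",
                    fieldGroup,
                    dataFormat,
                    encoderFactories.keySet());
        }
    }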