trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
Apache License 2.0

Kafka connector with confluent schema registry - java.lang.IllegalArgumentException: Multiple entries with same key #9989

Open tooptoop4 opened 2 years ago

tooptoop4 commented 2 years ago

My Kafka topics have both a key schema and a value schema defined in Confluent Schema Registry.

Trino query 1 SQL:

select * FROM kafka.default."my-topicoracle.jason.blaha";

Trino query 1 result:

java.lang.IllegalArgumentException: Multiple entries with same key: FLEX_VALUE_SET_ID=KafkaColumnHandle{name=FLEX_VALUE_SET_ID, type=bigint, mapping=FLEX_VALUE_SET_ID, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=false} and FLEX_VALUE_SET_ID=KafkaColumnHandle{name=FLEX_VALUE_SET_ID, type=bigint, mapping=FLEX_VALUE_SET_ID, dataFormat=null, formatHint=null, keyCodec=true, hidden=false, internal=false}
    at com.google.common.collect.ImmutableMap.conflictException(ImmutableMap.java:210)
    at com.google.common.collect.ImmutableMap.checkNoConflict(ImmutableMap.java:204)
    at com.google.common.collect.RegularImmutableMap.checkNoConflictInKeyBucket(RegularImmutableMap.java:146)
    at com.google.common.collect.RegularImmutableMap.fromEntryArray(RegularImmutableMap.java:109)
    at com.google.common.collect.ImmutableMap$Builder.build(ImmutableMap.java:389)
    at io.trino.plugin.kafka.KafkaMetadata.getColumnHandles(KafkaMetadata.java:161)
    at io.trino.plugin.kafka.KafkaMetadata.lambda$getTableHandle$0(KafkaMetadata.java:98)
    at java.base/java.util.Optional.map(Unknown Source)
    at io.trino.plugin.kafka.KafkaMetadata.getTableHandle(KafkaMetadata.java:88)
    at io.trino.plugin.kafka.KafkaMetadata.getTableHandle(KafkaMetadata.java:57)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.getTableHandle(ClassLoaderSafeConnectorMetadata.java:209)
    at io.trino.metadata.MetadataManager.lambda$getTableHandle$3(MetadataManager.java:399)
    at java.base/java.util.Optional.flatMap(Unknown Source)
    at io.trino.metadata.MetadataManager.getTableHandle(MetadataManager.java:393)
    at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(MetadataManager.java:1549)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:1478)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitTable(StatementAnalyzer.java:367)
    at io.trino.sql.tree.Table.accept(Table.java:53)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:384)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.analyzeFrom(StatementAnalyzer.java:3331)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:2101)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuerySpecification(StatementAnalyzer.java:367)
    at io.trino.sql.tree.QuerySpecification.accept(QuerySpecification.java:155)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:384)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:394)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:1315)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.visitQuery(StatementAnalyzer.java:367)
    at io.trino.sql.tree.Query.accept(Query.java:107)
    at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
    at io.trino.sql.analyzer.StatementAnalyzer$Visitor.process(StatementAnalyzer.java:384)
    at io.trino.sql.analyzer.StatementAnalyzer.analyze(StatementAnalyzer.java:347)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:92)
    at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:84)
    at io.trino.execution.SqlQueryExecution.analyze(SqlQueryExecution.java:274)
    at io.trino.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:193)
    at io.trino.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(SqlQueryExecution.java:817)
    at io.trino.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(LocalDispatchQueryFactory.java:132)
    at io.trino.$gen.Trino_364____20211118_104318_2.call(Unknown Source)
    at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69)
    at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
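
The message above shows two handles named FLEX_VALUE_SET_ID, one with keyCodec=false and one with keyCodec=true, which suggests the same field name exists in both the key schema and the value schema. Below is a minimal sketch of that failure mode using only the JDK. It is not the connector's code: the `ColumnHandle` record and the `byName` helper are hypothetical stand-ins, and the duplicate check is written by hand to mimic a strict map builder that refuses repeated keys.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnHandleCollision {
    // Hypothetical stand-in for a column handle: only the fields needed here.
    record ColumnHandle(String name, String type, boolean keyCodec) {}

    // Builds a name -> handle map and rejects duplicate names,
    // mimicking a strict immutable-map builder.
    static Map<String, ColumnHandle> byName(List<ColumnHandle> handles) {
        Map<String, ColumnHandle> result = new LinkedHashMap<>();
        for (ColumnHandle handle : handles) {
            ColumnHandle previous = result.putIfAbsent(handle.name(), handle);
            if (previous != null) {
                throw new IllegalArgumentException(
                        "Multiple entries with same key: "
                                + handle.name() + "=" + previous
                                + " and " + handle.name() + "=" + handle);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // The same field name appears once for the key and once for the value.
        List<ColumnHandle> handles = List.of(
                new ColumnHandle("FLEX_VALUE_SET_ID", "bigint", true),
                new ColumnHandle("FLEX_VALUE_SET_ID", "bigint", false));
        try {
            byName(handles);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the map is keyed by column name alone, the keyCodec flag does not help tell the two entries apart.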

Trino query 2 SQL:

select * FROM kafka.information_schema.columns
where table_name = 'my-topicoracle.jason.blaha'

Trino query 2 result:

java.lang.IllegalArgumentException: Multiple entries with same key: FLEX_VALUE_SET_ID=KafkaColumnHandle{name=FLEX_VALUE_SET_ID, type=bigint, mapping=FLEX_VALUE_SET_ID, dataFormat=null, formatHint=null, keyCodec=false, hidden=false, internal=false} and FLEX_VALUE_SET_ID=KafkaColumnHandle{name=FLEX_VALUE_SET_ID, type=bigint, mapping=FLEX_VALUE_SET_ID, dataFormat=null, formatHint=null, keyCodec=true, hidden=false, internal=false}
    at com.google.common.collect.ImmutableMap.conflictException(ImmutableMap.java:210)
    at com.google.common.collect.ImmutableMap.checkNoConflict(ImmutableMap.java:204)
    at com.google.common.collect.RegularImmutableMap.checkNoConflictInKeyBucket(RegularImmutableMap.java:146)
    at com.google.common.collect.RegularImmutableMap.fromEntryArray(RegularImmutableMap.java:109)
    at com.google.common.collect.ImmutableMap$Builder.build(ImmutableMap.java:389)
    at io.trino.plugin.kafka.KafkaMetadata.getColumnHandles(KafkaMetadata.java:161)
    at io.trino.plugin.kafka.KafkaMetadata.lambda$getTableHandle$0(KafkaMetadata.java:98)
    at java.base/java.util.Optional.map(Unknown Source)
    at io.trino.plugin.kafka.KafkaMetadata.getTableHandle(KafkaMetadata.java:88)
    at io.trino.plugin.kafka.KafkaMetadata.getTableHandle(KafkaMetadata.java:57)
    at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.getTableHandle(ClassLoaderSafeConnectorMetadata.java:209)
    at io.trino.metadata.MetadataManager.lambda$getTableHandle$3(MetadataManager.java:399)
    at java.base/java.util.Optional.flatMap(Unknown Source)
    at io.trino.metadata.MetadataManager.getTableHandle(MetadataManager.java:393)
    at io.trino.metadata.MetadataManager.getRedirectionAwareTableHandle(MetadataManager.java:1549)
    at io.trino.connector.informationschema.InformationSchemaMetadata.lambda$calculatePrefixesWithTableName$15(InformationSchemaMetadata.java:371)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
    at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    at com.google.common.collect.CollectSpliterators$1WithCharacteristics.lambda$forEachRemaining$1(CollectSpliterators.java:67)
    at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Unknown Source)
    at com.google.common.collect.CollectSpliterators$1WithCharacteristics.forEachRemaining(CollectSpliterators.java:67)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline$11$1.accept(Unknown Source)
    at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
    at io.trino.connector.informationschema.InformationSchemaMetadata.calculatePrefixesWithTableName(InformationSchemaMetadata.java:384)
    at io.trino.connector.informationschema.InformationSchemaMetadata.getPrefixes(InformationSchemaMetadata.java:238)
    at io.trino.connector.informationschema.InformationSchemaMetadata.applyFilter(InformationSchemaMetadata.java:209)
    at io.trino.metadata.MetadataManager.applyFilter(MetadataManager.java:1782)
    at io.trino.sql.planner.iterative.rule.PushPredicateIntoTableScan.pushFilterIntoTableScan(PushPredicateIntoTableScan.java:241)
    at io.trino.sql.planner.iterative.rule.PushPredicateIntoTableScan.apply(PushPredicateIntoTableScan.java:112)
    at io.trino.sql.planner.iterative.rule.PushPredicateIntoTableScan.apply(PushPredicateIntoTableScan.java:76)
    at io.trino.sql.planner.iterative.IterativeOptimizer.transform(IterativeOptimizer.java:184)
    at io.trino.sql.planner.iterative.IterativeOptimizer.exploreNode(IterativeOptimizer.java:159)
    at io.trino.sql.planner.iterative.IterativeOptimizer.exploreGroup(IterativeOptimizer.java:124)
    at io.trino.sql.planner.iterative.IterativeOptimizer.exploreChildren(IterativeOptimizer.java:217)
    at io.trino.sql.planner.iterative.IterativeOptimizer.exploreGroup(IterativeOptimizer.java:126)
    at io.trino.sql.planner.iterative.IterativeOptimizer.optimize(IterativeOptimizer.java:109)
    at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:224)
    at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:209)
    at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:204)
    at io.trino.execution.SqlQueryExecution.doPlanQuery(SqlQueryExecution.java:495)
    at io.trino.execution.SqlQueryExecution.planQuery(SqlQueryExecution.java:475)
    at io.trino.execution.SqlQueryExecution.start(SqlQueryExecution.java:416)
    at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:237)
    at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution$7(LocalDispatchQuery.java:143)
    at io.trino.$gen.Trino_364____20211118_104318_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

Trino query 3 SQL:

select * FROM kafka.information_schema.columns
where table_name like '%my-topicoracle.jason.blaha'

Trino query 3 result: the expected rows are returned successfully.

tooptoop4 commented 2 years ago

Workaround that worked for me: delete these two blocks in KafkaMetadata.java (links point to the 364 tag):
https://github.com/trinodb/trino/blob/364/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java#L139-L146
https://github.com/trinodb/trino/blob/364/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaMetadata.java#L203-L210
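
A less invasive direction than removing code might be to disambiguate colliding names instead. The sketch below is purely illustrative and not how Trino behaves: the `key_` prefix, the `merge` helper, and the `NAME` field are all hypothetical. It renames a key-schema field only when the value schema already uses that name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DisambiguateKeyColumns {
    // Hypothetical prefix, not a Trino configuration option.
    static final String KEY_PREFIX = "key_";

    // Maps column name -> origin ("value:<field>" or "key:<field>").
    // Value fields keep their names; a key field is prefixed only on collision.
    // Limitation: a value field literally named "key_<field>" would be overwritten.
    static Map<String, String> merge(List<String> keyFields, List<String> valueFields) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (String field : valueFields) {
            columns.put(field, "value:" + field);
        }
        for (String field : keyFields) {
            String name = columns.containsKey(field) ? KEY_PREFIX + field : field;
            columns.put(name, "key:" + field);
        }
        return columns;
    }

    public static void main(String[] args) {
        Map<String, String> columns = merge(
                List.of("FLEX_VALUE_SET_ID"),
                List.of("FLEX_VALUE_SET_ID", "NAME"));
        columns.forEach((name, origin) -> System.out.println(name + " <- " + origin));
    }
}
```

With this approach both the key and the value copies of a field would stay queryable, at the cost of a naming convention that users would need to know about.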

YuriyGavrilov commented 8 months ago

+1