Closed raghavi92 closed 4 years ago
@raghavi92 The Debezium connector has to use KeyValueSchema; inside the KeyValueSchema, the key and the value can each be of Avro type.
Please refer to KeyValueSchema.java for more details.
@tuteng Could we add a doc on how to configure and use schemas in the Debezium connector?
@jiazhai Yes, I went through the KeyValueSchema code. As mentioned in the issue description, I have set "schemaType" to Avro in my source config YAML file. The Avro type exception is still thrown at this line: https://github.com/apache/pulsar/blob/d3cb108590713735ef1b23ff61997bf68e0fd7f3/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java#L62
I guess it's because of this issue in the Avro library itself.
@raghavi92 This requires configuring key.converter and value.converter for the Debezium connector. The default converter is the JSON converter, org.apache.kafka.connect.json.JsonConverter.
@jiazhai, I tried that as well. It looks like the Avro converter is not supported yet? The Avro converter from io.confluent uses its own schema registry, so it cannot be used with Pulsar.
@raghavi92, it seems the related jar needs to be included in the pom.xml file of pulsar-debezium to support it.
@tuteng will give it a try and help provide a doc on how to do it.
This involves converting the Avro schema stored in the Kafka schema registry into data that Pulsar can read. We will provide a fix for this issue.
Describe the bug
Currently the Debezium Postgres source produces KeyValueSchema on its topics. Support for the Avro schema type is required.
To Reproduce
Steps to reproduce the behavior:
bin/pulsar-admin sources localrun --source-config-file conf/debezium-postgres-source-config.yaml
Expected behavior
The connector should start producing messages with an Avro schema. Instead, I get the following error:
19:15:12.353 [main] INFO org.apache.pulsar.functions.LocalRunner - RuntimeSpawner quit because of
org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroTypeException: Unknown type: K
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.pulsar.client.impl.schema.StructSchema.createAvroSchema(StructSchema.java:136) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.schema.StructSchema.parseSchemaInfo(StructSchema.java:149) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.client.impl.schema.AvroSchema.of(AvroSchema.java:90) ~[pulsar-client-original-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:144) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:189) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:209) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$0(TopicSchema.java:65) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at java.util.HashMap.computeIfAbsent(HashMap.java:1127) ~[?:1.8.0_201]
    at org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:65) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink.initializeSchema(PulsarSink.java:327) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.sink.PulsarSink.open(PulsarSink.java:255) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:787) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:213) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:244) ~[pulsar-functions-instance-2.5.0-SNAPSHOT.jar:2.5.0-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroTypeException: Unknown type: K
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    ... 15 more
Caused by: org.apache.avro.AvroTypeException: Unknown type: K
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:292) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:646) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:740) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:81) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:604) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) ~[org.apache.avro-avro-1.8.2.jar:1.8.2]
    ... 15 more
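A note on reading the trace: the innermost cause is raised while Avro reflects over a field (ReflectData.createFieldSchema) and meets a type it reports as "K". One plausible reading, stated here as an assumption rather than a confirmed diagnosis, is that the class being reflected is generic and the field's declared type is the unresolved type variable K. The sketch below uses only the JDK reflection API and no Avro or Pulsar code; `Pair` and `StringPair` are hypothetical stand-ins, not Pulsar classes. It shows that a field declared as `K` carries no concrete class at runtime, which is the situation a reflection-based schema generator cannot map to a schema.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class GenericFieldDemo {
    // Hypothetical generic holder, standing in for a key/value container.
    public static class Pair<K, V> {
        K key;
        V value;
    }

    // Hypothetical non-generic holder, for comparison.
    public static class StringPair {
        String key;
    }

    // Reports whether a field's declared type is an unresolved type variable
    // or a concrete type that reflection can see.
    public static String describe(Class<?> cls, String fieldName) {
        try {
            Field field = cls.getDeclaredField(fieldName);
            Type type = field.getGenericType();
            if (type instanceof TypeVariable) {
                return "type variable " + type.getTypeName();
            }
            return "concrete " + type.getTypeName();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("no such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Pair.class, "key"));       // type variable K
        System.out.println(describe(Pair.class, "value"));     // type variable V
        System.out.println(describe(StringPair.class, "key")); // concrete java.lang.String
    }
}
```

If this reading is right, asking for a plain Avro schema over a generic key/value class cannot succeed by reflection alone; the key and value types would have to be supplied explicitly, which is consistent with the earlier comment that KeyValueSchema wraps separate key and value schemas.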
Desktop (please complete the following information):