confluentinc / kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)
Other
13 stars 332 forks source link

Invalid default value when union is present in Avro schema #175

Open NiamhGo opened 6 years ago

NiamhGo commented 6 years ago

I'm currently trying to add a topic to one of our s3 connectors, our avro schema at a simplified level looks something like:

record Outcome1 {}
record Outcome2 {}
record Outcome3 { string id; }

record Record1 {
    union {
       Outcome1,
       Outcome2,
       Outcome3
    }  outcome = {};
}

record RecordToConsume { 
  array<Record1> records;
}

We already have a few other different topics working fine, and so the only difference between their schemas and this one is that we have a union of objects where the default value is {}.

The stack trace of the exception we get when adding the topic to our configuration is:

org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
 at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1503)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1386)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1266)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1375)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1386)
 at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1266)
 at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1020)
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:88)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:425)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:264)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
 at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:247)
 at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
 at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:127)
OneCricketeer commented 6 years ago

What type of object should {} be? A record?

What is the AVSC generated by that?