confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html
Other
2.16k stars 1.1k forks source link

Conversion of protobuf to Parquet fails for messages with Union sealed_value_optional #2317

Open omeraha opened 2 years ago

omeraha commented 2 years ago

Hi guys! I'm running an S3SinkConnector (Kafka connect version 5.5.7) and have some issues converting protobuf records to parquet. I have the following (example) Protobuf message:

import "scalapb/scalapb.proto";

message Msg {
  string str = 1;
  int32 int = 2;
  Type t = 3;
}

message Type {
   oneof sealed_value_optional {
     Type1 t1 = 1; 
     Type1 t2 = 2; 
     Type1 t3 = 3; 
  }
}

message Type1 {}
message Type2{}
message Type3 {}

I registered this message in the Schema register.

I'm running the connector with the following configurations (kept only the relevant fields, for readability purposes):

{
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy",
  ...
}

and it fails with the following error:

  Can't redefine: org.confluent.connect.protobuf.Union.sealed_value_optional
Full stack trace
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Suppressed: java.lang.NullPointerException at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97) at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313) at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:249) at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:401) at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:598) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202) ... 7 more Caused by: org.apache.avro.SchemaParseException: Can't redefine: org.confluent.connect.protobuf.Union.sealed_value_optional at org.apache.avro.Schema$Names.put(Schema.java:1511) at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203) at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1102) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1203) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955) at org.apache.avro.Schema.toString(Schema.java:396) at org.apache.avro.Schema.toString(Schema.java:382) at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:277) at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:564) at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:80) at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:501) at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274) at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219) at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188) at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546) ... 10 more

Is there a way to resolve this?

OneCricketeer commented 2 years ago

Maybe this config helps?

https://github.com/confluentinc/schema-registry/blob/master/protobuf-converter/src/main/java/io/confluent/connect/protobuf/ProtobufDataConfig.java#L27

Or if it's Avro

https://github.com/confluentinc/schema-registry/blob/master/avro-data/src/main/java/io/confluent/connect/avro/AvroDataConfig.java#L28-L32

But also, I'm not entirely familiar with Parquet supporting union types

omeraha commented 2 years ago

@OneCricketeer Tried adding this config, but it still fails for the same reason

OneCricketeer commented 2 years ago

You mention you're running 5.5.7... #2149 isn't part of that release

omeraha commented 2 years ago

Running now with the latest versions: kafka-connect-s3-10.0.8.jar and Kafka connect 6.0.7

OneCricketeer commented 2 years ago

It is running successfully? Or that's what you've upgraded to? You can look at the v6.0.7 branch of the ProtobufDataConfig class, and the property still isn't there. It was only recently added, and might not be released yet since it's also not in the v7.1.1 (latest release) branch