confluentinc / kafka-connect-storage-cloud

Kafka Connect suite of connectors for Cloud storage (Amazon S3)

SchemaParseException when sending schema with the Kafka event with more than 1 array of objects for Parquet #502

Open rkravinderkumar05 opened 2 years ago

rkravinderkumar05 commented 2 years ago

I don't have a Schema Registry, so I send the schema with the Kafka event instead, in order to use the field partitioner. If I use the JSON writer, everything works fine. The Parquet writer also works fine as long as there is only one array-of-objects field. When the schema has two arrays and I use the Parquet writer, I get this error:


org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
    at org.apache.avro.Schema$Names.put(Schema.java:1513)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:784)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:945)
    at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1104)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:973)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:957)
    at org.apache.avro.Schema.toString(Schema.java:396)
    at org.apache.avro.Schema.toString(Schema.java:382)
    at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:137)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:277)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
    at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:80)
    at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:494)
    at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:274)
    at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:219)
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:188)
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

[2022-03-27 22:07:20,381] ERROR WorkerSinkTask{id=moodagent_listens-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
    at io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.close(ParquetRecordWriterProvider.java:97)
    at io.confluent.connect.s3.TopicPartitionWriter.close(TopicPartitionWriter.java:313)
    at io.confluent.connect.s3.S3SinkTask.close(S3SinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:397)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:591)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
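
For reference, the first exception can be reproduced with plain Avro, independently of Kafka Connect: two different record schemas that share the same full name cannot both appear in one parent schema. The sketch below is only an illustration (it is not the connector's code); io.confluent.connect.avro.ConnectDefault appears to be the default record name the Avro conversion layer assigns to Connect structs that carry no name, which is exactly the name the trace above complains about.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class CantRedefineRepro {
    public static void main(String[] args) {
        // Two different record schemas that end up with the same full name,
        // the way two unnamed Connect structs do after conversion to Avro.
        Schema employees = SchemaBuilder.record("ConnectDefault")
            .namespace("io.confluent.connect.avro")
            .fields().requiredString("ID").requiredString("Position").requiredString("Name")
            .endRecord();
        Schema departments = SchemaBuilder.record("ConnectDefault")
            .namespace("io.confluent.connect.avro")
            .fields().requiredString("ID").requiredString("Name")
            .endRecord();

        // A parent record that embeds both arrays, like the event schema below.
        Schema envelope = SchemaBuilder.record("Envelope").namespace("example")
            .fields()
            .name("employees").type(Schema.createArray(employees)).noDefault()
            .name("departments").type(Schema.createArray(departments)).noDefault()
            .endRecord();

        // AvroWriteSupport.init() calls Schema#toString(); serializing the schema
        // walks both nested records and throws
        // org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
        System.out.println(envelope.toString(true));
    }
}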

Kafka event:

{   
    "payload": {
        "id": "2002",
        "employees": [
                {
                    "ID": "abc",
                    "Position": "0",
                    "Name": "xyz"
                }
            ],
        "departments": [
            {
                "ID": "xyz",
                "Name": "abc"
            }
        ],
        "arrival_timestamp": "2022-02-24T12:10:05.761Z",
        "event_timestamp": "2022-02-23T07:29:11.547Z",
        "event_date": "2022-02-23"
    },
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "id"
            },
            {
                "type": "array",
                "field": "employees",
                "optional": false,
                "items": {
                    "type": "struct",
                    "fields": [
                        {
                            "type": "string",
                            "optional": false,
                            "field": "ID"
                        },
                        {
                            "type": "string",
                            "optional": false,
                            "field": "Position"
                        },
                        {
                            "type": "string",
                            "optional": false,
                            "field": "Name"
                        }
                    ]
                }
            },
            {
                "type": "array",
                "field": "departments",
                "optional": false,
                "items": {
                    "type": "struct",
                    "optional": false,
                    "fields": [
                        {
                            "type": "string",
                            "optional": false,
                            "field": "ID"
                        },
                        {
                            "type": "string",
                            "optional": false,
                            "field": "Name"
                        }
                    ]
                }
            },
            {
                "type": "string",
                "optional": false,
                "field": "arrival_timestamp"
            },
            {
                "type": "string",
                "optional": false,
                "field": "event_timestamp"
            },
            {
                "type": "string",
                "optional": false,
                "field": "event_date"
            }
        ],
        "optional": false,
        "name": "ksql.xyz"
    }
}
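
A possible workaround (untested here, so treat it as a sketch): the Connect JSON schema envelope accepts a "name" attribute on struct schemas, and the Can't redefine error above comes from both anonymous item structs being mapped to the same default Avro record name. Giving each array's item struct its own name, for example as below (the names employee_record and department_record are made up), may let the Parquet writer generate two distinct Avro records:

            {
                "type": "array",
                "field": "employees",
                "optional": false,
                "items": {
                    "type": "struct",
                    "name": "employee_record",
                    "fields": [
                        { "type": "string", "optional": false, "field": "ID" },
                        { "type": "string", "optional": false, "field": "Position" },
                        { "type": "string", "optional": false, "field": "Name" }
                    ]
                }
            },
            {
                "type": "array",
                "field": "departments",
                "optional": false,
                "items": {
                    "type": "struct",
                    "name": "department_record",
                    "fields": [
                        { "type": "string", "optional": false, "field": "ID" },
                        { "type": "string", "optional": false, "field": "Name" }
                    ]
                }
            }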

Kafka S3 connector config

{
    "name": "test",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "TEST_TOPIC",
    "s3.region": "eu-central-1",
    "s3.bucket.name": "test_bucket",
    "s3.part.size": "5242880",
    "parquet.codec": "snappy",
    "rotate.schedule.interval.ms": "360000",
    "flush.size": "500",
    "timezone" : "UTC",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
    "schema.compatibility": "NONE",
    "partition.field.name": "event_date",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "connect.meta.data": "false",
    "value.converter.connect.meta.data":"false"

}
rkravinderkumar05 commented 2 years ago

If I just change my Kafka Connect config from io.confluent.connect.s3.format.parquet.ParquetFormat to io.confluent.connect.s3.format.json.JsonFormat, the same thing works like a charm, but for my use case I need Parquet.
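
For context, the stack trace shows why only ParquetFormat is affected: ParquetRecordWriterProvider hands records to parquet-avro (AvroWriteSupport), so the Connect schema is first converted to an Avro schema, and that is where the duplicate anonymous record name is rejected. JsonFormat never performs that conversion. The workaround above is just this one setting:

    "format.class": "io.confluent.connect.s3.format.json.JsonFormat"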

cpastorini commented 1 year ago

@rkravinderkumar05 did you find any solution to this problem?

We are currently facing the same issue. We noticed that all of the arrays are labeled array in the generated Parquet schema.

Using the parq tool to inspect the output:

(screenshot of the parq schema output omitted)

(we tested with just a single array inside the schema; the name should be cpu_cpus)

Can someone help us solve this issue? We are not able to figure out where the array name is inserted (our idea is to use the list field's name with array as a suffix, e.g. cpu_cpus_array).

Thanks!

cpastorini commented 1 year ago

Any news?

rkravinderkumar05 commented 1 year ago

I was working on this for my previous employer, and I don't really remember if I was able to fix it. But you could use JSON storage, or start using Schema Registry, which makes life easier.
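
For anyone landing here, a minimal sketch of what the Schema Registry route could look like in the connector config (the URL is a placeholder for your own registry, and this assumes the producers write Avro with properly named record schemas, so the anonymous ConnectDefault naming should not come into play):

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"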

cpastorini commented 1 year ago

Thanks @rkravinderkumar05 for your response. We need to store the data as Parquet files, so we need a schema, and we cannot use plain JSON instead of JSON with schema.

If you have some old code (or just some old notes), my team and I can try to fix it and share it with the community.

Let me know! Thanks