confluentinc / kafka-connect-hdfs

Kafka Connect HDFS connector

Connector fails when using nested arrays in Avro schema #360

Open jurgispods opened 6 years ago

jurgispods commented 6 years ago

The HDFS connector (version 4.1) fails after the first batch of events when writing Avro messages to Parquet files in HDFS if the Avro schema contains nested arrays.

Relevant part of the offending schema:

{  
   "name":"Classifications",
   "type":{  
      "type":"array",
      "items":{  
         "type":"record",
         "name":"Classification",
         "fields":[  
            {  
               "name":"entry",
               "type":{  
                  "type":"array",
                  "items":{  
                     "type":"record",
                     "name":"ClassificationEntry",
                     "fields":[  
                        {  
                           "name":"key",
                           "type":"string"
                        },
                        {  
                           "name":"level",
                           "type":"int"
                        }
                     ]
                  }
               }
            }
         ]
      }
   }
}
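
For context, the sink is writing Parquet via the HDFS connector. A configuration along these lines exercises the failing path (property names as documented for the connector and Connect; the topic name, URLs, and flush size below are placeholders):

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
topics=products
hdfs.url=hdfs://namenode:8020
flush.size=1000
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081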

Connector exception:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.SchemaParseException: Can't redefine: array
    at org.apache.avro.Schema$Names.put(Schema.java:1128)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
    at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
    at org.apache.avro.Schema.toString(Schema.java:324)
    at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
    at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:861)
    at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:333)
    at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:172)
    at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
    at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:168)
    at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
    at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:66)
    at org.apache.parquet.avro.AvroCompatRecordMaterializer.<init>(AvroCompatRecordMaterializer.java:34)
    at org.apache.parquet.avro.AvroReadSupport.newCompatMaterializer(AvroReadSupport.java:138)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:130)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:175)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:149)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
    at io.confluent.connect.hdfs.parquet.ParquetFileReader.getSchema(ParquetFileReader.java:47)
    at io.confluent.connect.hdfs.parquet.ParquetFileReader.getSchema(ParquetFileReader.java:30)
    at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:344)
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    ... 10 more

Peeking into one of the Parquet files written in the first batch (relevant part of the parquet-tools meta <file> output) reveals:

.Classifications:        REQUIRED F:1
..array:                 REPEATED F:1
...entry:                REQUIRED F:1
....array:               REPEATED F:2
.....key:                REQUIRED BINARY O:UTF8 R:2 D:2
.....level:              REQUIRED INT32 R:2 D:2

Apparently, the Avro schema was converted to a Parquet schema in such a way that two fields named array were created. When the connector reads the file back, this schema then trips up org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility.
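
The name collision can be reproduced in isolation with plain Avro (a minimal sketch, assuming Avro 1.8.x on the classpath; the class name is made up, and the record/field names mirror the converted Parquet layout above):

import java.util.Arrays;

import org.apache.avro.Schema;

public class RedefineArrayRepro {
    public static void main(String[] args) {
        // Element record of the inner list: parquet-avro derives a record named "array".
        Schema innerArray = Schema.createRecord("array", null, null, false, Arrays.asList(
                new Schema.Field("key", Schema.create(Schema.Type.STRING), null, (Object) null),
                new Schema.Field("level", Schema.create(Schema.Type.INT), null, (Object) null)));

        // Element record of the outer list: a *different* record that is also named "array".
        Schema outerArray = Schema.createRecord("array", null, null, false, Arrays.asList(
                new Schema.Field("entry", Schema.createArray(innerArray), null, (Object) null)));

        Schema root = Schema.createRecord("Classification", null, null, false, Arrays.asList(
                new Schema.Field("Classifications", Schema.createArray(outerArray), null, (Object) null)));

        // Serializing the schema walks Avro's name table and fails with
        // org.apache.avro.SchemaParseException: Can't redefine: array
        System.out.println(root.toString(true));
    }
}

Per the stack trace above, checkReaderWriterCompatibility calls Schema.toString() internally, which is where the exception surfaces.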

Possibly related: https://github.com/confluentinc/examples/issues/63

Can we do something about this, other than avoiding nested arrays altogether - something we have only limited influence over?

OneCricketeer commented 6 years ago

@pederpansen Can you share a minimal, complete schema along with a sample Producer code?

I suspect this problem can be resolved by adding Avro namespaces to your records.

I believe the error happens because you have two type: array fields with the same name.

And the error comes from Avro itself, not from Connect.
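
For illustration, a sketch of what explicitly namespaced records could look like with Avro's SchemaBuilder (the namespace value is made up; whether it helps here depends on how parquet-avro names the intermediate list records):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NamespacedSchemaExample {
    public static void main(String[] args) {
        // Give each nested record an explicit namespace so that
        // fully-qualified record names cannot collide.
        Schema entry = SchemaBuilder.record("ClassificationEntry")
                .namespace("com.example.catalog")   // made-up namespace
                .fields()
                .requiredString("key")
                .requiredInt("level")
                .endRecord();

        Schema classification = SchemaBuilder.record("Classification")
                .namespace("com.example.catalog")
                .fields()
                .name("entry").type(Schema.createArray(entry)).noDefault()
                .endRecord();

        System.out.println(classification.toString(true));
    }
}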

nickstatka777 commented 5 years ago

Hello @pederpansen, if this issue is still relevant for you, please check the following: https://docs.confluent.io/4.1.0/ksql/docs/installation/server-config/avro-schema.html There is a useful note at the beginning of the article:

Avro schemas with nested fields are not supported yet. This is because KSQL does not yet support nested columns. This functionality is coming soon.

OneCricketeer commented 5 years ago

@nickstatka777

You might want to look at the latest docs

Avro schemas with nested fields are supported. In KSQL 5.0 and higher, you can read nested data, in Avro and JSON formats

However, the question is not about KSQL

nickstatka777 commented 5 years ago

Hello @cricket007, yep, but I discovered this issue with version 4.1. Thank you for the info, though.

OneCricketeer commented 5 years ago

@nickstatka777 I see. You may want to consider upgrading then. The issue here isn't about accessing nested data with KSQL, though. Are you sure the error you were getting was exactly the same?