
Use Beam schema in ParquetIO #19066

Open kennknowles opened 2 years ago

kennknowles commented 2 years ago

It would be better if we eliminated the need for an avro.Schema (infer it? obtain it from the PCollection?) and used org.apache.beam.sdk.schemas.Schema instead.

Link to discussion on user@: https://lists.apache.org/thread.html/1d270884aa9e6d7952857

Imported from Jira BEAM-4812. Original Jira may contain additional context. Reported by: ŁukaszG.
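
For reference, the workaround available today is to derive the Avro schema from the PCollection's attached Beam schema via AvroUtils. A minimal sketch, assuming the input PCollection already carries a Beam schema; the writeParquet helper and the plain .to(outputDir) naming are illustrative only, not part of any proposal (in older Beam releases AvroUtils lives under org.apache.beam.sdk.schemas.utils instead of the extensions package):

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

// Sketch: write a schema'd PCollection to Parquet without the caller supplying
// an avro.Schema; the Avro schema is derived from the attached Beam schema.
static void writeParquet(PCollection<GenericRecord> records, String outputDir) {
    // getSchema() throws IllegalStateException if no Beam schema is attached.
    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(records.getSchema());
    records.apply(
            "Write Parquet",
            FileIO.<GenericRecord>write()
                    .via(ParquetIO.sink(avroSchema))
                    .to(outputDir));
}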

psolomin commented 1 year ago

I would like to ask a follow-up question on this: since ParquetIO is hard-coded to GenericRecord, it seems practically unusable for logical types, because I could not find a coder for GenericRecord that actually supports Avro logical types. Am I mistaken?

Related issue: https://github.com/apache/beam/issues/18874
Related PR which adds support for logical types: https://github.com/apache/beam/pull/26320

Here's an example of a pipeline which reads Avro and writes Parquet files:

// ... reading byte arrays from source ...
.apply("Parse payloads", ParDo.of(new ConsumedEventDeserializer()))  // byte[] -> GenericRecord
.setCoder(AvroCoder.of(GenericRecord.class, ConsumedEvent.SCHEMA$))
.apply(
        "Sink to S3",
        FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(ConsumedEvent.SCHEMA$)
                        .withCompressionCodec(CompressionCodecName.SNAPPY))
                .to(opts.getSinkLocation())
                .withNaming(new NoColonFileNaming(runId)));

But it produces an exception:

Caused by: java.lang.ClassCastException: class java.time.Instant cannot be cast to class java.lang.Number (java.time.Instant and java.lang.Number are in module java.base of loader 'bootstrap')
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:160)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:81)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
    at org.apache.beam.sdk.extensions.avro.coders.AvroCoder.encode(AvroCoder.java:378)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:132)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:86)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:70)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:55)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
    at com.psolomin.consumer.ConsumedEventDeserializer.processElement(ConsumedEventDeserializer.java:34)

Schema (note the timestamp-micros logical type on ts):

{
  "namespace": "com.psolomin.records",
  "type": "record",
  "name": "ConsumedEvent",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-micros"}},
    {"name": "shard_id", "type": "string"}
  ]
}
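
The cast failure comes from GenericDatumWriter.writeWithoutConversion: AvroCoder encodes with a plain GenericDatumWriter that has no logical-type conversions registered, so a timestamp-micros field must already hold the underlying long, not a java.time.Instant. A minimal workaround sketch, assuming the deserializer controls how the record is built (the toRecord helper is illustrative; ConsumedEvent.SCHEMA$ is the schema above):

import java.time.Instant;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Workaround sketch: store the raw epoch-micros long in the record instead of
// an Instant, since the coder's GenericDatumWriter applies no conversions.
static GenericRecord toRecord(long id, Instant ts, String shardId) {
    GenericRecord record = new GenericData.Record(ConsumedEvent.SCHEMA$);
    record.put("id", id);
    // The logicalType in the schema still marks the column as
    // timestamp-micros for downstream Parquet readers.
    long micros = ts.getEpochSecond() * 1_000_000L + ts.getNano() / 1_000L;
    record.put("ts", micros);
    record.put("shard_id", shardId);
    return record;
}

Per the links above, the PR adding logical-type support (#26320) should make this manual conversion unnecessary.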