apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0

ClassCastException from GenericData$Record after upgrade to `org.apache.parquet:parquet-avro:1.14.1` #2975

Open NathanEckert opened 3 months ago

NathanEckert commented 3 months ago

Describe the bug, including details regarding any error messages, version, and platform.

When upgrading from 1.13.1 to 1.14.1, reading fails with:

java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to org.myorg.parquet.TestUpgradeParquet$CustomParquetRecord

I get a different error when upgrading only to 1.14.0:

java.lang.RuntimeException: shaded.parquet.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 optional type `java.util.Optional<java.lang.Long>` not supported by default
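
For context, this Jackson message is the standard one emitted when the jackson-datatype-jdk8 module is not registered on the ObjectMapper doing the (de)serialization. With a regular, unshaded Jackson instance it would be addressed as in the sketch below; here the Jackson copy is shaded inside parquet-avro, so user code cannot register the module, and the sketch is only meant to show what the message is asking for.

// Illustration only, not part of the reproducer: how the "Java 8 optional type ...
// not supported by default" error is normally resolved with an unshaded Jackson
// ObjectMapper. The shaded copy inside parquet-avro cannot be configured this way.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import java.util.Optional;

class Jdk8ModuleIllustration {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module());
    // Without the module, serializing an Optional throws InvalidDefinitionException.
    System.out.println(mapper.writeValueAsString(Optional.of(42L)));
  }
}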

I managed to reproduce it with the following code:

package org.myorg.parquet;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import lombok.Cleanup;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificData.SchemaConstructable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Test upgrade parquet.
 * <p>Works with org.apache.parquet:parquet-avro:1.13.1 (this simplified example still fails, but with a different stack trace than the other versions, as it fails later).</p>
 * <p>Fails with org.apache.parquet:parquet-avro:1.14.0 with shaded.parquet.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Java 8 optional type `java.util.Optional<java.lang.Long>` not supported by default: add Module ...</p>
 * <p>Fails with org.apache.parquet:parquet-avro:1.14.1 with java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class org.myorg.parquet.TestUpgradeParquet$CustomParquetRecord.</p>
 */
class TestUpgradeParquet {

  private static final Path TEMP_FOLDER = Paths.get(System.getProperty("java.io.tmpdir"), "directoryForTest");
  private static final Path TEMP_FILE = Paths.get(TEMP_FOLDER.toString(), "source.parquet");

  private Path parquetFilePath;

  @BeforeEach
  public void createParquet() throws Exception {

    // Cleanup of previous test run (guard against the folder not existing on the first run)
    if (Files.exists(TEMP_FOLDER)) {
      try (var previousFiles = Files.list(TEMP_FOLDER)) {
        previousFiles.forEach(path -> {
          try {
            Files.delete(path);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        });
      }
    }
    Files.deleteIfExists(TEMP_FOLDER);
    Thread.sleep(Duration.ofSeconds(1));

    // Actually create the folder and file for this test
    Files.createDirectory(TEMP_FOLDER);
    this.parquetFilePath = Files.createFile(TEMP_FILE);

    final Schema schema =
        SchemaBuilder.record("simpleSchema")
            .fields()
            .name("Key")
            .type()
            .intType()
            .noDefault()
            .name("FieldA")
            .type()
            .doubleType()
            .noDefault()
            .endRecord();
    final Record record1 = new Record(schema);
    record1.put(0, 1);
    record1.put(1, 2.0);
    final Record record2 = new Record(schema);
    record2.put(0, 2);
    record2.put(1, 5.0);
    final Collection<Record> recordsToWrite = List.of(record1, record2);
    writeParquetFile(
        this.parquetFilePath,
        schema,
        recordsToWrite,
        new Configuration(),
        CompressionCodecName.UNCOMPRESSED);
  }

  @Test
  void testUpgradeParquet() throws Exception {

    final org.apache.hadoop.fs.Path filepath = new org.apache.hadoop.fs.Path(this.parquetFilePath.toString());
    final Configuration configuration = new Configuration();

    @Cleanup
    final ParquetReader<CustomParquetRecord> parquetReader = ParquetReader
        .builder(new CustomReadSupport(), filepath)
        .withConf(configuration)
        .build();

    final var pivotRecord = parquetReader.read(); // Failure here
    assertThat(pivotRecord).isNotNull();

  }

  private interface CustomParquetRecord extends GenericRecord, SchemaConstructable {
  }

  private static class CustomReadSupport extends AvroReadSupport<CustomParquetRecord> {
    @Override
    public RecordMaterializer<CustomParquetRecord> prepareForRead(
        Configuration configuration,
        Map<String, String> keyValueMetaData,
        MessageType fileSchema,
        ReadContext readContext) {
      final RecordMaterializer<CustomParquetRecord> rMaterializer =
          super.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext);

      return new RecordMaterializer<>() {

        @Override
        public CustomParquetRecord getCurrentRecord() {
          return rMaterializer.getCurrentRecord();
        }

        @Override
        public GroupConverter getRootConverter() {
          // Difference here with actual code,
          // we return a custom root converter
          // It is not included in this reproducer as it would bring way too much code
          return null;
        }
      };
    }
  }

  /**
   * Creates a parquet file with contents.
   *
   * @param filePath filePath where the file is created
   * @param schema schema of the data
   * @param recordsToWrite records to write
   * @param configuration the hadoop configuration to use for the writer
   * @param compressionCodec the codec for the compression scheme to use for the parquet file
   */
  private static void writeParquetFile(
      final Path filePath,
      final Schema schema,
      final Collection<Record> recordsToWrite,
      final Configuration configuration,
      final CompressionCodecName compressionCodec)
      throws IOException, InterruptedException {
    try {
      Files.deleteIfExists(filePath);
    } catch (final NoSuchFileException x) {
      throw new RuntimeException(
          String.format("%s: no such file or directory.", filePath), x);
    } catch (final DirectoryNotEmptyException x) {
      throw new RuntimeException(String.format("%s not empty.", filePath), x);
    } catch (final IOException x) {
      throw new RuntimeException(x);
    }

    final org.apache.hadoop.fs.Path path =
        new org.apache.hadoop.fs.Path(filePath.toAbsolutePath().toString());

    RecordWriter<Void, Record> writer = null;
    try {
      writer =
          new ParquetOutputFormat<Record>(
              new AvroWriteSupport<>(
                  new AvroSchemaConverter(configuration).convert(schema),
                  schema,
                  SpecificData.get()))
              .getRecordWriter(configuration, path, compressionCodec, Mode.CREATE);
      for (final Record record : recordsToWrite) {
        writer.write(null, record);
      }
    } finally {
      if (writer != null) {
        writer.close(null);
      }
    }
  }

}
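
For comparison, below is a baseline read of the same file using the stock AvroParquetReader with plain GenericRecord, i.e. without the custom read support. It is not part of the reproducer and I have not re-verified it against every version listed above, but it shows which part of the setup is specific to this report: the cast to CustomParquetRecord only happens through CustomReadSupport.

// For comparison only (not part of the reproducer): reading the same file with the
// stock AvroParquetReader and plain GenericRecord, i.e. without CustomReadSupport.
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

class BaselineGenericRead {
  public static void main(String[] args) throws Exception {
    // args[0]: path to the file written by writeParquetFile, e.g. /tmp/directoryForTest/source.parquet
    final org.apache.hadoop.fs.Path file = new org.apache.hadoop.fs.Path(args[0]);
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(file)
            .withConf(new Configuration())
            .build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}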

N.B.: This code will also throw when using 1.13.1, but that is expected; I did not want to complicate the reproducer with additional code. You will get this error:

org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/tmp/directoryForTest/source.parquet
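
In case it helps with triage: the ClassCastException reads as if the record materializer now builds plain GenericData records instead of using the data model that previously produced the requested class. One reader-side knob I am aware of is the Avro data-model supplier; the sketch below pins it explicitly. This is a guess at a possibly relevant configuration, not a confirmed workaround, and SpecificDataSupplier may well not be the right model for the custom record type in the reproducer.

// Speculative sketch only: pin the Avro data model used by AvroReadSupport when
// materializing records. Whether this affects the GenericData$Record vs.
// CustomParquetRecord behaviour in 1.14.x has not been verified.
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.SpecificDataSupplier;

class DataModelPinning {
  static Configuration readerConfiguration() {
    final Configuration conf = new Configuration();
    AvroReadSupport.setAvroDataSupplier(conf, SpecificDataSupplier.class);
    // Pass this configuration to ParquetReader.builder(...).withConf(conf) as in the test above.
    return conf;
  }
}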

Component(s)

No response