apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0
6.38k stars 2.2k forks source link

Iceberg is not working when writing AVRO from spark #2174

Closed kkishoreyadav closed 3 years ago

kkishoreyadav commented 3 years ago

1) We are encountering the following error when appending AVRO files from GCS to table. The avro files are valid but we use deflated avro, is that a concern?

Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/avro/InvalidAvroMagicException at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101) at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77) at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37) at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320) at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237) at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46) at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127) at org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:149) at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:343) at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:163) at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:276) at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404) at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213) at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197) at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189) at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:275) org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2(JavaDStreamLike.scala:280) at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2$adapted(JavaDStreamLike.scala:280) at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at scala.util.Try$.apply(Try.scala:213) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: java.lang.ClassNotFoundException: org.apache.avro.InvalidAvroMagicException at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 38 more

2) The log shows that table already exists for the iceberg table, however I am unable to see the metadata file in gcs? I am running the spark job from the dataproc cluster, where can i see the metadata file?

##################### Iceberg version: 0.11 spark version 3.0 #####################

public void appendData(List<FileMetadata> publishedFiles, Schema icebergSchema) {
    TableIdentifier tableIdentifier = TableIdentifier.of(TRANSFORMER, jobConfig.streamName());
    // PartitionSpec partitionSpec = IcebergInternalFields.getPartitionSpec(tableSchema);
    HadoopTables tables = new HadoopTables(new Configuration());

    PartitionSpec partitionSpec = PartitionSpec.builderFor(icebergSchema)
            .build();

    Table table = null;
    if (tables.exists(tableIdentifier.name())) {
        table = tables.load(tableIdentifier.name());
    } else {
        table = tables.create(
                icebergSchema,
                partitionSpec,
                tableIdentifier.name());
    }
    AppendFiles appendFiles = table.newAppend();
    for (FileMetadata fileMetadata : publishedFiles) {

        appendFiles.appendFile(DataFiles.builder(partitionSpec)
                .withPath(fileMetadata.getFilename())
                .withFileSizeInBytes(fileMetadata.getFileSize())
                .withRecordCount(fileMetadata.getRowCount())
                .withFormat(FileFormat.AVRO)
                .build());
    }
    appendFiles.commit();
}
kkishoreyadav commented 3 years ago

The following two things have fixed my problem