aloneguid / parquet-dotnet

Fully managed Apache Parquet implementation
https://aloneguid.github.io/parquet-dotnet/
MIT License
542 stars 141 forks source link

[BUG]: generated parquet cannot be read by Athena #526

Open greg0rym opened 5 days ago

greg0rym commented 5 days ago

Library Version

4.24.0

OS

Windows, Ubuntu Linux

OS Architecture

64 bit

How to reproduce?

A Parquet file generated with a complex nested structure is not readable by AWS Athena.

Create file with this code:

// Repro schema: id + a list of structs, where each struct contains a nested
// struct that itself holds another list of structs — four levels of nesting.
var schema = new ParquetSchema(
                new DataField<int>("id"),
                new ListField("animals",
                    new StructField("animal",
                        new DataField<string>("animal_name"),
                        new StructField("characteristics",
                            new DataField<string>("overview"),
                            // Inner list-of-structs: writing MORE THAN ONE element
                            // into this list is what triggers the bad repetition
                            // levels reported below.
                            new ListField("likes",
                                new StructField("like",
                                    new DataField<string>("like_name"),
                                    new StructField("favourite",
                                        new DataField<string>("favourite_name")
                                    )
                                )
                            )
                        )
                    )
                )
            );

            // One table row; the inner "likes" list carries two entries.
            var table = new Table(schema)
            {
                {
                    1,
                    new[]
                    {
                        new Row
                        (
                            "Dog",
                            new Row
                            (
                                "4 legs, energetic",
                                new []
                                {
                                    new Row("food",new Row("meat")),
                                    new Row("play",new Row("fetch")) // delete this line and the parquet is readable in Athena
                                }
                            )
                        )
                    }
                }
            };

            // Write the file that Athena subsequently fails to read.
            File.Delete("file.parquet");
            await table.WriteAsync("file.parquet");

Create Athena table and upload generated parquet to S3 location specified in CREATE TABLE statement

-- Athena (Hive DDL) external table whose nested column types mirror the
-- Parquet schema produced by the C# repro above.
CREATE EXTERNAL TABLE `greg_test` (
    `id` int,
    `animals` array<
        struct<
            animal_name: string,
            characteristics: struct<
                overview: string,
                likes: array<
                    struct<
                        like_name: string,
                        favourite: struct<favourite_name: string>
                    >
                >
            >
        >
    >
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://LOCATION_HERE'
TBLPROPERTIES ('parquet.compression' = 'SNAPPY')

Run SELECT * FROM greg_test gives HIVE_CURSOR_ERROR: Failed to read Parquet file.

If I remove the play row from the nested list the data is readable in Athena.

If I read the definition and repetition levels back, I can see that a repetition level (3) is greater than the MaxRepetitionLevel (2) for the fields like_name and favourite_name.

// read back
            using var memoryStream = new MemoryStream();
            await table.WriteAsync(memoryStream);
            using ParquetReader reader = await ParquetReader.CreateAsync(memoryStream);
            using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(0);

            var dataFields = reader.Schema.GetDataFields();

            foreach (var dataField in dataFields)
            {
                Console.WriteLine($"Reading field {dataField.Name}...");
                Console.WriteLine($"MaxDefinitionLevel {dataField.MaxDefinitionLevel}");
                Console.WriteLine($"MaxRepetitionLevel {dataField.MaxRepetitionLevel}");

                var column = await rowGroupReader.ReadColumnAsync(dataField);

                Console.WriteLine("Column definition levels:");
                column.DefinitionLevels?.ToList().ForEach(x => Console.WriteLine(x));

                Console.WriteLine("Column repetition levels:");
                column.RepetitionLevels?.ToList().ForEach(x => Console.WriteLine(x));

                Console.WriteLine();
            }
Reading field like_name...
MaxDefinitionLevel 8
MaxRepetitionLevel 2
Column definition levels:
8
8
Column repetition levels:
0
3

Reading field favourite_name...
MaxDefinitionLevel 9
MaxRepetitionLevel 2
Column definition levels:
9
9
Column repetition levels:
0
3

Failing test

using Parquet;
using Parquet.Rows;
using Parquet.Schema;

namespace ParquetPackageWithListOfStructs
{
    /// <summary>
    /// Failing repro for issue #526: writes a table with a deeply nested
    /// list-of-structs schema via the row (Table) API, then reads the file back
    /// and dumps each leaf column's definition and repetition levels.
    /// </summary>
    public class ParquetBuilder
    {
        /// <summary>
        /// Builds the schema and table, writes "file.parquet", then re-reads the
        /// data from a memory stream and prints max/actual definition and
        /// repetition levels for every data field.
        /// </summary>
        public async Task Build()
        {
            // Schema: id + list<struct<string, struct<string, list<struct<string, struct<string>>>>>>
            var schema = new ParquetSchema(
                new DataField<int>("id"),
                new ListField("animals",
                    new StructField("animal",
                        new DataField<string>("animal_name"),
                        new StructField("characteristics",
                            new DataField<string>("overview"),
                            // Inner list-of-structs: a second element in this list
                            // is what produces the out-of-range repetition levels.
                            new ListField("likes",
                                new StructField("like",
                                    new DataField<string>("like_name"),
                                    new StructField("favourite",
                                        new DataField<string>("favourite_name")
                                    )
                                )
                            )
                        )
                    )
                )
            );

            // One row; the inner "likes" list carries two entries.
            var table = new Table(schema)
            {
                {
                    1,
                    new[]
                    {
                        new Row
                        (
                            "Dog",
                            new Row
                            (
                                "4 legs, energetic",
                                new []
                                {
                                    new Row("food",new Row("meat")),
                                    new Row("play",new Row("fetch")) // delete this line and the parquet is readable in Athena
                                }
                            )
                        )
                    }
                }
            };

            File.Delete("file.parquet");
            await table.WriteAsync("file.parquet");

            // read back
            using var memoryStream = new MemoryStream();
            await table.WriteAsync(memoryStream);
            using ParquetReader reader = await ParquetReader.CreateAsync(memoryStream);
            using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(0);

            var dataFields = reader.Schema.GetDataFields();

            // For each leaf column, print the schema's maximum levels next to the
            // levels actually stored in the file, so the out-of-range repetition
            // level (3 > MaxRepetitionLevel 2) is visible in the output.
            foreach (var dataField in dataFields)
            {
                Console.WriteLine($"Reading field {dataField.Name}...");
                Console.WriteLine($"MaxDefinitionLevel {dataField.MaxDefinitionLevel}");
                Console.WriteLine($"MaxRepetitionLevel {dataField.MaxRepetitionLevel}");

                var column = await rowGroupReader.ReadColumnAsync(dataField);

                Console.WriteLine("Column definition levels:");
                column.DefinitionLevels?.ToList().ForEach(x => Console.WriteLine(x));

                Console.WriteLine("Column repetition levels:");
                column.RepetitionLevels?.ToList().ForEach(x => Console.WriteLine(x));

                Console.WriteLine();
            }
        }
    }
}
greg0rym commented 4 days ago

I am going to test this with the low-level API.

greg0rym commented 2 days ago

The low-level API and the class-serialization API appear to handle the same scenario correctly.