apache/parquet-java

Apache Parquet Java
https://parquet.apache.org/

Schema mismatch for reading Avro from parquet file with old schema version? #1628

Open asfimport opened 3 years ago

asfimport commented 3 years ago

I ran into what looks like a bug in the Parquet Avro reading code when trying to read a file that was written with a previous version of a schema using a new, evolved version of that schema.
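
For concreteness, this is the shape of the schema pair involved (a minimal sketch; the record and field names here are made up, not my real schema):

```java
import org.apache.avro.Schema;

// v1: the schema the existing Parquet files were written with
Schema writerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"}]}");

// v2: the evolved reader schema, which adds an optional field with a default
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"source\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
```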

I'm using Apache Beam's ParquetIO library, which supports passing in schemas to use for "projection", and I was investigating whether that would work for me here. However, it didn't: the read failed, complaining that my new reader schema had a field that wasn't in the writer schema.
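
Roughly how I'm wiring that up in Beam, as a sketch (the path is made up, and I'm assuming the withProjection(projectionSchema, encoderSchema) overload of ParquetIO.Read here):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

Pipeline pipeline = Pipeline.create();

// Ask ParquetIO to project the file's records onto the new reader schema
PCollection<GenericRecord> records = pipeline.apply(
    ParquetIO.read(readerSchema)
        .from("/data/events/*.parquet")               // made-up path
        .withProjection(readerSchema, readerSchema)); // fails as described below
```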

I traced this through to a couple of places in the parquet-avro code that don't look right to me:

First, in prepareForRead here: https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java#L116

The parquetSchema variable comes from parquetSchema = readContext.getRequestedSchema(); while the avroSchema variable comes from the parquet file itself, with avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));

I can verify that parquetSchema is the schema I'm requesting the file be projected to, and that avroSchema is the schema from the file itself. But the naming looks backward: shouldn't parquetSchema be the one that comes from the parquet file?

Following the stack down, I was hitting this line: https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java#L91

Here it was failing because avroSchema didn't have a field that was in parquetSchema, with the variables assigned the same way as above. That's exactly the case I was hoping to use this projection for, though: to get the record read with the new reader schema, using the default value from the new schema for the new field. In fact, the comment on line 101, "store defaults for any new Avro fields from avroSchema that are not in the writer schema (parquetSchema)", suggests that the intent was for this to work, but the actual code has the writer schema in avroSchema and the reader schema in parquetSchema.
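
For reference, my reading of the converter logic around that line, reduced to a paraphrased sketch (not a verbatim excerpt of AvroIndexedRecordConverter):

```java
// For every field of parquetSchema (which here holds my requested reader
// schema), the converter looks the field up in avroSchema (which here holds
// the writer schema taken from the file).
for (Type parquetField : parquetSchema.getFields()) {
  Schema.Field avroField = avroSchema.getField(parquetField.getName());
  if (avroField == null) {
    // my newly added field lands here, because the file's schema predates it
    throw new InvalidRecordException(String.format(
        "Parquet/Avro schema mismatch: Avro field '%s' not found",
        parquetField.getName()));
  }
  // ... otherwise build a converter for the matched field ...
}
```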

(Additionally, I'd want this to support schema evolution both for adding an optional field and for removing an old field - so just flipping the names around would still break whenever the reader schema dropped a field that's in the writer schema...)

I'm looking to understand whether I'm interpreting this correctly, or whether there's another path that's intended to be used.

Thank you!

Environment: Linux, Apache Beam 2.28.0, Java 11
Reporter: Philip Wilcox

Note: This issue was originally created as PARQUET-2055. Please see the migration documentation for further details.

asfimport commented 3 years ago

Gabor Szadovszky / @gszadovszky: [~philipwilcox], I think the main misunderstanding comes from the fact that Avro supports schema evolution while Parquet does not. What the parquet-avro binding does is basically convert the schema (if required, because an Avro schema is neither set nor saved in the file) and convert the values. You may set the projection Avro schema via the Hadoop conf property parquet.avro.projection, but it cannot support Avro schema evolution, since Parquet has no such concept (e.g. default values). So currently you are not able to read columns that are not in the file (by falling back to default values or nulls). The file schema has to contain the projection schema.
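
A minimal sketch of setting that property outside Beam, using the setter AvroReadSupport exposes for it (the path and the projectionSchema variable are made up for illustration):

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;

Configuration conf = new Configuration();
// Stores the schema JSON under the parquet.avro.projection key; every field
// in this schema must also exist in the file's schema.
AvroReadSupport.setRequestedProjection(conf, projectionSchema);

try (ParquetReader<GenericRecord> reader = AvroParquetReader
    .<GenericRecord>builder(new Path("/data/events/part-0.parquet"))
    .withConf(conf)
    .build()) {
  GenericRecord record;
  while ((record = reader.read()) != null) {
    // only the projected columns are populated
  }
}
```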

asfimport commented 3 years ago

Philip Wilcox: Hi @gszadovszky, I'm not sure I follow regarding how Parquet does or doesn't support schema evolution any less than Avro does. For my data warehouse purposes, Parquet works fine with the schema evolution I'm trying to do (adding or removing fields), since my metastore/query engine will happily bring back NULL for those new fields for values from old files, and will either bring back NULL for new files with dropped columns or simply ignore those columns if I don't have them specified in the metastore. I suppose that's more the query engine than Parquet itself supporting schema evolution, but the Parquet files aren't my problem for that use case.

I don't know why the default values would be expected to come from the Parquet file schema. The Parquet file schema is the previous version of the schema. The default value for a new field that isn't in the file wouldn't be expected to come from the Parquet file itself; it would be expected to come from the new Avro schema that I'm using in my reader code.

I want to apply the rules here: http://avro.apache.org/docs/current/spec.html#Schema+Resolution - namely, "if the reader's record schema has a field that contains a default value, and writer's schema does not have a field with the same name, then the reader should use the default value from its field" and "if the writer's record contains a field with a name not present in the reader's record, the writer's value for that field is ignored." The schema that needs to supply the default value is the reader schema that I'm providing, not the writer schema that was used to create the file.
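
This is exactly what plain Avro does when it is handed both schemas; a minimal sketch of reading an Avro container file under those rules (the file name is made up):

```java
import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// The reader ("expected") schema is the new one; the writer schema comes
// from the file header, and Avro's resolution rules bridge the two.
GenericDatumReader<GenericRecord> datumReader =
    new GenericDatumReader<>(readerSchema);
try (DataFileReader<GenericRecord> fileReader =
    new DataFileReader<>(new File("events.avro"), datumReader)) {
  for (GenericRecord record : fileReader) {
    // fields missing from the writer schema come back as their defaults;
    // writer-only fields are ignored
  }
}
```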

My use case is this: I also read these records back into an application for further processing, and I want to bring them back by going through Avro records. That's where the Avro support for evolution when reading seems to be missing. If I have multiple files with multiple different-but-compatible schemas, I can coerce all the records I read into GenericRecords of the same (latest) schema, so that the code operating on the GenericRecords has a unified set of fields to work with. This works fine, but it feels like it should be library code, since the code I implemented is very similar to what's already in the Avro record converters. And if it were library code, I could eventually do this directly in a single read, instead of having to step through two versions of generic records.
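
The coercion I implemented is along these lines, as a simplified sketch of the approach (not the converter code itself):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

// Copy a record read under an older schema onto the latest schema, matching
// fields by name; GenericRecordBuilder fills in defaults for the rest.
static GenericRecord coerceToLatest(GenericRecord in, Schema latest) {
  GenericRecordBuilder out = new GenericRecordBuilder(latest);
  for (Schema.Field field : latest.getFields()) {
    Schema.Field old = in.getSchema().getField(field.name());
    if (old != null) {
      out.set(field, in.get(old.pos()));
    }
    // fields only in 'latest' stay unset and take their default values;
    // build() will fail if a new field has no default, which is what I want
  }
  return out.build();
}
```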

asfimport commented 3 years ago

Gabor Szadovszky / @gszadovszky: [~philipwilcox], sorry if I was misleading. I wanted to say that parquet-mr only supports projection, so you cannot use other "schema evolution capabilities". However, I do not have much experience with parquet-avro, and I found a discussion in another jira: PARQUET-465. It's quite old, but you may check whether Ryan's answers help in your case.

asfimport commented 3 years ago

Philip Wilcox: @gszadovszky that makes sense, thank you for that link! I definitely understand the situation better now than when I first opened this ticket - I think what I was really going for was a feature request, "support for following Avro schema evolution rules when reading Parquet back into newer Avro schemas", which is very similar to PARQUET-465.

The context is very useful! Yeah, renaming is tough if you allow addition and removal. I don't think I fully understand the difference between the projection capabilities of parquet-mr and the Avro schema evolution rules, though, because in terms of the conversion to Avro this seems to conflict with standard behavior. Renaming fields in Avro schema evolution is not generally supported (outside of aliases); for instance, the Schema Resolution section of that Avro spec doc specifies that "the ordering of fields may be different: fields are matched by name" when reading data into a different schema.
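
(For the rename case, Avro's own mechanism is an alias on the reader schema's field, e.g. this sketch with made-up names:)

```java
import org.apache.avro.Schema;

// Reader schema that renames device_id to deviceId via an alias, so records
// written under the old name still resolve by name.
Schema renamed = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"deviceId\",\"aliases\":[\"device_id\"],"
        + "\"type\":\"string\"}]}");
```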

However, this is probably, by this point, behavior that others depend on from the current Parquet reading projection.

Is "read with support for Avro schema evolution" something that would make sense as a new feature? The projection code seems tantalizingly close to what would be needed for this as well, though I'm not familiar enough with the library to know what else would need changing or what else would potentially break.

asfimport commented 3 years ago

Gabor Szadovszky / @gszadovszky: [~philipwilcox], there is no schema evolution for Parquet, neither in the spec nor in the Java implementation (parquet-mr). The support for projection is not really a schema evolution feature but a practical feature of column-oriented formats: they can skip columns without any effort (unlike row-oriented formats). Since bindings like parquet-avro are about "simply" converting the schemas and the data where required, I am not sure how easy it would be to support Avro schema evolution in Parquet. Since PARQUET-465 is related and has not been solved in more than 5 years, I would be skeptical about leaving such a feature request to the community. So, if you can invest in implementing it, it would be very welcome.