Closed zeodtr closed 1 month ago
@zeodtr Thanks for raising this issue. Looks like we need some proper Spark/Rust integration tests :)
@martin-g Do you have any ETA on Avro Rust 0.17?
The Rust SDK is released with all other SDKs, i.e. when 1.12.0/1.11.4 is released.
@martin-g Thanks for clearing that up. Since the versioning is different, I assumed that there was also a different release cadence. Let me follow up with @jbonofre on how we can push out the next release 👍
There were some discussions about doing separate SDK releases - https://lists.apache.org/thread/2rfnszd4dk36jxynpj382b1717gbyv1y - but nothing happened, mainly due to the lack of interest among the PMC members...
I'm afraid it would be hard for me to get two more PMC members' +1s for a release of the Rust SDK.
Strangely, I was working with the Rust API on tables generated by Spark with no such issue, but when I tried to port to Rust some code that deals with tables generated by Trino, then I got this exception as well.
Error: DataInvalid => Failure in conversion with avro
Source: Missing field in record: "added_data_files_count"
This doesn't seem to be a Spark-specific issue. Is this indeed in progress for 0.3? Is there some task associated with it? @zeodtr @Fokko
Can you share the metadata JSON? I don't think the field ID resolution is being applied, described in issue https://github.com/apache/iceberg-rust/issues/353. added_data_files_count is the old name since in V2 it also included delete files. The name should not matter and should be corrected as in https://github.com/apache/iceberg-rust/issues/354.
@a-agmon It's possible that this is not a Spark-specific issue, since it is related to the Java Iceberg library.
@Fokko if #354 is applied, iceberg-rust will no longer be able to read the manifest list files created by pre-1.5.0 Spark and pre-#354 iceberg-rust, since iceberg-rust does not read the fields by field-id (it reads them by field name). (Please correct me if I'm wrong. I could not find the code in iceberg-rust that resolves the fields by field-id.)
Can you share the metadata JSON? I don't think the field ID resolution is being applied, described in issue #353. added_data_files_count is the old name since in V2 it also included delete files. The name should not matter and should be corrected as in #354.
Thanks @Fokko and @zeodtr
If I understand the issue correctly, as @zeodtr explains, the manifest list generated by Spark 3.3 + Iceberg 1.3 (which I use) uses the field name added_data_files_count, and so does the iceberg-rust lib. However, the manifest list generated by Trino correctly uses the V2 field name added_files_count. So in fact, unless this is changed to use the field-id instead of the name, there is no way this can be fixed without breaking backward compatibility.
Here is the metadata.json representing the "new" schema (I had to anonymize a few things). It was generated by Trino (via its DBT connector). The referenced manifest list follows below. As you can see, it indeed uses the new schema field name added_files_count, and that's why I get the error above.
{
"format-version" : 2,
"table-uuid" : "aa3b9ef5-c067-4a08-8e9e-8a061d6c64e1",
"location" : "s3://*********/dbt/***.db/*********-e562cf0876494c9f9d61e4f044f16ce8",
"last-sequence-number" : 1,
"last-updated-ms" : 1714657784414,
"last-column-id" : 12,
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "row_id",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "dt",
"required" : false,
"type" : "date"
}, {
"id" : 3,
"name" : "sub_acc_id",
"required" : false,
"type" : "string"
}, {
"id" : 4,
"name" : "master_acc_id",
"required" : false,
"type" : "string"
}
........... shortened
, {
"id" : 12,
"name" : "total_quantity",
"required" : false,
"type" : "long"
} ]
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "dt_month",
"transform" : "month",
"source-id" : 2,
"field-id" : 1000
} ]
} ],
"last-partition-id" : 1000,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"write.format.default" : "PARQUET",
"write.parquet.compression-codec" : "zstd"
},
"current-snapshot-id" : 6685531058123427778,
"refs" : {
"main" : {
"snapshot-id" : 6685531058123427778,
"type" : "branch"
}
},
"snapshots" : [ {
"sequence-number" : 1,
"snapshot-id" : 6685531058123427778,
"timestamp-ms" : 1714657783993,
"summary" : {
"operation" : "append",
"trino_query_id" : "20240502_134523_18961_n5bhn",
"added-data-files" : "503",
"added-records" : "176720085",
"added-files-size" : "3221301527",
"changed-partition-count" : "32",
"total-records" : "176720085",
"total-files-size" : "3221301527",
"total-data-files" : "503",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "s3://*********/dbt/***.db/*********-e562cf0876494c9f9d61e4f044f16ce8/metadata/snap-6685531058123427778-1-2584bfc0-abb3-4257-a61c-0498a8e29dc4.avro",
"schema-id" : 0
} ],
"statistics" : [ {
"snapshot-id" : 6685531058123427778,
"statistics-path" : "s3://*********/dbt/***.db/*********-e562cf0876494c9f9d61e4f044f16ce8/metadata/20240502_134523_18961_n5bhn-748b8bf4-ef54-4882-8d7f-2f6b9edbe85b.stats",
"file-size-in-bytes" : 294179,
"file-footer-size-in-bytes" : 2418,
"blob-metadata" : [ {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 1 ],
"properties" : {
"ndv" : "172900302"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 2 ],
"properties" : {
"ndv" : "944"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 3 ],
"properties" : {
"ndv" : "47786"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 4 ],
"properties" : {
"ndv" : "47513"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 5 ],
"properties" : {
"ndv" : "195336"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 6 ],
"properties" : {
"ndv" : "171901"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 7 ],
"properties" : {
"ndv" : "7237440"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 8 ],
"properties" : {
"ndv" : "4"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 9 ],
"properties" : {
"ndv" : "42995"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 10 ],
"properties" : {
"ndv" : "47504"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 11 ],
"properties" : {
"ndv" : "10"
}
}, {
"type" : "apache-datasketches-theta-v1",
"snapshot-id" : 6685531058123427778,
"sequence-number" : 1,
"fields" : [ 12 ],
"properties" : {
"ndv" : "583133"
}
} ]
} ],
"partition-statistics" : [ ],
"snapshot-log" : [ {
"timestamp-ms" : 1714657783993,
"snapshot-id" : 6685531058123427778
} ],
"metadata-log" : [ {
"timestamp-ms" : 1714657783993,
"metadata-file" : "s3://*********/dbt/***.db/*********-e562cf0876494c9f9d61e4f044f16ce8/metadata/00000-52cc6f6a-259c-4ca8-ba89-cd3a99b9eedb.metadata.json"
} ]
}
The referenced manifest
{
"manifest_path":"s3://*********/dbt/***.db/*********-e562cf0876494c9f9d61e4f044f16ce8/metadata/2584bfc0-abb3-4257-a61c-0498a8e29dc4-m0.avro",
"manifest_length":98246,
"partition_spec_id":0,
"content":0,
"sequence_number":1,
"min_sequence_number":1,
"added_snapshot_id":6685531058123427778,
"added_files_count":503,
"existing_files_count":0,
"deleted_files_count":0,
"added_rows_count":176720085,
"existing_rows_count":0,
"deleted_rows_count":0,
"partitions":{
"array":[
{
"contains_null":false,
"contains_nan":{
"boolean":false
},
"lower_bound":{
"bytes":"m\u0002\u0000\u0000"
},
"upper_bound":{
"bytes":"\u0002\u0000\u0000"
}
}
]
}
}
Another, less workaround-ish way to resolve this is to simply capture the fact that we have a V1 schema, a V2 schema, and a V2 compatibility schema that is identical to V2 except that these 3 fields carry their V1 names. Then, in the parse_with_version function, we can first try the V2 schema and fall back to V2Compat if it fails. Something like this perhaps:
pub fn parse_with_version(
bs: &[u8],
version: FormatVersion,
partition_type_provider: impl Fn(i32) -> Result<Option<StructType>>,
) -> Result<ManifestList> {
match version {
FormatVersion::V1 => {
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type_provider)
}
FormatVersion::V2 => {
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?;
let read_result = reader.collect::<std::result::Result<Vec<Value>, _>>();
match read_result {
Ok(records) => {
let values = Value::Array(records);
from_value::<_serde::ManifestListV2>(&values)?
.try_into(&partition_type_provider)
}
Err(e) => {
                    eprintln!("Error reading values according to V2 schema, trying to fall back to V2_COMPAT: {:?}", e);
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2_COMPAT, bs)?;
let records = reader.collect::<std::result::Result<Vec<Value>, _>>()?;
let values = Value::Array(records);
from_value::<_serde::ManifestListV2Compat>(&values)?
.try_into(&partition_type_provider)
}
}
}
}
}
Check out this branch in which I implement this for my use case (using a ManifestFileV2Compat and a MANIFEST_LIST_AVRO_SCHEMA_V2_COMPAT which capture this).
WDYT @Fokko @zeodtr ?
As @zeodtr says, breaking compat with Spark-Iceberg < 1.5.0 is going to be a major issue unless we either support a compat schema or move to working with field-id.
@Fokko if https://github.com/apache/iceberg-rust/issues/354 is applied, iceberg-rust will no longer be able to read the manifest list files created by pre-1.5.0 Spark and pre-https://github.com/apache/iceberg-rust/issues/354 iceberg-rust, since iceberg-rust does not read the fields by field-id (it reads the fields by field names). (Please correct me if I'm wrong. I could not find the code in iceberg-rust that resolves the fields by field-id)
If this is the case, we should update the rust implementation to do the lookup by field ID.
So in fact, unless this is changed to use the field-id instead of the name, then there is no way this can be fixed without breaking backward compatibility.
This is exactly the reason why we should never look up fields by name, and use the ID instead. This will avoid breaking changes.
The referenced manifest
You need to look at the schema; you can do this, for example, using Avro tools: avro-tools getschema manifest.avro.
Something like this perhaps
It is not based on the table format version. The change came with the V2 spec, but was only applied in Java later on: https://github.com/apache/iceberg/pull/5338. It was merged just after the 1.4.0 release, so it is part of the 1.5.0 release. Trino uses the Java library, so the change will also land there.
The only way forward is doing the lookups by field-id.
@a-agmon Since the problem is in the schema, IMO checking the schema itself before reading the record is more appropriate. And since the error could be some other one, it cannot be assumed to be a schema mismatch error. Also, since Avro's schema resolution mechanism is practically useless in Iceberg, using Reader::new() would help performance.

BTW, the reading process itself is pretty slow (IMO). In my case, reading and processing an 8MB Avro file took about 1.2 sec on my server-class machine. I suspect that the apache_avro crate does string copies far too many times. In the above case, it did about 20 million string copies. I hope this problem will be resolved when reading-by-field-id is implemented.
@Fokko Reading the fields by field-id is the way to go, but IMO it would be a big task that would replace a major part of apache_avro. Since it can take time, applying my (or a-agmon's) workaround could be a practical (temporary) solution.
Thanks, @Fokko and @zeodtr, for the clarifications and explanations!
I think that it's important to fix this in the next release, as the current situation is that the Rust API does not support Iceberg tables generated with Iceberg Java >= 1.5.0, which is quite a limitation.
We agree that we must use field-id for field resolution, but also that this might require a lot of work because the Avro Rust library currently uses field names for serde. I have tried to implement a resolution that satisfies both requirements: it uses field-id when reading the Avro manifest files, but does not make significant changes to the code flow or the usage of the Avro lib. It's not the most elegant solution, but it seems to resolve the issue, at least as far as I tested. The solution consists of the following stages:
In the parse_with_version() function:

FormatVersion::V2 => {
// 1. get a hashmap that maps the field_id to the field_name in the Manifest's schema
let manifest_file_schema =
Self::get_record_schema(MANIFEST_LIST_AVRO_SCHEMA_V2.clone())?;
    let manifest_file_schema_fields: HashMap<String, String> =
Self::get_manifest_schema_fields_map(manifest_file_schema, true)?;
// 2. get a hashmap that maps field_name to field_id in the schema of the read avro file
let reader = Reader::new(bs)?;
let file_schema = Self::get_record_schema(reader.writer_schema().clone())?;
let file_schema_fields: HashMap<String, String> =
Self::get_manifest_schema_fields_map(file_schema, false)?;
    // 3. get a vec of records from the read avro file.
// each record is a hashmap of field_id and field_value
let file_records = reader.collect::<std::result::Result<Vec<Value>, _>>()?;
    let file_records_values_map: Vec<HashMap<String, Value>> =
Self::get_avro_records_as_map(file_records, file_schema_fields)?;
// 4. for each record (manifest file) in the Avro file records maps,
// traverse the schema of the manifest file: for each field id in the schema, get the field value from the record
let manifest_records: Vec<Value> = file_records_values_map
.into_iter()
.map(|file_record_fields| {
let fields_values: Vec<_> = manifest_file_schema_fields
.iter()
.filter_map(|(schem_field_id, schem_field_name)| {
file_record_fields
.get(schem_field_id)
.map(|value| (schem_field_name.clone(), value.clone()))
})
.collect();
Value::Record(fields_values)
})
.collect();
let values = Value::Array(manifest_records);
let manifest = from_value::<_serde::ManifestListV2>(&values)?;
manifest.try_into(partition_type_provider)
}
Please let me know what you think. (It's just a suggestion, ofc; it's also possible to re-write this without the Avro lib.)
I'm also posting this in Slack for visibility as I think it's sufficiently important.
I think creating a field-id to a field-name map is a good (interim) solution.
Keep in mind that the next Avro release is planned for this week: https://lists.apache.org/thread/6pn8jztkyom8tr5vbxr1pqgwx6bj0h4c
According to https://github.com/apache/iceberg-rust/issues/131 looking up the field-id should be resolved in the next release.
@a-agmon My concerns are as follows:
- The manifest_file_schema_fields hashmap should be calculated only once in an application's lifetime (for performance).
- Looking up fields by field-id can be slow (maybe not).
- Building file_records will be pretty slow, since apache_avro's read function is slow. It should be addressed (maybe later) by overriding apache_avro's read function. I think the read function can be made faster by not building the interim Value struct, which does a lot of string cloning for field names (I guess).

@Fokko #131 only solves the 'saving' part. Currently iceberg-rust does not save field-id to the Avro file due to a limitation of apache_avro. #131 solves it.
Using the field-id when reading is an entirely different problem. In fact, it's not apache_avro's problem: apache_avro does not care about the field-id attribute (and should not, since it's not in the Avro spec). It's just another custom attribute saved by a user. iceberg-rust must handle it.
Thanks @zeodtr , We can certainly cache the manifest schema, and also recurse on the fields read from the file. Implementing a more efficient reader is also possible but indeed seems like quite a task. Given that the next Avro release will only resolve part of the issue, I think that the bottom line question is whether and how to fix this now. There were a few suggestions here, I would be happy to assist, but I think we need some guidance.
Looking forward to your comments, @Fokko @liurenjie1024 @Xuanwo
Added a PR that proposes an interim, but more elegant, solution to the problem, I think. WDYT @Fokko @zeodtr @liurenjie1024
Here is the main modification:
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct ManifestFileV2 {
pub manifest_path: String,
pub manifest_length: i64,
pub partition_spec_id: i32,
pub content: i32,
pub sequence_number: i64,
pub min_sequence_number: i64,
pub added_snapshot_id: i64,
#[serde(alias = "added_data_files_count", alias = "added_files_count")]
pub added_data_files_count: i32,
#[serde(alias = "existing_data_files_count", alias = "existing_files_count")]
pub existing_data_files_count: i32,
#[serde(alias = "deleted_data_files_count", alias = "deleted_files_count")]
pub deleted_data_files_count: i32,
pub added_rows_count: i64,
pub existing_rows_count: i64,
pub deleted_rows_count: i64,
pub partitions: Option<Vec<FieldSummary>>,
pub key_metadata: Option<ByteBuf>,
}
Hi, I've been developing a query engine that uses the iceberg-rust crate. Upon checking Iceberg compatibility with org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.4.3, I didn't encounter any issues, at least not with my engine. However, when testing with org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.5.0, I did come across a few issues. I managed to address them, either through fixes or workarounds. Here's a summary of the issues encountered and the solutions applied:

Issue 1.
In the following scenario,
The reason behind this is that iceberg-rust doesn't include "logicalType": "map" in the Avro schema for Iceberg maps with non-string keys, which are represented as Avro arrays. To address this, I applied the not-yet-official apache_avro 0.17 from GitHub and adjusted the iceberg-rust code to align with the changed Avro Rust API. (BTW, the API change was made by an iceberg-rust developer, maybe to fix this kind of issue.) Then I added the logical type to the schema.

Issue 2.
In the following scenario,
Once I applied apache_avro 0.17 and started writing field-id to the Avro schema, this issue was resolved.

Issue 3.
In the following scenario,
This error is related to the Iceberg issue https://github.com/apache/iceberg/issues/8684 and iceberg-rust's inability to read Avro data using field-id, relying on field names instead. In the aforementioned Iceberg issue, an Iceberg Java developer discovered inconsistencies between the Java source code and the specification regarding the field names of the manifest_file struct. Subsequently, the source code was modified to align with the specification. As a result, Iceberg Java's Avro writers started using different (correct) field names. This adjustment didn't affect Iceberg Java, since it reads Avro data using field-id rather than field names. However, iceberg-rust reads the Avro schema by field name, causing the current issue. To address this, I examined the iceberg-rust and Avro Rust code; however, implementing the functionality to read Avro data using field-id seemed to require a fair amount of time (at least for me). As a temporary solution, I applied an ad hoc workaround in manifest_list.rs, after replacing all the incorrect field names in the code. It essentially replaces 'wrong' field names with the correct ones. However, I perceive this as more of a workaround than a solution. Nonetheless, it serves its purpose for the time being. It would be nice if a more fundamental solution could be implemented in the future, such as reading Avro data using field-id.

Thank you.