apache / iceberg-rust

Apache Iceberg
https://rust.iceberg.apache.org/
Apache License 2.0
673 stars 159 forks source link

Bug in `PartialEq` for `Struct` #706

Open Sl1mb0 opened 2 days ago

Sl1mb0 commented 2 days ago

Problem

If I write a Manifest to an output.avro file and then read that same output.avro file into another Manifest object, asserting that the two objects are equal fails due to inequality between the partition fields of each Manifest:

Example

// Write manifest
let manifest_list_entry = manifest_writer.write(input_manifest.clone()).await.unwrap();

// Assert that the manifest path exists.
let manifest_path = PathBuf::from(manifest_list_entry.manifest_path);
assert!(manifest_path.exists());

// Assert that manifest file can be parsed back into a `Manifest` struct.
let buf = std::fs::read(&manifest_path).unwrap();
let output_manifest = Manifest::parse_avro(&buf).unwrap();

// ..
// Assert that all other fields are equal
// ..

// Fails
assert_eq!(file1.partition(), file2.partition());

Output

assertion `left == right` failed
  left: Struct { fields: [Primitive(Boolean(true))], null_bitmap: BitVec<usize, bitvec::order::Lsb0> { addr: 0x78a7fc007210, head: 000000, bits: 1, capacity: 64 } [0] }
 right: Struct { fields: [], null_bitmap: BitVec<usize, bitvec::order::Lsb0> { addr: 0x8, head: 000000, bits: 0, capacity: 0 } [] }
alamb commented 1 day ago

In case it helps, here is the workaround we used:

(there is something about the partition on DataFile that doesn't compare as equal

    /// Validates that two `Manifest`s are equal.
    ///
    /// The partial_eq implementation for `Manifest` seems to compare some fields structurally
    /// e
    fn assert_manifest_eq(m1: Manifest, m2: Manifest) {
        let (entries1, metadata1) = m1.into_parts();
        let (entries2, metadata2) = m2.into_parts();
        assert_eq!(entries1.len(), entries2.len());
        for (e1, e2) in entries1.into_iter().zip(entries2.into_iter()) {
            assert_manifest_entry_eq(&e1, &e2);
        }
        assert_eq!(metadata1, metadata2);
    }

    /// Validates that two `ManifestEntry`s are equal.
    ///
    /// Can't use the partial_eq implementation for `ManifestEntry` because it
    /// seems to have a bug in the equality check for partition.
    fn assert_manifest_entry_eq(e1: &ManifestEntry, e2: &ManifestEntry) {
        assert_eq!(e1.content_type(), e2.content_type());
        let file1 = e1.data_file();
        let file2 = e2.data_file();
        assert_eq!(file1.content_type(), file2.content_type());
        assert_eq!(file1.file_path(), file2.file_path());
        assert_eq!(file1.file_format(), file2.file_format());
        // seems to have a bug in the equality check for partition;
        // assert_eq!(file1.partition(), file2.partition());
        assert_eq!(file1.record_count(), file2.record_count());
        assert_eq!(file1.file_size_in_bytes(), file2.file_size_in_bytes());
        assert_eq!(file1.column_sizes(), file2.column_sizes());
        assert_eq!(file1.value_counts(), file2.value_counts());
        assert_eq!(file1.null_value_counts(), file2.null_value_counts());
        assert_eq!(file1.nan_value_counts(), file2.nan_value_counts());
        assert_eq!(file1.lower_bounds(), file2.lower_bounds());
        assert_eq!(file1.upper_bounds(), file2.upper_bounds());
        assert_eq!(file1.key_metadata(), file2.key_metadata());
        assert_eq!(file1.split_offsets(), file2.split_offsets());
        assert_eq!(file1.equality_ids(), file2.equality_ids());
        assert_eq!(file1.sort_order_id(), file2.sort_order_id());
    }
liurenjie1024 commented 1 day ago

Thanks for reporting this, could you provide a more concrete example to reproduce this?

Sl1mb0 commented 1 day ago

Sure!

mod tests {
    use std::collections::HashMap;
    use std::io::Seek;
    use std::sync::Arc;

    use arrow::{
        array::Float16BufferBuilder, buffer, compute::kernels::partition, ipc::FieldBuilder,
    };
    use flatbuffers::FlatBufferBuilder;
    use iceberg::spec::{
        DataFile, DataFileBuilder, Datum, FormatVersion, Literal, ManifestStatus, NestedField,
        PartitionField, PartitionSpec, PrimitiveLiteral, PrimitiveType, Struct, Transform, Type,
    };
    use iceberg::transform;

    use super::*;

    #[tokio::test]
    async fn test_manifest_builder() {
        // Create a schema with a single nested field.
        let schema_fields = (0..4).map(|i| {
            Arc::new(NestedField {
                id: i,
                name: format!("field_{}", i),
                required: true,
                field_type: Box::new(Type::Primitive(PrimitiveType::Boolean)),
                doc: None,
                initial_default: None,
                write_default: None,
            })
        });

        let schema = IcebergSchema::builder()
            .with_fields(schema_fields)
            .with_schema_id(0)
            .build()
            .unwrap();

        // Get the schema id, and create an arbitrary snapshot id.
        let schema_id = schema.schema_id();
        let snapshot_id = 0

        // Create a partition spec with a single partition field.
        let partition_spec = PartitionSpec::builder(&schema)
            .with_spec_id(0)
            .build()
            .unwrap();

        // Create the manifest metadata with `schema` and `partition_spec`.
        let metadata = ManifestMetadata::builder()
            .schema(schema)
            .schema_id(schema_id)
            .content(iceberg::spec::ManifestContentType::Data)
            .partition_spec(partition_spec)
            .format_version(FormatVersion::V2)
            .build();

        let partition = Struct::from_iter(vec![Some(Literal::Primitive(
            PrimitiveLiteral::Boolean(true),
        ))]);

        let mut column_sizes = HashMap::new();
        column_sizes.insert(0i32, 4u64);

        let mut lower_bounds = HashMap::new();
        lower_bounds.insert(0i32, Datum::bool(false));

        let mut upper_bounds = HashMap::new();
        upper_bounds.insert(1i32, Datum::bool(true));

        let tmp_dir = TempDir::new().unwrap();
        let file_path = tmp_dir.path().join("data.parquet");

        // Create an arbitrary data file.
        let data_file = DataFileBuilder::default()
            .content(iceberg::spec::DataContentType::Data)
            .file_path(file_path.to_str().unwrap().to_string())
            .file_format(iceberg::spec::DataFileFormat::Parquet)
            .column_sizes(column_sizes)
            .partition(partition)
            .equality_ids(vec![0, 1, 2, 3])
            .record_count(0)
            .file_size_in_bytes(0)
            .key_metadata(Vec::new())
            .lower_bounds(lower_bounds)
            .upper_bounds(upper_bounds)
            .build()
            .unwrap();

        // Create an arbitrary manifest entry with `snapshot_id` and `data_file`.
        let manifest_entry = ManifestEntry::builder()
            .status(ManifestStatus::Added)
            .snapshot_id(snapshot_id)
            .data_file(data_file)
            .build();

        // Create new `Manifest`
        let input_manifest = Manifest::new(metadata, vec![manifest_entry]);

        // Write the manifest to an avro file.
        let temp_dir = TempDir::new().unwrap().into_path().join("manifest.avro");
        let output_file = FileIOBuilder::new_fs_io()
            .build()
            .unwrap()
            .new_output(temp_dir.to_str().unwrap())
            .unwrap();

        let manifest_writer = ManifestWriter::new(output_file, snapshot_id, Vec::new());
        let manifest_list_entry = manifest_writer.write(input_manifest.clone()).await.unwrap();

        // Assert that the manifest path exists.
        let manifest_path = PathBuf::from(manifest_list_entry.manifest_path);
        assert!(manifest_path.exists());

        // Assert that manifest file can be parsed back into a `Manifest` struct.
        let buf = std::fs::read(&manifest_path).unwrap();
        let output_manifest = Manifest::parse_avro(&buf).unwrap();

        assert_manifest_eq(input_manifest, output_manifest);
    }

    /// Validates that two `Manifest`s are equal.
    ///
    /// The partial_eq implementation for `Manifest` seems to compare some fields structurally
    /// e
    fn assert_manifest_eq(m1: Manifest, m2: Manifest) {
        let (entries1, metadata1) = m1.into_parts();
        let (entries2, metadata2) = m2.into_parts();
        assert_eq!(entries1.len(), entries2.len());
        for (e1, e2) in entries1.into_iter().zip(entries2.into_iter()) {
            assert_manifest_entry_eq(&e1, &e2);
        }
        assert_eq!(metadata1, metadata2);
    }

    /// Validates that two `ManifestEntry`s are equal.
    ///
    /// Can't use the partial_eq implementation for `ManifestEntry` because it
    /// seems to have a bug in the equality check for partition.
    fn assert_manifest_entry_eq(e1: &ManifestEntry, e2: &ManifestEntry) {
        assert_eq!(e1.content_type(), e2.content_type());
        assert_eq!(e1.sequence_number(), e2.sequence_number());
        let file1 = e1.data_file();
        let file2 = e2.data_file();
        assert_eq!(file1.content_type(), file2.content_type());
        assert_eq!(file1.file_path(), file2.file_path());
        assert_eq!(file1.file_format(), file2.file_format());
        // seems to have a bug in the equality check for partition;
        assert_eq!(file1.partition(), file2.partition());
        assert_eq!(file1.record_count(), file2.record_count());
        assert_eq!(file1.file_size_in_bytes(), file2.file_size_in_bytes());
        assert_eq!(file1.column_sizes(), file2.column_sizes());
        assert_eq!(file1.value_counts(), file2.value_counts());
        assert_eq!(file1.null_value_counts(), file2.null_value_counts());
        assert_eq!(file1.nan_value_counts(), file2.nan_value_counts());
        assert_eq!(file1.lower_bounds(), file2.lower_bounds());
        assert_eq!(file1.upper_bounds(), file2.upper_bounds());
        assert_eq!(file1.key_metadata(), file2.key_metadata());
        assert_eq!(file1.split_offsets(), file2.split_offsets());
        assert_eq!(file1.equality_ids(), file2.equality_ids());
        assert_eq!(file1.sort_order_id(), file2.sort_order_id());
    }
}