apache / iceberg-rust

Apache Iceberg
https://rust.iceberg.apache.org/
Apache License 2.0

Implement the equality delete writer #341

Open ZENOTME opened 2 months ago

ZENOTME commented 2 months ago

After we finish https://github.com/apache/iceberg-rust/pull/275, we can implement the equality delete writer based on this framework.

There is a Rust implementation in icelake that can serve as a reference, but a better design is welcome.

Related spec: https://iceberg.apache.org/spec/#equality-delete-files

Dysprosium0626 commented 2 months ago

Hi @ZENOTME, maybe I can take this issue after you complete https://github.com/apache/iceberg-rust/issues/345

ZENOTME commented 2 months ago

> Hi @ZENOTME, maybe I can take this issue after you complete #345

Sure! Thanks!

liurenjie1024 commented 2 months ago

Assigned to you, thanks @Dysprosium0626 !

Dysprosium0626 commented 2 months ago

Hi, I have nearly finished adding EqualityDeleteWriter, but I have run into a problem. My implementation is here: https://github.com/Dysprosium0626/iceberg-rust/blob/add_equality_delete_writer/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

In my test case, I define a schema, use it to build a ParquetWriterBuilder, and pass that builder into EqualityDeleteFileWriterBuilder.

        // prepare writer
        let pb = ParquetWriterBuilder::new(
            WriterProperties::builder().build(),
            to_write.schema(),
            file_io.clone(),
            location_gen,
            file_name_gen,
        );
        let equality_ids = vec![1, 3];
        let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
            .build(EqualityDeleteWriterConfig::new(
                equality_ids,
                schema.clone(),
                PARQUET_FIELD_ID_META_KEY,
            ))
            .await?;

The FieldProjector filters the schema's columns by equality_ids, and I then try to build a delete_schema from the projected fields.

    async fn build(self, config: Self::C) -> Result<Self::R> {
        let (projector, fields) = FieldProjector::new(
            config.schema.fields(),
            &config.equality_ids,
            &config.column_id_meta_key,
        )?;
        let delete_schema = Arc::new(arrow_schema::Schema::new(fields));
        Ok(EqualityDeleteFileWriter {
            inner_writer: Some(self.inner.clone().build().await?),
            projector,
            delete_schema,
            equality_ids: config.equality_ids,
        })
    }

The problem is that I cannot pass delete_schema to the FileWriterBuilder (ParquetWriterBuilder in this case), so the inner writer is still built with the original, unprojected schema and cannot write the file properly. Do you have any ideas? @ZENOTME
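
For readers following along, here is a minimal sketch of the projection step described in this comment: selecting the fields whose id, stored in field metadata, appears in equality_ids. It uses only the standard library. `Field`, `project_by_ids`, and the `"field_id"` metadata key are hypothetical stand-ins for illustration, not the actual FieldProjector or Arrow types.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an Arrow field: a name plus string metadata.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    metadata: HashMap<String, String>,
}

impl Field {
    /// Builds a field whose id is stored in metadata under `id_key`.
    fn new(name: &str, id_key: &str, id: i32) -> Self {
        let mut metadata = HashMap::new();
        metadata.insert(id_key.to_string(), id.to_string());
        Field { name: name.to_string(), metadata }
    }
}

/// For each equality id, finds the column index and the field carrying that id.
/// The result follows the order of `equality_ids`; a missing id is an error.
fn project_by_ids(
    fields: &[Field],
    equality_ids: &[i32],
    id_key: &str,
) -> Result<(Vec<usize>, Vec<Field>), String> {
    let mut indices = Vec::with_capacity(equality_ids.len());
    let mut projected = Vec::with_capacity(equality_ids.len());
    for wanted in equality_ids {
        let found = fields.iter().enumerate().find(|(_, f)| {
            f.metadata.get(id_key).and_then(|v| v.parse::<i32>().ok()) == Some(*wanted)
        });
        match found {
            Some((idx, field)) => {
                indices.push(idx);
                projected.push(field.clone());
            }
            None => return Err(format!("no field with id {wanted}")),
        }
    }
    Ok((indices, projected))
}

fn main() {
    let key = "field_id"; // illustrative metadata key, an assumption
    let fields = vec![
        Field::new("id", key, 1),
        Field::new("name", key, 2),
        Field::new("region", key, 3),
    ];
    let (indices, delete_fields) = project_by_ids(&fields, &[1, 3], key).unwrap();
    println!("column indices: {indices:?}");
    for f in &delete_fields {
        println!("delete field: {}", f.name);
    }
}
```

The returned indices can be used to project each incoming batch, and the returned fields are what a delete schema would be built from.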

ZENOTME commented 1 month ago

Thanks, @Dysprosium0626! Sorry for the late reply. Our original idea here was to construct the delete schema outside the EqualityDeleteFileWriter.

    let equality_ids = vec![1, 3];
    let delete_schema = ...;
    let pb = ParquetWriterBuilder::new(
        WriterProperties::builder().build(),
        delete_schema,
        file_io.clone(),
        location_gen,
        file_name_gen,
    );
    let mut equality_delete_writer = EqualityDeleteFileWriterBuilder::new(pb)
        .build(EqualityDeleteWriterConfig::new(
            equality_ids,
            PARQUET_FIELD_ID_META_KEY,
        ))
        .await?;

It looks like the schema can always be determined before we build the writer, rather than at run time.
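
To illustrate why the ordering matters, here is a small standard-library sketch. An inner writer fixes its schema at construction, so the projected schema has to exist before the inner builder is created. All types here (`Schema`, `FileWriterBuilder`, `FileWriter`, `EqualityDeleteWriter`) are simplified, hypothetical stand-ins and not the iceberg-rust API; rows are modeled as vectors of strings.

```rust
/// Hypothetical stand-in for a schema: just an ordered list of column names.
#[derive(Debug, Clone, PartialEq)]
struct Schema {
    columns: Vec<String>,
}

impl Schema {
    fn new(columns: &[&str]) -> Self {
        Schema { columns: columns.iter().map(|c| c.to_string()).collect() }
    }

    /// Keeps only the columns at `indices`, in that order.
    fn project(&self, indices: &[usize]) -> Schema {
        Schema { columns: indices.iter().map(|&i| self.columns[i].clone()).collect() }
    }
}

/// The inner builder receives its schema up front and cannot change it later.
struct FileWriterBuilder {
    schema: Schema,
}

impl FileWriterBuilder {
    fn new(schema: Schema) -> Self {
        FileWriterBuilder { schema }
    }

    fn build(self) -> FileWriter {
        FileWriter { schema: self.schema, rows: Vec::new() }
    }
}

struct FileWriter {
    schema: Schema,
    rows: Vec<Vec<String>>,
}

impl FileWriter {
    /// Rejects rows whose width does not match the schema fixed at build time.
    fn write(&mut self, row: Vec<String>) -> Result<(), String> {
        if row.len() != self.schema.columns.len() {
            return Err(format!(
                "expected {} columns, got {}",
                self.schema.columns.len(),
                row.len()
            ));
        }
        self.rows.push(row);
        Ok(())
    }

    fn rows_written(&self) -> usize {
        self.rows.len()
    }
}

/// Projects each full row down to the equality columns, then delegates.
struct EqualityDeleteWriter {
    inner: FileWriter,
    projection: Vec<usize>,
}

impl EqualityDeleteWriter {
    fn new(inner_builder: FileWriterBuilder, projection: Vec<usize>) -> Self {
        EqualityDeleteWriter { inner: inner_builder.build(), projection }
    }

    fn write(&mut self, full_row: &[&str]) -> Result<(), String> {
        let projected: Vec<String> =
            self.projection.iter().map(|&i| full_row[i].to_string()).collect();
        self.inner.write(projected)
    }
}

fn main() {
    let table_schema = Schema::new(&["id", "name", "region"]);
    let projection = vec![0, 2];

    // Delete schema is computed first, then handed to the inner builder.
    let delete_schema = table_schema.project(&projection);
    let mut ok = EqualityDeleteWriter::new(FileWriterBuilder::new(delete_schema), projection.clone());
    assert!(ok.write(&["1", "alice", "eu"]).is_ok());
    assert_eq!(ok.inner.rows_written(), 1);

    // Passing the unprojected schema reproduces the mismatch described earlier.
    let mut bad = EqualityDeleteWriter::new(FileWriterBuilder::new(table_schema), projection);
    assert!(bad.write(&["1", "alice", "eu"]).is_err());
}
```

In this sketch the second writer fails because its inner writer expects three columns while the projected row has two, which mirrors the problem reported above.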