apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Support `Dictionary` in Parquet Metadata Statistics #11145

Closed appletreeisyellow closed 3 months ago

appletreeisyellow commented 3 months ago

Describe the bug

When a column has a Dictionary data type, the Parquet metadata statistics return Exact(Dictionary(Int32, Utf8(NULL))) for both the min and max values, even though the column contains non-null values.

To Reproduce

Run the test below in this file: https://github.com/apache/datafusion/blob/8216e32e87b2238d8814fe16215c8770d6c327c8/datafusion/core/src/datasource/file_format/parquet.rs#L1363

    #[tokio::test]
    async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> {
        // Data for column c_dic: ["a", "b", "c", "d"]
        let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
        // Keys 0..=3 reference each dictionary value once, so the column decodes to a, b, c, d
        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
        let dic_array =
            DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values)).unwrap();
        let boxed_array: Box<dyn arrow_array::Array> = Box::new(dic_array);
        let c_dic: ArrayRef = Arc::from(boxed_array);
        // Define the schema
        let field = Field::new(
            "c_dic",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        );
        let schema = Schema::new(vec![field]);
        // Create the RecordBatch
        let batch1 = RecordBatch::try_new(Arc::new(schema), vec![c_dic]).unwrap();

        // Use store_parquet to write each batch to its own file.
        // batch1 is written into the first file and includes:
        //    - column c_dic, which has 4 rows and no nulls. In this test its
        //      min and max stats come back missing even though the column has values.
        let store = Arc::new(LocalFileSystem::new()) as _;
        let (files, _file_names) = store_parquet(vec![batch1], false).await?;

        let state = SessionContext::new().state();
        let format = ParquetFormat::default();
        let schema = format.infer_schema(&state, &store, &files).await.unwrap();

        // Fetch statistics for first file
        let pq_meta = fetch_parquet_metadata(store.as_ref(), &files[0], None).await?;
        let stats = statistics_from_parquet_meta(&pq_meta, schema.clone()).await?;
        assert_eq!(stats.num_rows, Precision::Exact(4));

        // column c_dic
        let c_dic_stats = &stats.column_statistics[0];

        let null_dic: ScalarValue = ScalarValue::Dictionary(
            Box::new(DataType::Int32),
            Box::new(ScalarValue::Utf8(None)),
        );

        assert_eq!(c_dic_stats.null_count, Precision::Exact(0));
        // BUG (max_value):
        // Expect:
        //   Exact(Dictionary(Int32, Utf8("d")))
        // Got:
        //   Exact(Dictionary(Int32, Utf8(NULL)))
        assert_eq!(c_dic_stats.max_value, Precision::Exact(null_dic.clone()));
        // BUG (min_value):
        // Expect:
        //   Exact(Dictionary(Int32, Utf8("a")))
        // Got:
        //   Exact(Dictionary(Int32, Utf8(NULL)))
        assert_eq!(c_dic_stats.min_value, Precision::Exact(null_dic.clone()));

        Ok(())
    }

Expected behavior

I expect the statistics to show the actual min and max values. For the reproducer given above, min_value and max_value should hold the column's smallest and largest strings instead of Utf8(NULL).
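As a rough illustration of the expectation, here is a minimal std-only Rust sketch that computes min and max over the decoded values of a dictionary-encoded column. `DictColumn`, `decoded`, and `min_max` are hypothetical names made up for this example; they are not Arrow or DataFusion APIs, and the real statistics are read from Parquet metadata rather than computed this way.

```rust
/// Toy stand-in for a dictionary-encoded string column (NOT an Arrow type):
/// `keys[i]` indexes into `values`, and `None` marks a null row.
struct DictColumn<'a> {
    keys: Vec<Option<usize>>,
    values: Vec<&'a str>,
}

impl<'a> DictColumn<'a> {
    /// Iterate over the logical (decoded) non-null values of the column.
    fn decoded(&self) -> impl Iterator<Item = &'a str> + '_ {
        self.keys.iter().flatten().map(move |&k| self.values[k])
    }

    /// Min and max over the decoded values; `None` if every row is null.
    fn min_max(&self) -> Option<(&'a str, &'a str)> {
        let mut it = self.decoded();
        let first = it.next()?;
        Some(it.fold((first, first), |(lo, hi), v| (lo.min(v), hi.max(v))))
    }
}
```

In this model the statistics follow the rows the keys actually reference, so a dictionary entry that no key points at does not affect the result, and only an all-null column yields `None`.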

Additional context

The underlying statistics extraction code should have no problems extracting statistics from Dictionary columns

The code is here:

https://github.com/apache/datafusion/blob/7e49ccf3dd3408bc9c4adb86f070d1e3d1f4c1e2/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L452-L454

And the tests are here:

https://github.com/apache/datafusion/blob/7e49ccf3dd3408bc9c4adb86f070d1e3d1f4c1e2/datafusion/core/tests/parquet/arrow_statistics.rs#L1729-L1768

I wonder if the code that summarizes the statistics across row groups doesn't handle dictionaries correctly 🤔

https://github.com/apache/datafusion/blob/7e49ccf3dd3408bc9c4adb86f070d1e3d1f4c1e2/datafusion/core/src/datasource/file_format/parquet.rs#L468-L495

efredine commented 3 months ago

Yes - the code summarizing the max and min isn't working correctly for a Dictionary. In the test case, the max_value or min_value is a StringArray that needs to be mapped to the appropriate Dictionary type before being passed into the update_batch methods. I will have a go at fixing it, but I can't do that until tomorrow morning, so someone else should feel free to pick it up if they need a fix before then.

efredine commented 3 months ago

In fact, it seems to me that the mapping to the correct dictionary type should probably be performed here? https://github.com/apache/datafusion/blob/7e49ccf3dd3408bc9c4adb86f070d1e3d1f4c1e2/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs#L452-L454

efredine commented 3 months ago

Well, I don't think it can be easily modified at the source, and that may not be the right thing to do anyway. So it is probably best to just address it in summarize_min_max_null_counts.
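To make the type mismatch concrete, here is a std-only toy model in Rust. It is a sketch of one possible approach, not the actual DataFusion code: `ToyType`, `stats_type`, and `ToyMax` are hypothetical names, and the real accumulators may behave differently in detail. The idea is that the extracted statistics arrive as plain Utf8 values, so an accumulator built for the Dictionary type never sees a matching batch, while one built for the dictionary's value type does.

```rust
/// Simplified stand-in for Arrow's `DataType` (only the variants needed here).
#[derive(Debug, Clone, PartialEq)]
enum ToyType {
    Int32,
    Utf8,
    Dictionary(Box<ToyType>, Box<ToyType>),
}

/// Peel off dictionary wrappers to get the type the statistics are stored in.
fn stats_type(dt: &ToyType) -> &ToyType {
    match dt {
        ToyType::Dictionary(_, value) => stats_type(value),
        other => other,
    }
}

/// Toy max accumulator that only accepts batches whose type matches its own.
struct ToyMax {
    dt: ToyType,
    max: Option<String>,
}

impl ToyMax {
    fn new(dt: ToyType) -> Self {
        Self { dt, max: None }
    }

    /// Returns false and leaves the state unchanged on a type mismatch.
    fn update_batch(&mut self, batch_type: &ToyType, values: &[&str]) -> bool {
        if *batch_type != self.dt {
            return false;
        }
        for v in values {
            // Keep the largest string seen so far.
            if self.max.as_deref().map_or(true, |m| *v > m) {
                self.max = Some(v.to_string());
            }
        }
        true
    }
}
```

In this model, `ToyMax::new(field_type)` with a Dictionary field type rejects Utf8 batches and its `max` stays `None`, loosely mirroring the NULL seen in the reproducer. Building the accumulator from `stats_type(&field_type)` instead lets the same batches through.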

efredine commented 3 months ago

take