apache / arrow-rs

Error in IPC serializer or reader when using dictionary columns #6221

Open · adriangb opened 1 month ago

adriangb commented 1 month ago

Round-tripping a batch with two dictionary-encoded columns through the IPC stream format fails on read, but only when both columns are written together:

use std::sync::Arc;

use arrow::array::{DictionaryArray, Int32Array, Int8Array, LargeStringArray, RecordBatch, StringArray};
use arrow_ipc::{reader::StreamReader, writer::{IpcWriteOptions, StreamWriter}};
use arrow_schema::{DataType, Field, Schema};
use bytes::{BytesMut, BufMut, Buf};

fn main() {
    let schema = Arc::new(
        Schema::new(
            vec![
                Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::LargeUtf8)), false),
                Field::new("b", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), false),
            ]
        )
    );

    let a_array = DictionaryArray::new(
        Int32Array::from(vec![Some(0)]),
        Arc::new(LargeStringArray::from(vec![Some("hello")])),
    );
    let b_array = DictionaryArray::new(
        Int8Array::from(vec![Some(0)]),
        Arc::new(StringArray::from(vec![Some("world")])),
    );

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(a_array),
            Arc::new(b_array),
        ],
    ).unwrap();

    for indices in vec![vec![0], vec![1], vec![0, 1]] {
        println!("indices: {:?}", indices);
        let batch = batch.project(&indices).unwrap();
        let options = IpcWriteOptions::default();
        let schema = Arc::unwrap_or_clone(batch.schema());
        let mut written_bytes = BytesMut::new().writer();
        {
            let mut writer = StreamWriter::try_new_with_options(&mut written_bytes, &schema, options).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        let written_bytes = written_bytes.into_inner().freeze();

        let reader = StreamReader::try_new(written_bytes.reader(), None).unwrap();
        reader.collect::<Result<Vec<_>, _>>().unwrap();
    }
}

indices: [0]
indices: [1]
indices: [0, 1]
thread 'main' panicked at src/bin/demo.rs:51:47:
called `Result::unwrap()` on an `Err` value: InvalidArgumentError("Buffer 0 of LargeUtf8 isn't large enough. Expected 16 bytes got 8")
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

If only one of the two columns is written at a time, it works 🤔
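
A hedged reading of the error message: for a one-element string dictionary, "Expected 16 bytes got 8" matches the offset-buffer sizes of the two value types exactly, so it looks as if the reader is applying column b's Utf8 dictionary data to column a's LargeUtf8 field, i.e. the two dictionaries get crossed somewhere in the round trip. A quick sanity check of those sizes (my arithmetic, not output from the program above):

// Utf8 stores i32 offsets, LargeUtf8 stores i64 offsets, and a string
// array with n values carries n + 1 offsets.
fn main() {
    let n = 1usize; // one dictionary value ("hello" / "world")
    let utf8 = (n + 1) * std::mem::size_of::<i32>(); // 8 bytes
    let large_utf8 = (n + 1) * std::mem::size_of::<i64>(); // 16 bytes
    println!("Utf8 offsets: {utf8} bytes, LargeUtf8 offsets: {large_utf8} bytes");
    assert_eq!((utf8, large_utf8), (8, 16));
}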

adriangb commented 1 month ago

Using the file IPC format, the same data fails at write time instead:

use std::sync::Arc;

use arrow::array::{DictionaryArray, Int32Array, Int8Array, LargeStringArray, RecordBatch, StringArray};
use arrow_ipc::{reader::FileReader, writer::{FileWriter, IpcWriteOptions}};
use arrow_schema::{DataType, Field, Schema};
use bytes::{BytesMut, BufMut};

fn main() {
    let schema = Arc::new(
        Schema::new(
            vec![
                Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::LargeUtf8)), false),
                Field::new("b", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), false),
            ]
        )
    );

    let a_array = DictionaryArray::new(
        Int32Array::from(vec![Some(0)]),
        Arc::new(LargeStringArray::from(vec![Some("hello")])),
    );
    let b_array = DictionaryArray::new(
        Int8Array::from(vec![Some(0)]),
        Arc::new(StringArray::from(vec![Some("world")])),
    );

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(a_array),
            Arc::new(b_array),
        ],
    ).unwrap();

    for indices in vec![vec![0], vec![1], vec![0, 1]] {
        println!("indices: {:?}", indices);
        let batch = batch.project(&indices).unwrap();
        let options = IpcWriteOptions::default();
        let schema = Arc::unwrap_or_clone(batch.schema());
        let mut written_bytes = BytesMut::new().writer();
        {
            let mut writer = FileWriter::try_new_with_options(&mut written_bytes, &schema, options).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        let written_bytes = written_bytes.into_inner().freeze();

        let cursor = std::io::Cursor::new(written_bytes);
        let reader = FileReader::try_new(cursor, None).unwrap();
        reader.collect::<Result<Vec<_>, _>>().unwrap();
    }
}

indices: [0]
indices: [1]
indices: [0, 1]
thread 'main' panicked at src/bin/demo.rs:45:34:
called `Result::unwrap()` on an `Err` value: InvalidArgumentError("Dictionary replacement detected when writing IPC file format. Arrow IPC files only support a single dictionary for a given field across all batches.")
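
For context, the message describes a real constraint: an IPC file stores at most one dictionary per field, so genuinely replacing a dictionary between batches has to fail. A minimal sketch of the case where this error is expected, assuming default write options (the surprise above is that it also fires while writing a single batch):

use std::sync::Arc;

use arrow::array::{DictionaryArray, Int8Array, RecordBatch, StringArray};
use arrow_ipc::writer::FileWriter;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "b",
        DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
        false,
    )]));
    let batch_for = |value: &str| {
        let array = DictionaryArray::new(
            Int8Array::from(vec![Some(0)]),
            Arc::new(StringArray::from(vec![Some(value)])),
        );
        RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap()
    };

    let mut writer = FileWriter::try_new(Vec::<u8>::new(), &schema).unwrap();
    // First batch: the dictionary ("hello") is written once for field "b".
    writer.write(&batch_for("hello")).unwrap();
    // The second batch carries a different dictionary ("world"), which would
    // require replacing the stored one; the file format rejects this.
    let err = writer.write(&batch_for("world")).unwrap_err();
    println!("{err}");
}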

Writing and reading the same data with Parquet works without issues:

use std::sync::Arc;

use arrow::array::{DictionaryArray, Int32Array, Int8Array, LargeStringArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use bytes::{BytesMut, BufMut};
use parquet::arrow::{arrow_reader::ArrowReaderBuilder, arrow_writer::ArrowWriterOptions, ArrowWriter};

fn main() {
    let schema = Arc::new(
        Schema::new(
            vec![
                Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::LargeUtf8)), false),
                Field::new("b", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), false),
            ]
        )
    );

    let a_array = DictionaryArray::new(
        Int32Array::from(vec![Some(0)]),
        Arc::new(LargeStringArray::from(vec![Some("hello")])),
    );
    let b_array = DictionaryArray::new(
        Int8Array::from(vec![Some(0)]),
        Arc::new(StringArray::from(vec![Some("world")])),
    );

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(a_array),
            Arc::new(b_array),
        ],
    ).unwrap();

    for indices in vec![vec![0], vec![1], vec![0, 1]] {
        println!("indices: {:?}", indices);
        let batch = batch.project(&indices).unwrap();
        let options = ArrowWriterOptions::default();
        let schema = batch.schema();
        let mut written_bytes = BytesMut::new().writer();
        {
            let mut writer = ArrowWriter::try_new_with_options(&mut written_bytes, schema, options).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        let written_bytes = written_bytes.into_inner().freeze();

        let reader = ArrowReaderBuilder::try_new(written_bytes).unwrap().build().unwrap();
        reader.collect::<Result<Vec<_>, _>>().unwrap();
    }
}
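
(Presumably this is because Parquet re-encodes dictionaries internally per column chunk rather than reusing Arrow's IPC dictionary-ID machinery, so nothing has to be shared across fields; that's my reading, not something I've verified against the writer internals.)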

Let me know if I'm doing something wrong...

adriangb commented 1 month ago

Also, if I replace the LargeUtf8 dictionary values with Utf8 (so both columns share the same value type), the stream round trip works:

use std::sync::Arc;

use arrow::array::{DictionaryArray, Int32Array, Int8Array, RecordBatch, StringArray};
use arrow_ipc::{reader::StreamReader, writer::{IpcWriteOptions, StreamWriter}};
use arrow_schema::{DataType, Field, Schema};
use bytes::{BytesMut, BufMut, Buf};

fn main() {
    let schema = Arc::new(
        Schema::new(
            vec![
                Field::new("a", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), false),
                Field::new("b", DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)), false),
            ]
        )
    );

    let a_array = DictionaryArray::new(
        Int32Array::from(vec![Some(0)]),
        Arc::new(StringArray::from(vec![Some("hello")])),
    );
    let b_array = DictionaryArray::new(
        Int8Array::from(vec![Some(0)]),
        Arc::new(StringArray::from(vec![Some("world")])),
    );

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(a_array),
            Arc::new(b_array),
        ],
    ).unwrap();

    for indices in vec![vec![0], vec![1], vec![0, 1]] {
        println!("indices: {:?}", indices);
        let batch = batch.project(&indices).unwrap();
        let options = IpcWriteOptions::default();
        let schema = Arc::unwrap_or_clone(batch.schema());
        let mut written_bytes = BytesMut::new().writer();
        {
            let mut writer = StreamWriter::try_new_with_options(&mut written_bytes, &schema, options).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        let written_bytes = written_bytes.into_inner().freeze();

        let reader = StreamReader::try_new(written_bytes.reader(), None).unwrap();
        reader.collect::<Result<Vec<_>, _>>().unwrap();
    }
}
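
That the Utf8-only version works fits the crossed-dictionary reading above: when both columns share the same value layout, a mixed-up dictionary no longer trips the buffer-length check (whether the decoded values are then correct is worth verifying separately). As a possible interim workaround, and only a sketch that I haven't verified against this bug, the LargeUtf8 dictionary column could be cast to a Utf8 dictionary before writing:

use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray, Int32Array, LargeStringArray};
use arrow::compute::cast;
use arrow_schema::DataType;

fn main() {
    let a: ArrayRef = Arc::new(DictionaryArray::new(
        Int32Array::from(vec![Some(0)]),
        Arc::new(LargeStringArray::from(vec![Some("hello")])),
    ));
    // Rewrite the values type so every dictionary column in the batch uses Utf8.
    let target = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    let a_utf8 = cast(&a, &target).unwrap();
    assert_eq!(a_utf8.data_type(), &target);
}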