jorgecarleitao / arrow2

Transmute-free Rust library to work with the Arrow format
Apache License 2.0
1.07k stars 221 forks source link

Append to existing ipc file results in ErrorLocation: InvalidOffset when reading new block #1604

Open Jonarod opened 5 months ago

Jonarod commented 5 months ago

I am trying to append to an existing IPC file using arrow2::io::ipc::write::FileWriter::try_from_file(). The writer appears to update the necessary bookkeeping correctly (the metadata are correct) and to write the new chunk of data (the total file size grows accordingly), but the newly written block seems to be corrupt and cannot be read.

Here are reproducible steps:

Cargo.toml:

[dependencies]
arrow2 = { version = "0.18.0", features = ["io_ipc"]}

then in main.rs:

use std::fs::File;
use arrow2::io::ipc::{
  self,
  write::WriteOptions
};
use arrow2::array::PrimitiveArray;
use arrow2::datatypes::{Field, DataType, Schema};
use arrow2::chunk::Chunk;

/// Reproduction for the append problem.
///
/// Steps performed, all against the same path:
/// 1. create a new IPC file holding a single one-row `Float64` chunk;
/// 2. read it back and print its block list and first block;
/// 3. re-open the file and append a second chunk via `FileWriter::try_from_file`;
/// 4. read it back again and print the block list, block 1 and block 2.
///
/// Every fallible call is `unwrap()`ed on purpose: this is a throwaway
/// reproduction and any failure should abort loudly.
fn main() {
  // One path shared by every step, so the append below targets the file
  // that was just created (and not some other file on disk).
  let filepath = "./test_arrow_append.ipc";

  // =====================================================
  // Create new ipc file
  // =====================================================
  let fields = vec![
    Field::new(String::from("col"), DataType::Float64, false),
  ];
  let schema = Schema::from(fields);

  let file = File::create(filepath).unwrap();
  let options = WriteOptions {
    compression: None
  };
  let mut writer = ipc::write::FileWriter::new(&file, schema, None, options);

  let col = PrimitiveArray::from_vec(vec![1.0]);

  writer.start().unwrap();
  writer.write(&Chunk::new(vec![Box::new(col)]), None).unwrap();
  writer.finish().unwrap();

  // =====================================================
  // Prove it is a valid file we can read from
  // =====================================================
  if let Ok(file) = File::open(filepath) {
    // The metadata is read through a separate handle so `file` is left
    // untouched for the reader constructed below.
    let metadata = ipc::read::read_file_metadata(&mut File::open(filepath).unwrap()).unwrap();
    let mut reader = ipc::read::FileReader::new(&file, metadata, None, None);
    println!("Contains {:?} block(s)", reader.metadata().blocks.len());
    for block in &reader.metadata().blocks {
      println!("{:#?}", block);
    }

    // Can read block 1:
    println!("Block 1 is: {:#?}", reader.nth(0));
  }

  // =====================================================
  // Now try to append to it
  // =====================================================
  let metadata = ipc::read::read_file_metadata(&mut File::open(filepath).unwrap()).unwrap();
  // NOTE(review): `append(true)` makes every write land at the current end of
  // the file. If `try_from_file` seeks back to overwrite the previous footer
  // (TODO confirm against the arrow2 implementation), the new block would not
  // be stored at the offset recorded for it, which would explain the read
  // failure below. Opening with `.read(true).write(true)` instead may be what
  // the API expects.
  let file_append_mode = File::options().append(true).open(filepath).unwrap();
  let options = WriteOptions {
    compression: None
  };
  let mut writer = ipc::write::FileWriter::try_from_file(file_append_mode, metadata, options).unwrap();

  let col = PrimitiveArray::from_vec(vec![2.0]);

  // No `start()` here: the reproduction relies on `try_from_file` returning a
  // writer that can be written to directly.
  writer.write(&Chunk::new(vec![Box::new(col)]), None).unwrap();
  writer.finish().unwrap();

  // =====================================================
  // Prove it is a valid file we can read from
  // =====================================================
  if let Ok(file) = File::open(filepath) {
    let metadata = ipc::read::read_file_metadata(&mut File::open(filepath).unwrap()).unwrap();
    let mut reader = ipc::read::FileReader::new(&file, metadata, None, None);
    println!("Contains {:?} block(s)", reader.metadata().blocks.len());
    for block in &reader.metadata().blocks {
      println!("{:#?}", block);
    }

    // Can read block 1:
    println!("Block 1 is: {:#?}", reader.nth(0));

    // CANNOT READ BLOCK 2: prints None
    // println!("Block 2 is: {:#?}", reader.nth(1));
    // `nth(0)` above already consumed block 1, so `next()` yields block 2.
    println!("Block 2 is: {:#?}", reader.next());
    // Observed output:
    // Some(Err(OutOfSpec("InvalidFlatbufferMessage(Error { source_location: ErrorLocation { type_: \"[MessageRef]\", method: \"read_as_root\", byte_offset: 0 }, error_kind: InvalidOffset })")))
  }
}