
IndexOutOfBoundsException when loading compressed IPC format #33384

Open asfimport opened 1 year ago

asfimport commented 1 year ago

I encountered this bug when I loaded a dataframe stored in the Arrow IPC format.


// Java code from the "Apache Arrow Java Cookbook"
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;

File file = new File("example.arrow");
try (
        BufferAllocator rootAllocator = new RootAllocator();
        FileInputStream fileInputStream = new FileInputStream(file);
        ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), rootAllocator)
) {
    System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
    for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
        reader.loadRecordBatch(arrowBlock);
        VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
        System.out.print(vectorSchemaRootRecover.contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}

Call stack:


Exception in thread "main" java.lang.IndexOutOfBoundsException: index: 0, length: 2048 (expected: range(0, 2024))
    at org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:701)
    at org.apache.arrow.memory.ArrowBuf.setBytes(ArrowBuf.java:955)
    at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:451)
    at org.apache.arrow.vector.BaseFixedWidthVector.setValueCount(BaseFixedWidthVector.java:732)
    at org.apache.arrow.vector.VectorSchemaRoot.setRowCount(VectorSchemaRoot.java:240)
    at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:86)
    at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:220)
    at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:166)
    at org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:197)

This bug can be reproduced with a simple dataframe created by pandas:


import pandas as pd

pd.DataFrame({'a': range(10000)}).to_feather('example.arrow')

Pandas compresses the dataframe by default. If compression is turned off (for example by passing compression='uncompressed' to to_feather), Java can load the dataframe. Thus, I suspect the bounds-checking code is buggy when loading compressed files.

That dataframe can be loaded by polars, pandas, and pyarrow, so it is unlikely to be a pandas bug.

Environment: Linux and Windows. Apache Arrow Java versions: 10.0.0, 9.0.0, 4.0.1. Pandas 1.4.2 using pyarrow 8.0.0 (anaconda3-2022.05). Reporter: Georeth Zhou

Note: This issue was originally created as ARROW-18198. Please see the migration documentation for further details.

asfimport commented 1 year ago

David Li / @lidavidm: CC @davisusanibar

asfimport commented 1 year ago

Georeth Zhou: Any updates?

asfimport commented 1 year ago

David Dali Susanibar Arce / @davisusanibar: Hi [~georeth], let me check that.

asfimport commented 1 year ago

David Dali Susanibar Arce / @davisusanibar: There is no problem for files with rowCount <= 2048.

There is a problem with the validity buffer: for example, with 2049 rows the buffer is initially allocated at 504 bytes, but in the end a length of 512 bytes is requested (see the sketch below).

I need to continue reviewing to determine the changes needed.
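For intuition, here is a rough sketch of the arithmetic behind those numbers. This is my own illustration under assumptions, not Arrow's actual allocation code; notably, 512 - 504 = 8 bytes, which happens to match the 8-byte uncompressed-length prefix the Arrow IPC format prepends to each compressed buffer:

// Rough illustration of the 504/512 numbers above (assumed arithmetic, not Arrow internals).
int rows = 2049;
int validityBytes = (rows + 7) / 8;                      // 257 bytes of validity bits
// Arrow's default allocator rounds allocations up to a power of two (simplified here,
// assuming validityBytes is not already a power of two):
int rounded = Integer.highestOneBit(validityBytes) * 2;  // 512
int prefix = 8;  // 8-byte uncompressed-length header on each compressed IPC buffer
System.out.println(rounded - prefix);                    // 504

If the 8-byte length prefix is indeed the source of the 504 vs. 512 mismatch, that would also explain why only compressed files trigger the exception.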

 

asfimport commented 1 year ago

David Dali Susanibar Arce / @davisusanibar: Based on the current implementation, the default compression codec is no compression.

asfimport commented 1 year ago

David Dali Susanibar Arce / @davisusanibar: @lidavidm could you please help me with this question:

Was the Vector module designed to support compression codecs (Lz4/Zstd)? I only see the abstract class AbstractCompressionCodec; doDecompress is implemented only in the Compression module, and if I try to use that, it creates a cyclic dependency Vector <-> Compression.

Could you help us find a way to implement compression in the Vector module?

asfimport commented 1 year ago

David Li / @lidavidm: @davisusanibar I don't see the problem: compression is implemented. Just add dependencies on both modules from your application (see the sketch below).

In any case, the first issue here is that Java should detect the file is compressed and error if it doesn't support the codec.
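For reference, a minimal sketch of what that could look like in a Maven build. The artifact coordinates and the ${arrow.version} property are assumptions to verify against Maven Central:

<!-- Sketch: dependencies an application might declare to read compressed IPC files. -->
<dependency>
  <groupId>org.apache.arrow</groupId>
  <artifactId>arrow-vector</artifactId>
  <version>${arrow.version}</version>
</dependency>
<dependency>
  <!-- provides CommonsCompressionFactory for LZ4/ZSTD record batches -->
  <groupId>org.apache.arrow</groupId>
  <artifactId>arrow-compression</artifactId>
  <version>${arrow.version}</version>
</dependency>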

asfimport commented 1 year ago

David Li / @lidavidm: The ArrowFileReader/ArrowStreamReader take an optional codec factory instance, so that's probably the underlying issue (the modules are decoupled, so by default you can't read a compressed file), but we should still fix the error message when you don't pass in the factory.
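A minimal sketch of the kind of guard being described, assuming a hypothetical checkCodec helper (the actual fix may look different):

// Hypothetical sketch, not the actual patch: fail fast with a clear message when a
// record batch is compressed but the reader has no real codec factory.
import org.apache.arrow.vector.compression.CompressionCodec;
import org.apache.arrow.vector.compression.CompressionUtil;
import org.apache.arrow.vector.compression.NoCompressionCodec;

final class CodecCheck {
  // checkCodec is an assumed helper name, not an existing Arrow API.
  static void checkCodec(CompressionUtil.CodecType type, CompressionCodec.Factory factory) {
    if (type != CompressionUtil.CodecType.NO_COMPRESSION
        && factory == NoCompressionCodec.Factory.INSTANCE) {
      throw new IllegalArgumentException(
          "Record batch is compressed with " + type + " but no codec factory was supplied; "
              + "pass e.g. CommonsCompressionFactory.INSTANCE to the reader constructor.");
    }
  }
}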

asfimport commented 1 year ago

David Dali Susanibar Arce / @davisusanibar: Hi [~georeth],

Please consider this PR, which adds a cookbook recipe for reading compressed files:


import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;

File file = new File("src/main/resources/compare/lz4.arrow");
try (
    BufferAllocator rootAllocator = new RootAllocator();
    FileInputStream fileInputStream = new FileInputStream(file);
    // Unlike the plain two-argument constructor, passing CommonsCompressionFactory
    // lets the reader decompress LZ4/ZSTD record batches.
    ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(),
        rootAllocator, CommonsCompressionFactory.INSTANCE)
) {
    System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
    for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
        reader.loadRecordBatch(arrowBlock);
        VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
        System.out.println("Size: --> " + vectorSchemaRootRecover.getRowCount());
        System.out.print(vectorSchemaRootRecover.contentToTSVString());
    }
} catch (IOException e) {
    e.printStackTrace();
}
asfimport commented 1 year ago

Georeth Zhou: @davisusanibar thank you.

It works now.