apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Java] MapVector cannot be loaded via IPC #42218

Open vibhatha opened 1 week ago

vibhatha commented 1 week ago

Describe the bug, including details regarding any error messages, version, and platform.

Referring to the issue filed on Stack Overflow: https://stackoverflow.com/questions/77878272/apache-arrow-not-all-nodes-and-buffers-were-consumed-error-when-writing-a-map

The following code yields an error:

File file = new File("test.arrow");

Field keyField = new Field("id", FieldType.notNullable(new ArrowType.Int(64, true)),
    Collections.emptyList());
Field valueField = new Field("value", FieldType.nullable(new ArrowType.Int(64, true)), Collections.emptyList());
Field structField =
    new Field("entry", FieldType.notNullable(ArrowType.Struct.INSTANCE), List.of(keyField, valueField));
Field mapIntToIntField = new Field("mapFieldIntToInt", FieldType.notNullable(new ArrowType.Map(false)), List.of(structField));

Schema schema = new Schema(Arrays.asList(mapIntToIntField));

System.out.println("Writing...");

try (BufferAllocator allocator = new RootAllocator()) {
  try (
      VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, allocator);
      MapVector mapVector = (MapVector) vectorSchemaRoot.getVector("mapFieldIntToInt")) {
    UnionMapWriter mapWriter = mapVector.getWriter();
    mapWriter.setPosition(0);
    mapWriter.startMap();
    for (int i = 0; i < 3; i++) {
      mapWriter.startEntry();
      mapWriter.key().bigInt().writeBigInt(i);
      mapWriter.value().bigInt().writeBigInt(i * 7);
      mapWriter.endEntry();
    }
    mapWriter.endMap();
    mapWriter.setValueCount(1);
    vectorSchemaRoot.setRowCount(1);

    System.out.println(vectorSchemaRoot.getFieldVectors().size());
    System.out.println("vectorSchemaRoot.getVector(0): " + vectorSchemaRoot.getVector(0));

    try (
        FileOutputStream fileOutputStream = new FileOutputStream(file);
        ArrowFileWriter writer = new ArrowFileWriter(vectorSchemaRoot, null, fileOutputStream.getChannel())) {
      writer.start();
      writer.writeBatch();
      writer.end();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}

System.out.println("Reading...");

try (
    BufferAllocator rootAllocator = new RootAllocator();
    FileInputStream fileInputStream = new FileInputStream(file);
    ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), rootAllocator)
) {
  System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
  for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
    boolean loaded = reader.loadRecordBatch(arrowBlock);
    System.out.println(loaded);
    VectorSchemaRoot vectorSchemaRootRecover = reader.getVectorSchemaRoot();
    System.out.print(vectorSchemaRootRecover.contentToTSVString());
  }
} catch (IOException e) {
  e.printStackTrace();
}

Error

Exception in thread "main" java.lang.IllegalArgumentException: not all nodes, buffers and variadicBufferCounts were consumed. nodes: [ArrowFieldNode [length=3, nullCount=0]] buffers: [ArrowBuf[24], address:123230812873128, capacity:1, ArrowBuf[25], address:123230812873136, capacity:24] variadicBufferCounts: []
    at org.apache.arrow.vector.VectorLoader.load(VectorLoader.java:98)
    at org.apache.arrow.vector.ipc.ArrowReader.loadRecordBatch(ArrowReader.java:214)
    at org.apache.arrow.vector.ipc.ArrowFileReader.loadNextBatch(ArrowFileReader.java:166)
    at org.apache.arrow.vector.ipc.ArrowFileReader.loadRecordBatch(ArrowFileReader.java:192)

Component(s)

Java

vibhatha commented 1 week ago

It seems that the validity buffer of the key is not written properly; it is all null.

llama90 commented 1 week ago

@vibhatha Is this unrelated to this comment?

UPDATE:

I don't think it's related to them. I will create an issue and resolve it.

vibhatha commented 1 week ago

@llama90 this is a much older issue that I am trying to solve.

vibhatha commented 1 week ago

@lidavidm a question:

Field keyField = new Field("id", FieldType.notNullable(new ArrowType.Int(64, true)),
    Collections.emptyList());
Field valueField = new Field("value", FieldType.nullable(new ArrowType.Int(64, true)), Collections.emptyList());
Field structField =
    new Field("entry", FieldType.notNullable(ArrowType.Struct.INSTANCE), List.of(keyField, valueField));
Field mapIntToIntField = new Field("mapFieldIntToInt", FieldType.notNullable(new ArrowType.Map(false)), List.of(structField));

After debugging, here is what I think is happening. We gave the key field the name id and the value field the name value. By the time we write, the StructVector inside the MapVector already has the two children declared in the schema, i.e. mapVector.getChildrenFromFields().get(0).getChildrenFromFields().get(0).getField() -> key: Int(64, true) not null and mapVector.getChildrenFromFields().get(0).getChildrenFromFields().get(1).getField() -> value: Int(64, true).

But when we go to write the data:

@Override
public BigIntWriter bigInt() {
  switch (mode) {
    case KEY:
      return entryWriter.bigInt(MapVector.KEY_NAME);
    case VALUE:
      return entryWriter.bigInt(MapVector.VALUE_NAME);
    default:
      return this;
  }
}

This is the regular check we have, and KEY_NAME and VALUE_NAME are hardcoded as key and value respectively. They are not updated by looking at the struct the user supplied. Thus, at write time, an additional vector with the id name is introduced, which leaves the key unconsumed. At least, that is what is happening at a high level. If I rename id to key, the code works.
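The mismatch described above can be modeled without Arrow at all. The sketch below is a hypothetical, Arrow-free illustration (not the actual Arrow implementation): a writer that looks up children by a hardcoded constant, creating one if absent, never touches the schema-declared child named id, so an extra key child appears while id stays idle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HardcodedNameDemo {
    // Analogous to MapVector.KEY_NAME, which is a hardcoded constant.
    static final String KEY_NAME = "key";

    public static void main(String[] args) {
        // Children declared by the user's schema: "id" and "value",
        // each tracked here with a write count of 0.
        Map<String, Integer> children = new LinkedHashMap<>();
        children.put("id", 0);
        children.put("value", 0);

        // The writer ignores the declared name and writes via the constant,
        // implicitly creating a third child named "key".
        children.merge(KEY_NAME, 1, Integer::sum);

        System.out.println(children.keySet());   // the extra child appears
        System.out.println(children.get("id"));  // "id" was never written to
    }
}
```

This mirrors the failure mode: the data lands under the hardcoded name while the declared child's buffers stay empty, so the reader finds nodes and buffers it cannot match.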

In the reading part, it ends up with an incorrect schema. Worst case, we could get the schema from the vector itself; but then we still have two idle vectors whenever users pick different names. Shouldn't we update KEY_NAME and VALUE_NAME properly? Or am I misreading this?

lidavidm commented 1 week ago

We should get it from the vector, yes. They are recommended to be "key" and "value", but it is not meant to be required:

https://github.com/apache/arrow/blob/d28078d80e7fe7ff93e4b8a9a331ec4ba4648bf1/format/Schema.fbs#L126-L129
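The fix direction could be sketched outside Arrow like this (hypothetical Field stand-in, not Arrow's actual classes): derive the key and value child names from the entry struct the user declared, relying only on the positional convention that the key is child 0 and the value is child 1.

```java
import java.util.List;

public class DeriveNamesDemo {
    // Hypothetical stand-in for an Arrow Field: a name plus child fields.
    record Field(String name, List<Field> children) {}

    // By the spec, the key is the first child of the entry struct
    // and the value is the second; names are not constrained.
    static String keyName(Field entryStruct) {
        return entryStruct.children().get(0).name();
    }

    static String valueName(Field entryStruct) {
        return entryStruct.children().get(1).name();
    }

    public static void main(String[] args) {
        Field entry = new Field("entry", List.of(
            new Field("id", List.of()),       // user-chosen key name
            new Field("value", List.of())));  // conventional value name
        System.out.println(keyName(entry));
        System.out.println(valueName(entry));
    }
}
```

With name lookup done positionally like this, the writer would target the user's id child instead of creating a fresh key child, which is the behavior the spec excerpt above permits.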

vibhatha commented 1 week ago

So should we fix it, or enforce the key/value naming?

lidavidm commented 1 week ago

Fix it; the spec explicitly says not to enforce key/value.