apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet

Following the memory management semantics stated in the Arrow C Data Interface Specification #885

Closed. Kontinuation closed this issue 1 week ago.

Kontinuation commented 2 weeks ago

I came across a native memory leak (https://github.com/apache/datafusion-comet/issues/884) caused by misuse of the Java implementation of the Arrow C Data Interface, and I'm wondering why DataFusion Comet does not follow the memory management semantics stated in the Arrow C Data Interface specification. The specification says:

Member allocation

It is intended for the base structure to be stack- or heap-allocated by the consumer. In this case, the producer API should take a pointer to the consumer-allocated structure.

The base structures (ArrowSchema and ArrowArray) should be allocated by the consumer of the Arrow vectors: the consumer passes the two memory locations to the producer, and the producer fills the base structures with pointers to the buffers it has allocated, along with a release callback for freeing those producer-allocated resources.

DataFusion Comet instead always allocates the base structures in the producer and returns them to the consumer. The producer may then have a hard time figuring out when to deallocate the base structures: deallocation has to happen after the consumer has moved the Arrow vectors and taken ownership of them.

If we follow the memory management semantics stated by the specification, we can always free the base structures immediately after calling the producer and importing the vector. This gives a simpler model of base-structure lifetime and reduces the mental overhead for contributors who are already familiar with the Arrow C Data Interface.
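To make that concrete, here is a minimal sketch of the spec-compliant flow using the Arrow Java C Data Interface (org.apache.arrow.c). The JNI entry point `fillNextBatch` is a hypothetical stand-in for whatever native method produces a batch; everything else is the standard Arrow Java API:

```java
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;

public class ConsumerAllocatedImport {
  // Hypothetical JNI producer: fills the consumer-allocated base
  // structures with buffer pointers and a release callback.
  private static native void fillNextBatch(long arrayAddress, long schemaAddress);

  static FieldVector importNextBatch(BufferAllocator allocator) {
    // The consumer allocates the base structures...
    try (ArrowArray array = ArrowArray.allocateNew(allocator);
         ArrowSchema schema = ArrowSchema.allocateNew(allocator)) {
      // ...and passes their addresses to the producer to fill.
      fillNextBatch(array.memoryAddress(), schema.memoryAddress());
      // importVector moves ownership of the producer-allocated buffers
      // into the returned vector; the release callback fires when the
      // vector itself is eventually closed.
      FieldVector vector = Data.importVector(allocator, array, schema, null);
      // The base structures can therefore be freed immediately, which
      // try-with-resources does on exit from this block.
      return vector;
    }
  }
}
```

Because the import moves the buffers out of the base structures, freeing the structures right after the call is always safe, and their lifetime never depends on when the consumer finishes with the data.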

viirya commented 2 weeks ago

I think it is because the query execution is always triggered from the JVM side (the producer). If the array and schema structures were allocated by the native side, the process would become:

  1. The JVM side calls the native side to get array and schema structures for a new batch
  2. The JVM side fills the array and schema structures
  3. The JVM side calls the native side again to execute the query

On the native side, the allocated base structures are kept in the ffi_arrays field of the execution context, and they are released when the next batch is produced or when the execution context is released by Native.releasePlan.
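Roughly, that three-step flow looks like the following sketch from the JVM side. The JNI entry points `allocateFFIStructs` and `executePlan` are hypothetical stand-ins for Comet's actual native methods:

```java
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;

public class NativeAllocatedFlow {
  // Hypothetical: native allocates the base structures and returns
  // their addresses as {arrayAddress, schemaAddress}.
  private static native long[] allocateFFIStructs(long planId);
  // Hypothetical: native imports from the filled structures and
  // executes the query plan.
  private static native void executePlan(long planId);

  static void feedBatch(BufferAllocator allocator, long planId, FieldVector input) {
    // Step 1: ask the native side for the natively-allocated structures.
    long[] addrs = allocateFFIStructs(planId);
    // Wrap the raw addresses; the struct memory stays owned by the
    // native side, so the JVM must not free it.
    ArrowArray array = ArrowArray.wrap(addrs[0]);
    ArrowSchema schema = ArrowSchema.wrap(addrs[1]);
    // Step 2: fill (export into) the structures from the JVM side.
    Data.exportVector(allocator, input, null, array, schema);
    // Step 3: call back into native to import the batch and execute.
    // Native keeps the structs in ffi_arrays and frees them when the
    // next batch is produced or when the plan is released.
    executePlan(planId);
  }
}
```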

For the output batch, we can provide the array and schema structures from the JVM to the native side when the JVM calls the native side to execute the query, and use them for importing. This won't change the query process, but it adds JNI parameters to step 3 above.
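In code, that proposal might look like the following sketch, again with a hypothetical `executePlan` signature: the JVM allocates the output base structures up front, passes their addresses along with the execution request, and can close them as soon as the result is imported:

```java
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;

public class JvmAllocatedOutput {
  // Hypothetical revised JNI entry point: the two extra parameters
  // carry the addresses of the JVM-allocated output structures.
  private static native void executePlan(long planId, long outArrayAddress, long outSchemaAddress);

  static FieldVector executeAndImport(BufferAllocator allocator, long planId) {
    try (ArrowArray outArray = ArrowArray.allocateNew(allocator);
         ArrowSchema outSchema = ArrowSchema.allocateNew(allocator)) {
      // Native fills the JVM-allocated structures with the output batch.
      executePlan(planId, outArray.memoryAddress(), outSchema.memoryAddress());
      // Ownership of the native buffers moves into the vector, so the
      // base structures can be freed as soon as this block exits.
      return Data.importVector(allocator, outArray, outSchema, null);
    }
  }
}
```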

viirya commented 2 weeks ago
  • On the JVM side, the lifetime of allocated base structures was not taken care of, thus causing native memory leaks.

I revised the current approach in https://github.com/apache/datafusion-comet/pull/893 to address that.