Open · zhztheplayer opened 4 weeks ago
Do you mean we can allocate the batch in Spark's off-heap memory? If so, it's a good solution.
CC @kecookier @NEUpanning
> Do you mean we can allocate the batch in Spark's off-heap memory? If so, it's a good solution.
Correct. I assume it could be straightforward to simply change the allocation to off-heap. It's unlikely to be as tricky as Gazelle's case used to be.
Hi, I would like to take this ticket to learn more about Gluten. Could you please point me to where an off-heap allocated container is used in the code? Thank you.
Thank you in advance for helping, @Zand100.

You can refer to vanilla Spark's `UnsafeHashedRelation`, where the underlying `BytesToBytesMap` can be allocated from off-heap memory. We can probably adopt a similar approach in Gluten.
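For orientation, a minimal sketch of that pattern, assuming Spark's `BytesToBytesMap` and `TaskMemoryManager` APIs; whether its pages land on- or off-heap follows `spark.memory.offHeap.enabled`, and the key/value bytes below are placeholders:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap

// Sketch: allocate a BytesToBytesMap from the current task's memory manager.
// Its pages come from Spark's off-heap pool when spark.memory.offHeap.enabled=true.
// Note: taskMemoryManager() is private[spark], so this code must live under the
// org.apache.spark package (as Gluten's ColumnarBuildSideRelation already does).
val tmm = TaskContext.get().taskMemoryManager()
val map = new BytesToBytesMap(tmm, 1024 /* initial capacity */, tmm.pageSizeBytes())

// Append one placeholder key/value pair of raw bytes.
val key = Array[Byte](1, 2, 3)
val value = Array[Byte](4, 5, 6)
val loc = map.lookup(key, Platform.BYTE_ARRAY_OFFSET, key.length)
loc.append(
  key, Platform.BYTE_ARRAY_OFFSET, key.length,
  value, Platform.BYTE_ARRAY_OFFSET, value.length)
```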
So I can assign this ticket to you, I suppose?
@Zand100 You may also consider upstreaming the PR to Spark itself. It's another solution to the off-heap/on-heap conflict: move all of Spark's large memory allocations to off-heap once off-heap is enabled.
Thank you!
Just to check I'm on the right track: I'm trying to use `BytesToBytesMap` instead of `Array[Array[Byte]]` in https://github.com/apache/incubator-gluten/blob/main/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala#L39. Is that right?
Yes, you can assign the ticket to me.
Hi, how should I handle constructing a `ColumnarBuildSideRelation` for the `HashedRelationBroadcastMode` case?

```scala
val fromBroadcast = from.asInstanceOf[Broadcast[HashedRelation]]
val fromRelation = fromBroadcast.value.asReadOnlyCopy()
```

If `fromBroadcast` already holds an `UnsafeHashedRelation`, then it's already using a `BytesToBytesMap`, which is perfect. But if it holds a `LongHashedRelation` or a `GeneralHashedRelation`, should I convert it to an `UnsafeHashedRelation`?
Could you please review the draft https://github.com/apache/incubator-gluten/pull/7885?
@Zand100 It's not necessary to use `BytesToBytesMap` in Gluten if it doesn't fit our use case. We can create another type of binary container implementing a Spark `MemoryConsumer`, tailored to our own demand.
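To make that concrete, here is a rough sketch of what such a container could look like; `OffHeapBatchContainer` is a hypothetical name, not an actual Gluten class, and error handling is omitted:

```scala
import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.memory.MemoryBlock

// Hypothetical sketch: each serialized batch is copied into an off-heap page
// obtained through Spark's task memory accounting.
class OffHeapBatchContainer(tmm: TaskMemoryManager, pageSize: Long)
  extends MemoryConsumer(tmm, pageSize, MemoryMode.OFF_HEAP) {

  private var pages: List[MemoryBlock] = Nil

  // Copies one serialized batch into a new off-heap page and returns the page,
  // whose base offset is the raw off-heap address usable across JNI.
  def put(bytes: Array[Byte]): MemoryBlock = {
    val page = allocatePage(bytes.length) // accounted; may force other consumers to spill
    Platform.copyMemory(
      bytes, Platform.BYTE_ARRAY_OFFSET,
      page.getBaseObject, page.getBaseOffset, bytes.length)
    pages = page :: pages
    page
  }

  // Broadcast data must stay resident, so this consumer cannot release memory
  // when another consumer asks it to spill.
  override def spill(size: Long, trigger: MemoryConsumer): Long = 0L

  def close(): Unit = {
    pages.foreach(freePage)
    pages = Nil
  }
}
```

Because it extends `MemoryConsumer`, the allocation is visible to Spark's memory manager, so pressure can trigger spilling elsewhere instead of silently inflating the heap.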
Thank you! Could you please review the draft of the new binary container? https://github.com/apache/incubator-gluten/pull/7902
Could you please review https://github.com/apache/incubator-gluten/pull/7944? Should `ColumnarBuildSideRelation` still use the object handle (for example https://github.com/apache/incubator-gluten/blob/main/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala#L67) and construct the `ColumnarBatch` from it (I think that becomes on-heap again)?
We are very glad to see the discussion here. In our production environment, we have long been troubled by out-of-memory (OOM) problems caused by the broadcast build relation using heap memory. We adopted an approach similar to the one proposed by @zhztheplayer, with further optimizations (such as dividing large batches into small batches) for our production scenario.
We would like to share our approach and contribute it back in the following weeks.
Currently, broadcast data is held in `batches: Array[Array[Byte]]`, which remains in on-heap memory. In some extreme situations this consumes a large amount of heap memory. Moreover, each batch in `batches: Array[Array[Byte]]` needs to be copied to off-heap memory through JNI, which also wastes CPU and memory. Our solution went through two rounds of iterative development in our internal environment.

In the first round, we introduced `BytesArrayInOffheap` to store broadcast data in off-heap memory. However, there was clearly room for improvement: the extra copying between on-heap and off-heap memory should be avoided. Additionally, another problem emerged: a single batch in `batches: Array[Array[Byte]]` can be extremely large, which usually leads to OOM in off-heap memory. Consequently, we carried out a second round of optimization.
In the second round, we:

- changed `BroadcastUtils#serializeStream` to support splitting `batches: Iterator[ColumnarBatch]` into more, smaller batches, so that no single entry in `batches: Array[Array[Byte]]` becomes extremely large;
- passed the `BytesArrayInOffheap` memory address and size to the underlying deserializer, avoiding the extra on-heap copy.

We will introduce a config in GlutenConfig that controls whether off-heap memory is used to store the broadcast data. Once all related code is merged and everything is done, we can remove the config and make this the default behavior.
Brief steps:

1. Introduce `BytesArrayInOffheap`, `UnsafeColumnarBuildSideRelation`, and `spark.gluten.sql.BroadcastBuildRelationUseOffheap.enabled` in GlutenConfig to implement what we have done in our first round.
2. Update `BroadcastUtils#serializeStream` and refactor the serialize/deserialize process.
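For illustration, enabling the proposed behavior would presumably sit alongside Spark's existing off-heap settings; the Gluten flag name is the one proposed above, and its final semantics depend on the PRs:

```scala
import org.apache.spark.sql.SparkSession

// Spark's off-heap pool must be enabled and sized for the executors; the
// Gluten flag below is the one proposed in this thread and may change.
val spark = SparkSession.builder()
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "4g")
  .config("spark.gluten.sql.BroadcastBuildRelationUseOffheap.enabled", "true")
  .getOrCreate()
```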
cc @WangGuangxin @weiting-chen
So far, `ColumnarBuildSideRelation` is allocated in Spark's JVM heap memory. It appears that we can replace `batches: Array[Array[Byte]]` with an off-heap allocated container to move the memory usage off-heap. There should be a simple solution that doesn't require too much refactoring. This could avoid some of the heap OOM issues.
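Schematically, the change would keep the relation's shape and swap only the storage; the signature below is simplified and the replacement container is a placeholder, not a committed design:

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// Today (simplified): serialized build-side batches live on the JVM heap.
case class ColumnarBuildSideRelation(
    output: Seq[Attribute],
    batches: Array[Array[Byte]]) // on-heap arrays

// Proposed direction: back the batches with an off-heap allocated container
// (e.g. one implementing Spark's MemoryConsumer), keeping the same interface.
```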