SemyonSinchenko opened this issue 2 months ago
Thanks for writing this up @SemyonSinchenko. Your reasoning seems sound to me, and I agree that it would be quite a unique and powerful feature for Comet. I am not sure how much work it will be, but I am happy to offer guidance (and will probably need some help from @viirya as well). I will start getting up to speed on Spark's `PythonMapInArrowExec`.
I took a look at the Spark code and this is what I found: both `org.apache.spark.sql.execution.python.BasicPythonArrowInput` and `org.apache.spark.sql.execution.python.BasicPythonArrowOutput` are `private[python]` in Spark. As far as I can tell, `BaseArrowPythonRunner` expects `InternalRow` as input, converts it to Arrow, and returns `org.apache.spark.sql.vectorized.ColumnarBatch`.
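For reference, my (possibly wrong) reading of the existing hierarchy, sketched from the Spark 3.4/3.5 sources. Parameter lists are elided and the real traits are `private[python]` and version-specific, so this is only an illustration, not compilable code:

```scala
// Rough sketch of Spark's existing Python-runner hierarchy (abbreviated).
// BasePythonRunner[IN, OUT] drives the generic driver <-> Python worker protocol.
abstract class BasePythonRunner[IN, OUT] /* (funcs, evalType, argOffsets, ...) */

// ArrowPythonRunner takes rows in and produces columnar batches out:
// the input side serializes InternalRow to an Arrow stream for the worker,
// and the output side reads Arrow back into ColumnarBatch.
class ArrowPythonRunner /* (...) */
  extends BasePythonRunner[Iterator[InternalRow], ColumnarBatch]
  with BasicPythonArrowInput
  with BasicPythonArrowOutput
```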
What I am thinking about now is implementing `CometArrowPythonRunner extends BasePythonRunner[CometColumnarBatch, CometColumnarBatch]`, where `CometColumnarBatch` is just a Scala wrapper over Comet memory. There are a lot of configs that are handled by Spark, like `spark.pyspark.driver.python` and `spark.pyspark.python`. These configs define which Python interpreter is used and also what is included in `PYTHONPATH`, which allows users to add their own dependencies to Python vectorized UDFs. If execution were done in Rust instead, all of these configs would have to be handled by Comet, including managing Python virtual environments, collecting Python metrics, etc.
So, I see a possible solution as the following:

1. `CometColumnarBatch extends InternalRow` that wraps Comet data without copying
2. Derive `CometColumnarBatch` from the parent and add the ability for Comet nodes to work with `CometColumnarBatch`
3. `CometArrowPythonRunner extends BasePythonRunner[CometColumnarBatch, CometColumnarBatch]` with the only difference being how input and output are processed
4. `MapInBatchExecComet` that implements `doExecute` and internally creates a `CometArrowPythonRunner` instead of `ArrowPythonRunner`
5. Replace `MapInBatchExec` with `MapInBatchExecComet` if possible, without falling back to Spark
> It seems to me that re-using how Spark handles Python UDFs would be easier than implementing it from scratch using DataFusion. But I'm not 100% sure.

Yes, that is the approach I would take.
Your high level plan sounds good to me.

> Your high level plan sounds good to me.

Cool, thank you! I will start working on it.
@andygrove It looks like this task is very hard for me, and I need more guidance.

To avoid handling all the Python-related configuration in Comet, and to reuse as much as possible from Spark, I need to create a class similar to `BaseArrowPythonRunner`, but instead of `Iterator[InternalRow]` it should take a `CometExecIterator`:
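Something along these lines is what I have in mind. This is a sketch only: the parameter lists are elided, the type parameters are my guess (IN = `ColumnarBatch` because `CometExecIterator` yields Arrow-backed batches), and `BasePythonRunner`'s actual signature is version-specific:

```scala
// Sketch only: a runner fed by Comet's native output rather than rows.
class CometArrowPythonRunner(/* funcs, evalType, argOffsets, schema, timeZoneId */)
  extends BasePythonRunner[ColumnarBatch, ColumnarBatch](/* ... */) {
  // Writer side: for each batch coming out of the CometExecIterator, write
  // its Arrow buffers to the worker's stream via ArrowStreamWriter,
  // ideally without copying or materializing InternalRow at all.
  // Reader side: reuse the behaviour of Spark's BasicPythonArrowOutput.
}
```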
That requires me to implement a trait similar to `BasicPythonArrowInput`, which implements `PythonArrowInput`. It looks like I need to override only this method:
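The method I mean looks roughly like the following in recent Spark 3.5.x (quoted from memory, so the exact name and signature may differ between Spark versions):

```scala
// The one hook in PythonArrowInput (as I understand it) that pushes the
// input data into the Arrow stream consumed by the Python worker.
// Returns true while there is more input to write.
protected def writeNextInputToArrowStream(
    root: VectorSchemaRoot,
    writer: ArrowStreamWriter,
    dataOut: DataOutputStream,
    inputIterator: Iterator[IN]): Boolean
```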
And I need to use `ArrowStreamWriter`. But I could not figure out how to do this given a `CometExecIterator` and without copying the data... It seems to me that such functionality should already be implemented in `org.apache.comet.vector.NativeUtils`, but I failed to find it. Can you please point me in the right direction? Or maybe there is a part of Comet's code that I can use as inspiration?

Also, there is a big chance that I'm just going in the wrong direction...

Thanks in advance!
### What is the problem the feature request solves?
Spark provides multiple ways to run Arrow-backed UDFs. The current 3.5 supports `mapInArrow`, and in the future 4.0 there will also be `applyInArrow`.

My understanding of how it works in Spark under the hood is quite limited, so correct me if I'm wrong. At the moment, if Spark sees `PythonMapInArrow` in the plan, it will internally do a conversion from rows to Arrow batches that should be a columnar representation of the data: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInBatchExec.scala#L36

Taking a minimal example of running `mapInArrow` in Spark 3.4 and running it with Comet enabled generates the following physical plan:
If I understand it right, the following happens around `PythonMapInArrow`. It seems to me that points 2-3 are redundant: the Arrow batches required for `mapInArrow` could be created directly from the Comet Arrow-backed columns, and this operation should be a kind of zero-copy... And actually the back conversion from a Spark columnar batch to a Comet columnar batch may be zero-copy too, so in theory Comet does not need to fall back to Spark in this case, right?

### Describe the potential solution
I do not know an exact solution. It is mostly a question.
### Additional context
I'm willing to implement it myself and I'm ready to work on it, but I need guidance and help with the overall design of how it should be done (if it is feasible).

Native support for Arrow-backed UDFs opens a lot of new cool ways of using Comet. I can see it giving a huge boost to most ML/MLOps tasks that are typically done in Spark via Arrow-backed UDFs (`pandas`, `polars`, `pyarrow`, or even Rust code built with `maturin` into a Python module).