facebookincubator / velox

A C++ vectorized database acceleration library aimed at optimizing query engines and data processing systems.
https://velox-lib.io/
Apache License 2.0

Enhance AggregationFuzzer to verify results against Spark #9270

Open · rui-mo opened this issue 5 months ago

rui-mo commented 5 months ago

Description

Currently, the Aggregation Fuzzer verifies results against DuckDB. However, not all functions are available in DuckDB, and the semantics do not always match. It would be better to verify against Spark.

After several rounds of investigation, we would like to implement the SparkQueryRunner based on Spark Connect. In Spark 3.4, Spark Connect introduced a decoupled client-server architecture that allows remote connectivity to Spark clusters, as described in spark-connect-overview. From the client perspective, Spark Connect mostly behaves like any other gRPC client, which is polyglot and cross-platform. The protocols used by Spark Connect are defined as proto files in https://github.com/apache/spark/tree/master/connector/connect/common/src/main/protobuf/spark/connect.
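To make the client-server interaction concrete, below is a minimal sketch of establishing that gRPC connection, assuming C++ stubs have been generated from the proto files above (the include path is illustrative and depends on how the protos are compiled into the build):

```cpp
#include <grpcpp/grpcpp.h>
// Generated from spark/connect/base.proto, which defines SparkConnectService.
#include "spark/connect/base.grpc.pb.h"

int main() {
  // Spark Connect servers listen on port 15002 by default.
  auto channel = grpc::CreateChannel(
      "localhost:15002", grpc::InsecureChannelCredentials());
  auto stub = spark::connect::SparkConnectService::NewStub(channel);
  // The stub exposes the RPCs declared in the protos, e.g. ExecutePlan.
  return 0;
}
```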

Start Spark Connect server

First, we need a runnable Spark deployment, and we download spark-connect_2.12-3.5.1.jar from https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.1/. Then, in $SPARK_HOME, we can start the Spark Connect server with the command below:

```
./sbin/start-connect-server.sh --jars $SPARK_HOME/jars/spark-connect_2.12-3.5.1.jar
```

If the server is started successfully, we can see a log line like:

```
INFO SparkConnectServer: Spark Connect server started at: 0:0:0:0:0:0:0:0%0:15002
```
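As a sanity check from the native side, we can verify that the endpoint is reachable before submitting queries. A minimal sketch (the endpoint and timeout are assumptions, not part of the prototype):

```cpp
#include <chrono>
#include <string>
#include <grpcpp/grpcpp.h>

// Returns true if a gRPC channel to the Spark Connect endpoint becomes
// ready within five seconds.
bool sparkConnectUp(const std::string& endpoint = "localhost:15002") {
  auto channel =
      grpc::CreateChannel(endpoint, grpc::InsecureChannelCredentials());
  return channel->WaitForConnected(
      std::chrono::system_clock::now() + std::chrono::seconds(5));
}
```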

Work with Spark Connect to submit a query and get the result

The diagram below illustrates how a query is submitted from the native side to Spark through Spark Connect for execution (a code sketch follows the diagram). First, we create a protobuf message ExecutePlanRequest from a query string based on the defined protocols. Then we submit the message to the gRPC API ExecutePlan for execution, and the result can be read from its streaming response. Since Spark returns data in the Arrow IPC stream format, arrow::ipc::RecordBatchReader is used to read the bytes as Arrow RecordBatches. By converting each Arrow RecordBatch to a Velox vector, we can compare the results of Spark and Velox. We have implemented a prototype SparkClient.cpp and verified its functionality: it can submit a query to Spark and fetch the results back to the native side.

[Diagram: submitting a query from the native client to Spark through Spark Connect]
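The following is a condensed sketch of that flow, assuming C++ gRPC stubs generated from the Spark Connect protos (the field and RPC names follow the upstream proto files, but the include path, session id, and error handling are simplified placeholders rather than the actual prototype code):

```cpp
#include <memory>
#include <string>
#include <vector>

#include <arrow/buffer.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include <grpcpp/grpcpp.h>
#include "spark/connect/base.grpc.pb.h" // generated; path is illustrative

// Submits a SQL query via ExecutePlan and collects the Arrow record batches
// carried in the streaming response. Error handling is elided for brevity.
std::vector<std::shared_ptr<arrow::RecordBatch>> executeSql(
    spark::connect::SparkConnectService::Stub& stub,
    const std::string& sql) {
  // Build the ExecutePlanRequest: the plan root is a SQL relation.
  spark::connect::ExecutePlanRequest request;
  request.set_session_id("aggregation-fuzzer"); // any unique session id
  request.mutable_user_context()->set_user_id("fuzzer");
  request.mutable_plan()->mutable_root()->mutable_sql()->set_query(sql);

  grpc::ClientContext context;
  auto reader = stub.ExecutePlan(&context, request);

  std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
  spark::connect::ExecutePlanResponse response;
  while (reader->Read(&response)) {
    if (!response.has_arrow_batch()) {
      continue; // skip schema- or metrics-only responses
    }
    // arrow_batch().data() holds bytes in the Arrow IPC stream format.
    auto buffer = arrow::Buffer::FromString(response.arrow_batch().data());
    auto batchReader = arrow::ipc::RecordBatchStreamReader::Open(
                           std::make_shared<arrow::io::BufferReader>(buffer))
                           .ValueOrDie();
    std::shared_ptr<arrow::RecordBatch> batch;
    while (batchReader->ReadNext(&batch).ok() && batch != nullptr) {
      batches.push_back(batch);
    }
  }
  return batches;
}
```

Each RecordBatch can then be converted to a Velox vector (e.g. through Velox's Arrow bridge) so that Spark's results can be compared with Velox's own output.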

rui-mo commented 5 months ago

https://github.com/facebookincubator/velox/issues/6595#issuecomment-1829141664

rui-mo commented 5 months ago

Let's track the updates of SparkQueryRunner here. cc: @mbasmanova @zhztheplayer

mbasmanova commented 5 months ago

CC: @kgpai @kagamiori

mbasmanova commented 5 months ago

CC: @amitkdutta

rui-mo commented 4 months ago

Hi @mbasmanova, I have posted our recent updates in this issue. Your feedback is appreciated, thanks. cc: @FelixYBW

mbasmanova commented 4 months ago

@rui-mo Rui, this is great. Looks like you have a working prototype. What would be the next steps towards "productizing" this?

CC: @kgpai @assignUser @duanmeng

rui-mo commented 4 months ago

@mbasmanova If this approach makes sense for Velox, we plan to introduce the Spark client as well as the Spark Connect protocols into Velox, and start enhancing the aggregation fuzzer based on them. To test against Spark, we would like to know where we can set up the Spark environment needed for execution, thanks.

assignUser commented 4 months ago

where we can set up the Spark environment needed for execution

What's the requirement for this? I assume this would be more in line with an ephemeral test setup (e.g. how we use Hadoop in the adapters tests) and not a permanent production-grade thing?

We could add Spark, Spark Connect, and their dependencies to the adapters Docker image. I think it's also possible to run additional containers as services in a GitHub Actions job, but I haven't used that feature yet.

rui-mo commented 4 months ago

I assume this would be more in line with an ephemeral test setup

@assignUser Yes, you are right. Could you provide us a reference on how Hadoop or Presto is deployed in the tests? I assume we can deploy Spark in a similar way. Thank you.

assignUser commented 4 months ago

It looks like the tests themselves start Hadoop/Azurite; for example, see connectors/hive/storage_adapters/hive/tests. If you show me how you set up Spark for your PoC tests (e.g. your bash script), I can help with getting that setup into an action.

Here is the service container reference, which sounds useful: https://docs.github.com/en/actions/using-containerized-services/about-service-containers
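For illustration, a hypothetical service-container job along those lines (the image, ports, and steps are placeholders, not a tested configuration):

```yaml
jobs:
  spark-fuzzer:
    runs-on: ubuntu-latest
    services:
      spark-connect:
        # Placeholder image; it would need to start the Spark Connect
        # server on container startup.
        image: apache/spark:3.5.1
        ports:
          - 15002:15002
    steps:
      - uses: actions/checkout@v4
      # ... build Velox and run the aggregation fuzzer against localhost:15002
```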