NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0
806 stars 234 forks source link

[FEA] Support for a custom DataSource V2 which supplies Arrow data #1072

Closed Dooyoung-Hwang closed 3 years ago

Dooyoung-Hwang commented 4 years ago

Is your feature request related to a problem? Please describe. When I executed an aggregation query with our custom data source, I found the physical plan of the query was like this.

spark.sql("SELECT bucket, count(*) FROM test_table GROUP BY bucket").explain(true)

== Physical Plan ==
*(2) GpuColumnarToRow false
+- GpuHashAggregate(keys=[bucket#423], functions=[gpucount(1)], output=[bucket#423, count(1)#570L])
   +- GpuCoalesceBatches TargetSize(2147483647)
      +- GpuColumnarExchange gpuhashpartitioning(bucket#423, 10), true, [id=#1210]
         +- GpuHashAggregate(keys=[bucket#423], functions=[partial_gpucount(1)], output=[bucket#423, count#574L])
            +- GpuRowToColumnar TargetSize(2147483647)
               +- *(1) Scan R2Relation(com.skt.spark.r2.RedisConfig@232fa9c6,2147483647) [bucket#423] PushedFilters: [], ReadSchema: struct<bucket:string>

This shows that the InternalRows are built firstly, and they are transformed into ColumnarBatches by GpuRowToColumnar plan. If the custom DataSource can provide RDD[ColumnBatch] to spark-rapids directly, it would be more efficient because the conversion overhead is removed.

Describe the solution you'd like

  1. In spark-rapids, add a trait of scala or an interface of java that requests RDD of ColumnarBatch.
  2. If the class in a custom V1 DataSource, which extends BaseRelation, also implements this interface, the physical plan which scans a custom v1 source can also be overridden by spark-rapids.

The changed physical plan can be illustrated like this.

== Physical Plan ==
*(1) GpuColumnarToRow false
+- GpuHashAggregate(keys=[bucket#423], functions=[gpucount(1)], output=[bucket#423, count(1)#570L])
   +- GpuCoalesceBatches TargetSize(2147483647)
      +- GpuColumnarExchange gpuhashpartitioning(bucket#423, 10), true, [id=#1210]
         +- GpuHashAggregate(keys=[bucket#423], functions=[partial_gpucount(1)], output=[bucket#423, count#574L])
            +- GpuV1SourceScan Batched: true, DataFilters: [], Format: r2, PartitionFilters: [], PushedFilters: [], ReadSchema: ReadSchema: struct<bucket:string>
jlowe commented 4 years ago

If the custom DataSource can provide RDD[ColumnBatch] to spark-rapids directly, it would be more efficient because the conversion overhead is removed.

Does this RDD[ColumnarBatch] contain GPU data or CPU data? If the latter there still would be a conversion from host columnar data to device columnar data. That type of conversion is already supported by the plugin, but it's important to note that a (cheaper) conversion would still occur. The plan would have a HostColumnarToGpu node instead of a GpuRowToColumnar node.

tgravescs commented 3 years ago

After discussions data source v1 doesn't support columnar so switch to use data source v2. With datasource v2, custom datasources just work and we insert a HostColumnarToGpu transition to get the data onto the GPU.

In this case I believe the data will already be in an Arrow format ArrowColumnVector we can investigate making the HostColumnarToGpu smarter about getting the data onto the GPU

tgravescs commented 3 years ago

note that looking at a couple of sample queries it uses Round of a decimal, which support for it in progress and it also uses average of a decimal which we don't support yet.

tgravescs commented 3 years ago

note for sample queries and data we can look at the taxi ride dataset and queries:

https://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/ explanation - https://tech.marksblogg.com/billion-nyc-taxi-rides-redshift.html 4 queries can so be found here: https://tech.marksblogg.com/omnisci-macos-macbookpro-mbp.html

This is the result of other solutions. https://tech.marksblogg.com/benchmarks.html

sameerz commented 3 years ago

Rounding support is being worked on in https://github.com/NVIDIA/spark-rapids/pull/1244 .
Average should work once we support casting, which is being tracked in this issue: https://github.com/NVIDIA/spark-rapids/issues/1330 .

tgravescs commented 3 years ago

Note we may also need percentile_approx here.

tgravescs commented 3 years ago

cudf jira for percentile_approx -> https://github.com/rapidsai/cudf/issues/7170

tgravescs commented 3 years ago

the main functionality to support faster copy when using datasourcev2 supplying arrow data is commited under https://github.com/NVIDIA/spark-rapids/pull/1622. It supports primitive types and Strings. It does not support Decimal or nested types yet.

tgravescs commented 3 years ago

note filed separate issue for write side https://github.com/NVIDIA/spark-rapids/issues/1648.
I'm going to close this as the initial version is committed