XpressAI / SparkCyclone

Plugin to accelerate Spark SQL with the NEC Vector Engine.
https://sparkcyclone.io
Apache License 2.0

Discussion 2021/06/09 #79

Open wgip opened 3 years ago

wgip commented 3 years ago

The discussion was based around this query plan:

*(4) Sort [joined_timestamp#20L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(joined_timestamp#20L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#106]
   +- *(3) HashAggregate(keys=[userId#42, joined_timestamp#20L], functions=[sum(cast(totalPrice#44 as double))], output=[sum(CAST(totalPrice AS DOUBLE))#52, userId#42, joined_timestamp#20L])
      +- Exchange hashpartitioning(userId#42, joined_timestamp#20L, 200), ENSURE_REQUIREMENTS, [id=#102]
         +- *(2) HashAggregate(keys=[userId#42, joined_timestamp#20L], functions=[partial_sum(cast(totalPrice#44 as double))], output=[userId#42, joined_timestamp#20L, sum#58])
            +- *(2) Project [JOINED_TIMESTAMP#20L, userId#42, totalPrice#44]
               +- *(2) BroadcastHashJoin [id#17], [cast(userId#42 as int)], Inner, BuildRight, false
                  :- *(2) Scan JDBCRelation(Users) [numPartitions=1] [ID#17,JOINED_TIMESTAMP#20L] PushedFilters: [*IsNotNull(ID)], ReadSchema: struct<ID:int,JOINED_TIMESTAMP:bigint>
                  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as int) as bigint)),false), [id=#96]
                     +- *(1) Filter isnotnull(userId#42)
                        +- FileScan csv [userId#42,totalPrice#44] Batched: false, DataFilters: [isnotnull(userId#42)], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/William/IdeaProjects/aurora4spark/aurora4spark-parent/aurora4spa..., PartitionFilters: [], PushedFilters: [IsNotNull(userId)], ReadSchema: struct<userId:string,totalPrice:string>
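
For reference, a query of roughly this shape would produce such a plan. This is a hypothetical reconstruction: the JDBC URL, CSV path and read options are assumptions, and only the table and column names come from the plan above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("plan-repro").master("local[*]").getOrCreate()

// Users over JDBC, as in "Scan JDBCRelation(Users)" above (URL/driver are assumed).
val users = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb")
  .option("dbtable", "Users")
  .load()                                // ID, JOINED_TIMESTAMP

// Order lines from CSV, as in the FileScan csv above (path is assumed).
val sales = spark.read
  .option("header", "true")
  .csv("sales.csv")                      // userId, totalPrice (read as strings)

val result = users
  .join(sales, users("ID") === sales("userId").cast("int"))
  .groupBy(col("userId"), col("JOINED_TIMESTAMP"))
  .agg(sum(col("totalPrice").cast("double")))
  .orderBy(col("JOINED_TIMESTAMP"))

result.explain()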

Outcomes:

Wosin commented 3 years ago

I've spent quite some time trying to link the external libraries with ncc. I was able to build it with ncc (so it should work with nc++ too) by using Arrow-GLib, but only when we specify the paths to the libraries manually. However, it seems that at runtime Aurora4j and PyVeo depend on nld to dynamically link the libraries.

The problem is that nld doesn't seem to be compatible with the libraries, as it logs the following:

nld: skipping incompatible /usr/lib64/libarrow-glib.so when searching for -larrow-glib
nld: skipping incompatible /usr/lib64/libarrow-glib.a when searching for -larrow-glib

Not really sure yet why this occurs.

wgip commented 3 years ago

@Wosin libarrow-glib.so would need to be compiled for the Aurora architecture as well.

You probably won't be able to link x86 libs into an Aurora project.

Wosin commented 3 years ago

Yeah, though I was wondering if there is a way to link them statically.

Wosin commented 3 years ago

It seems that WholeStageCodegen is represented as a parent plan that keeps the original plan as its child. So, basically, if we have something like HashAggregate and that single operation gets transformed by whole-stage codegen, a new Spark plan, WholeStageCodegenExec, is created which will have the HashAggregate as its `child`.

What is more, it seems from my experiments that whole-stage codegen is actually executed after our plugin, not before, so I don't think we need to disable it at all.
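
For reference, a minimal sketch of how such a wrapper could be looked through if we ever did need to (unwrapCodegen is a hypothetical helper, not something in the plugin):

import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}

// WholeStageCodegenExec keeps the original plan (e.g. a HashAggregateExec) as
// its `child`, so looking through the wrapper is a one-line pattern match.
def unwrapCodegen(plan: SparkPlan): SparkPlan = plan match {
  case WholeStageCodegenExec(child) => child
  case other                        => other
}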

wgip commented 3 years ago

More notes:

object VeoGenericPlanExtractor {
  def matchPlan(sparkPlan: SparkPlan): Option[GenericSparkPlanDescription] = {
    PartialFunction.condOpt(sparkPlan) {
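      // Match a (final) HashAggregate whose child is a ShuffleExchange over a (partial) HashAggregate.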
      case first @ HashAggregateExec(
            requiredChildDistributionExpressions,
            groupingExpressions,
            exprs,
            aggregateAttributes,
            initialInputBufferOffset,
            resultExpressions,
            org.apache.spark.sql.execution.exchange
              .ShuffleExchangeExec(
                outputPartitioning,
                f @ org.apache.spark.sql.execution.aggregate
                  .HashAggregateExec(
                    _requiredChildDistributionExpressions,
                    _groupingExpressions,
                    _aggregateExpressions,
                    _aggregateAttributes,
                    _initialInputBufferOffset,
                    _resultExpressions,
                    fourth @ sparkPlan
                  ),
                shuffleOrigin
              )
          ) => {
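        // Map each column name of the inner aggregate's input (fourth.output) to its positional index.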
        val columnIndices = fourth.output.map(_.name).zipWithIndex.toMap
        println(s"CID => ${columnIndices}")
        println(s"First =$first")

        val functions =
          _aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
        val resIds =
          _aggregateExpressions.map(_.resultId)
        val initExpr = functions.map(f => f.initialValues)
        val aggregateBufferAttributes: Seq[AttributeReference] =
          _aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)

        val aggResults =
          bindReferences(functions.map(_.evaluateExpression), aggregateBufferAttributes)
        val resultVars = bindReferences(resultExpressions, aggregateAttributes)

        List(
          "modes" -> _aggregateExpressions.map(_.mode).distinct,
          "rcde" -> _requiredChildDistributionExpressions.toSeq.flatten,
          "ge" -> _groupingExpressions,
          "ae" -> _aggregateExpressions,
          "aa" -> _aggregateAttributes,
          "allAtts" -> f.allAttributes.attrs,
          "re" -> _resultExpressions,
          "fo" -> fourth.output,
          "bufVars" -> initExpr,
          "resIds" -> resIds,
          "preAggResults" -> functions.map(_.evaluateExpression),
          "aggResults" -> aggResults,
          "resultVars" -> resultVars,
          "aggregateBufferAttributes" -> aggregateBufferAttributes,
          "out" -> f.output
        ).foreach { case (k, vs) =>
          println(s"$k: ==>")
          vs.foreach(println)
          println(" ")
        }

        val columnMappings = exprs
          .map(expression => (expression, extractAttributes(expression.references)))
          .zipWithIndex
          .map { case ((operation, attributes), id) =>
            ColumnAggregationExpression(
              attributes.map(attr => Column(columnIndices(attr.value), attr.value)),
              operation,
              id
            )
          }

First =HashAggregate(keys=[], functions=[sum((_1#37 + _2#38)), avg((_2#38 - _1#37)), sum(_3#39)], output=[sum((_1 + _2))#47, avg((_2 - _1))#48, sum(_3)#49])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#33]
   +- HashAggregate(keys=[], functions=[partial_sum((_1#37 + _2#38)), partial_avg((_2#38 - _1#37)), partial_sum(_3#39)], output=[sum#64, sum#65, count#66L, sum#67])
      +- LocalTableScan [_1#37, _2#38, _3#39]

modes: ==>
Partial

rcde: ==>

ge: ==>

ae: ==>
partial_sum((_1#37 + _2#38))
partial_avg((_2#38 - _1#37))
partial_sum(_3#39)

aa: ==>
sum#60
sum#61
count#62L
sum#63

allAtts: ==>
_1#37
_2#38
_3#39
sum#60
sum#61
count#62L
sum#63
sum#60
sum#61
count#62L
sum#63
sum#64
sum#65
count#66L
sum#67

re: ==>
sum#64
sum#65
count#66L
sum#67

fo: ==>
_1#37
_2#38
_3#39

bufVars: ==>
List(null)
List(0.0, 0)
List(null)

resIds: ==>
ExprId(44,d84a5437-c39a-4a61-80ea-245df8effc7c)
ExprId(45,d84a5437-c39a-4a61-80ea-245df8effc7c)
ExprId(46,d84a5437-c39a-4a61-80ea-245df8effc7c)

preAggResults: ==>
sum#60
(sum#61 / cast(count#62L as double))
sum#63

aggResults: ==>
input[0, double, true]
(input[1, double, true] / cast(input[2, bigint, true] as double))
input[3, double, true]

resultVars: ==>
input[0, double, true] AS sum((_1 + _2))#47
input[1, double, true] AS avg((_2 - _1))#48
input[2, double, true] AS sum(_3)#49

aggregateBufferAttributes: ==>
sum#60
sum#61
count#62L
sum#63

out: ==>
sum#64
sum#65
count#66L
sum#67

CID => Map(_1 -> 0, _2 -> 1, _3 -> 2)
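
As a side note on why aggResults and resultVars print in the input[ordinal, type, nullable] form: bindReferences rewrites every AttributeReference into a BoundReference, i.e. a positional lookup into the attribute sequence it is given. A minimal standalone illustration (the attribute names below are made up, not taken from the dump):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.types.{DoubleType, LongType}

// Two hypothetical aggregation-buffer attributes, analogous to sum#61 and count#62L.
val sumAttr   = AttributeReference("sum", DoubleType)()
val countAttr = AttributeReference("count", LongType)()

// avg's evaluateExpression is essentially sum / count over the buffer attributes.
val avgResult: Expression = Divide(sumAttr, Cast(countAttr, DoubleType))

// Binding replaces each attribute with input[ordinal, type, nullable] -- the
// exact shape seen in the aggResults / resultVars dumps above.
val bufferAttrs: Seq[Attribute] = Seq(sumAttr, countAttr)
val bound = bindReferences(Seq(avgResult), bufferAttrs)
bound.foreach(println)  // (input[0, double, true] / cast(input[1, bigint, true] as double))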