flock-lab / flock

Flock: A Low-Cost Streaming Query Engine on FaaS Platforms
https://flock-lab.github.io/flock/
GNU Affero General Public License v3.0

Distributed mode: nexmark q4 and ysb #441

Closed: gangliao closed this issue 2 years ago

gangliao commented 2 years ago

NEXMark Q4:

SELECT category,
       Avg(final)
FROM   (SELECT Max(price) AS final,
               category
        FROM   auction
               INNER JOIN bid
                       ON a_id = auction
        WHERE  b_date_time BETWEEN a_date_time AND expires
        GROUP  BY a_id,
                  category) AS Q
GROUP  BY category;

=== Physical Plan ===
ProjectionExec: expr=[category@0 as category, AVG(Q.final)@1 as AVG(Q.final)]
  HashAggregateExec: mode=FinalPartitioned, gby=[category@0 as category], aggr=[AVG(Q.final)]
    CoalesceBatchesExec: target_batch_size=4096
      RepartitionExec: partitioning=Hash([Column { name: "category", index: 0 }], 16)
        HashAggregateExec: mode=Partial, gby=[category@1 as category], aggr=[AVG(Q.final)]
          ProjectionExec: expr=[final@0 as final, category@1 as category]
            ProjectionExec: expr=[MAX(bid.price)@2 as final, category@1 as category]
              HashAggregateExec: mode=FinalPartitioned, gby=[a_id@0 as a_id, category@1 as category], aggr=[MAX(bid.price)]
                CoalesceBatchesExec: target_batch_size=4096
                  RepartitionExec: partitioning=Hash([Column { name: "a_id", index: 0 }, Column { name: "category", index: 1 }], 16)
                    HashAggregateExec: mode=Partial, gby=[a_id@0 as a_id, category@3 as category], aggr=[MAX(bid.price)]
                      CoalesceBatchesExec: target_batch_size=4096
                        FilterExec: b_date_time@6 >= a_date_time@1 AND b_date_time@6 <= expires@2
                          CoalesceBatchesExec: target_batch_size=4096
                            HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "a_id", index: 0 }, Column { name: "auction", index: 0 })]
                              CoalesceBatchesExec: target_batch_size=4096
                                RepartitionExec: partitioning=Hash([Column { name: "a_id", index: 0 }], 16)
                                  RepartitionExec: partitioning=RoundRobinBatch(16)
                                    MemoryExec: partitions=1, partition_sizes=[1]
                              CoalesceBatchesExec: target_batch_size=4096
                                RepartitionExec: partitioning=Hash([Column { name: "auction", index: 0 }], 16)
                                  RepartitionExec: partitioning=RoundRobinBatch(16)
                                    MemoryExec: partitions=1, partition_sizes=[1]

=== Stage 0 ===
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "a_id", index: 0 }], 16)
    RepartitionExec: partitioning=RoundRobinBatch(16)
      MemoryExec: partitions=0, partition_sizes=[]

CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "auction", index: 0 }], 16)
    RepartitionExec: partitioning=RoundRobinBatch(16)
      MemoryExec: partitions=0, partition_sizes=[]

=== Stage 1 ===
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "a_id", index: 0 }, Column { name: "category", index: 1 }], 16)
    HashAggregateExec: mode=Partial, gby=[a_id@0 as a_id, category@3 as category], aggr=[MAX(bid.price)]
      CoalesceBatchesExec: target_batch_size=4096
        FilterExec: b_date_time@6 >= a_date_time@1 AND b_date_time@6 <= expires@2
          CoalesceBatchesExec: target_batch_size=4096
            HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "a_id", index: 0 }, Column { name: "auction", index: 0 })]
              MemoryExec: partitions=0, partition_sizes=[]
              MemoryExec: partitions=0, partition_sizes=[]

=== Stage 2 ===
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "category", index: 0 }], 16)
    HashAggregateExec: mode=Partial, gby=[category@1 as category], aggr=[AVG(Q.final)]
      ProjectionExec: expr=[final@0 as final, category@1 as category]
        ProjectionExec: expr=[MAX(bid.price)@2 as final, category@1 as category]
          HashAggregateExec: mode=FinalPartitioned, gby=[a_id@0 as a_id, category@1 as category], aggr=[MAX(bid.price)]
            MemoryExec: partitions=0, partition_sizes=[]

=== Stage 3 ===
ProjectionExec: expr=[category@0 as category, AVG(Q.final)@1 as AVG(Q.final)]
  HashAggregateExec: mode=FinalPartitioned, gby=[category@0 as category], aggr=[AVG(Q.final)]
    MemoryExec: partitions=0, partition_sizes=[]
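The stage cuts above fall at hash repartition (shuffle) boundaries: each RepartitionExec subtree becomes its own stage, and its output feeds a MemoryExec placeholder (the `partitions=0, partition_sizes=[]` leaves) in the consuming stage. A minimal sketch of that splitting idea, under stated assumptions — the `Plan` enum and `split` function below are illustrative models, not Flock's actual types:

```rust
// Hypothetical, simplified model of cutting a physical plan into stages.
// Every hash RepartitionExec is treated as a shuffle boundary; the subtree
// below it becomes a stage, and the parent reads its output through a
// MemoryExec placeholder. (In the Q4 listing, the two join-input fragments
// are grouped together as Stage 0; this sketch emits them separately.)

#[derive(Debug)]
enum Plan {
    Repartition(Box<Plan>),      // shuffle boundary
    Op(&'static str, Vec<Plan>), // any other operator
    Memory,                      // source or placeholder leaf
}

/// Recursively cut `plan` at shuffle boundaries. Completed stage fragments
/// are pushed leaf-to-root into `stages`; the residual root is returned.
fn split(plan: Plan, stages: &mut Vec<Plan>) -> Plan {
    match plan {
        Plan::Repartition(child) => {
            let cut = split(*child, stages);
            // The repartition and everything under it becomes a stage;
            // its consumer sees only a MemoryExec placeholder.
            stages.push(Plan::Repartition(Box::new(cut)));
            Plan::Memory
        }
        Plan::Op(name, children) => {
            let mut cut_children = Vec::new();
            for c in children {
                cut_children.push(split(c, stages));
            }
            Plan::Op(name, cut_children)
        }
        Plan::Memory => Plan::Memory,
    }
}
```

For a Q4-shaped plan (join with two shuffled inputs under a partial aggregate, then a final aggregate above another shuffle), this yields three shuffled fragments plus the root fragment, matching the shape of the listing above.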
gangliao commented 2 years ago

YSB:

=== Stage 0 ===
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "ad_id", index: 0 }], 16)
    CoalesceBatchesExec: target_batch_size=4096
      FilterExec: event_type@1 = view
        RepartitionExec: partitioning=RoundRobinBatch(16)
          MemoryExec: partitions=0, partition_sizes=[]

CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "c_ad_id", index: 0 }], 16)
    RepartitionExec: partitioning=RoundRobinBatch(16)
      MemoryExec: partitions=0, partition_sizes=[]

=== Stage 1 ===
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "campaign_id", index: 0 }], 16)
    HashAggregateExec: mode=Partial, gby=[campaign_id@3 as campaign_id], aggr=[COUNT(UInt8(1))]
      CoalesceBatchesExec: target_batch_size=4096
        HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "ad_id", index: 0 }, Column { name: "c_ad_id", index: 0 })]
          MemoryExec: partitions=0, partition_sizes=[]
          MemoryExec: partitions=0, partition_sizes=[]

=== Stage 2 ===
ProjectionExec: expr=[campaign_id@0 as campaign_id, COUNT(UInt8(1))@1 as COUNT(UInt8(1))]
  HashAggregateExec: mode=FinalPartitioned, gby=[campaign_id@0 as campaign_id], aggr=[COUNT(UInt8(1))]
    MemoryExec: partitions=0, partition_sizes=[]
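Stages 1 and 2 above implement a two-phase COUNT: each partition emits partial counts per campaign_id, the hash repartition routes equal keys to the same final partition, and the final aggregate sums the partials. A minimal stdlib sketch of the two phases (function names are illustrative, not Flock's API):

```rust
use std::collections::HashMap;

// Phase 1 (mode=Partial): count rows per campaign_id within one partition.
fn partial_count(rows: &[&str]) -> HashMap<String, u64> {
    let mut acc = HashMap::new();
    for campaign_id in rows {
        *acc.entry(campaign_id.to_string()).or_insert(0) += 1;
    }
    acc
}

// Phase 2 (mode=FinalPartitioned): after the hash shuffle, sum the partial
// counts that arrived for each key.
fn final_count(partials: Vec<HashMap<String, u64>>) -> HashMap<String, u64> {
    let mut acc = HashMap::new();
    for p in partials {
        for (k, v) in p {
            *acc.entry(k).or_insert(0) += v;
        }
    }
    acc
}
```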
gangliao commented 2 years ago

Check the next function's type (lambda or group), then choose execute_partitioned or execute accordingly.
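That dispatch could be sketched as follows — the enum variants and function names here are assumptions for illustration, not Flock's actual API:

```rust
// Hypothetical sketch: when a stage finishes, inspect the type of the next
// function in the dataflow and pick the invocation path.

#[derive(Debug)]
enum NextFunction {
    // A single cloud function consumes the whole output.
    Lambda(String),
    // A group of functions, one member per hash partition.
    Group { name: String, concurrency: usize },
}

fn choose_invoker(next: &NextFunction) -> &'static str {
    match next {
        // One downstream function: ship all output batches to it.
        NextFunction::Lambda(_) => "execute",
        // A function group: shuffle batches by key and invoke the
        // members in parallel, one per partition.
        NextFunction::Group { .. } => "execute_partitioned",
    }
}
```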

gangliao commented 2 years ago

Q4: https://github.com/flock-lab/flock/pull/444