flock-lab / flock

Flock: A Low-Cost Streaming Query Engine on FaaS Platforms
https://flock-lab.github.io/flock/
GNU Affero General Public License v3.0

feat: distributed shuffling on cloud function services #444

Closed by gangliao 2 years ago

gangliao commented 2 years ago

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

By submitting this pull request

gangliao commented 2 years ago
 /`
/:y/` `
`shdhso.
 -yhddh+.
  .yhhhy+-
   .syyhs+/.
    `+shhs++:.
     `:syyyo++/.
       .+ssys+++/-`          `.----.`
        ./oyyyo+++/:.`     `-/+++/-..`
          -/osyso++++/:.` -/++/-`
           .-/osssoo++++/:++++`
           `.-/++osooo++++++++-
              `-:/+oooo++++++o/
                `-:/+o++++++oo-                                `````             `
                 `.-//++++++o/   `:++:::://   .:++:`        .:///////.       .://///+-   ./++:` .++/.
                 ``..:+++++o+`     os`   -+     ss        `/+-`//. `-+/`   `+s:`   `o:    `so  `:+-
                     :+++++/`      os`  --      ss        /o`  `+o`  `++   +s:      ``    `so .+:`
                   `:+++++:        os:::o/      ss        o/   /+++`  :s   ss.            `ss/so`
                 .:++++:.`         os`  --      ss     `  /o``+/``o/:`++   +s:      `     `so .oo.
             `.:/++++/.            os`          ss    :+   /+:-`  .-:+/`   `+s:`    o/    `so  `+s:
          .-----:/++-            `:++:-       .:++::::+/    .:++//++:.       ./++///+-   .:o+:`  :o/:
          `.-:::-/:`                                            ``
        `--.``-/:`
            .:-`

Flock: A Practical Serverless Streaming SQL Query Engine (https://github.com/flock-lab/flock)

Copyright (c) 2020-present, UMD Data System Group.

▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒

This program is free software: you can use, redistribute, and/or modify it under the terms of the GNU Affero
General Public License, version 3 or later ("AGPL"), as published by the Free Software Foundation.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

You should have received a copy of the GNU Affero General Public License along with this program. If not, see
<http://www.gnu.org/licenses/>.

▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒

================================================================
                    Running the benchmark                       
================================================================
[info] Running the NEXMark benchmark with the following options:

NexmarkBenchmarkOpt {
    query_number: 4,
    generators: 1,
    seconds: 1,
    events_per_second: 1000000,
    data_sink_type: "blackhole",
    async_type: false,
    memory_size: 1024,
    architecture: "x86_64",
    distributed: true,
    state_backend: "hashmap",
    target_partitions: 8,
}

[info] Streaming: ElementWise
[info] SQL query:

SELECT category,
       Avg(final)
FROM   (SELECT Max(price) AS final,
               category
        FROM   auction
               INNER JOIN bid
                       ON a_id = auction
        WHERE  b_date_time BETWEEN a_date_time AND expires
        GROUP  BY a_id,
                  category) AS Q
GROUP  BY category;

[info] === Query Stage 0 ===
[info] Current function type: Lambda
[info] Next function name: Lambda("q4-01")
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "a_id", index: 0 }], 8)
    RepartitionExec: partitioning=RoundRobinBatch(8)
      MemoryExec: partitions=0, partition_sizes=[]

CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "auction", index: 0 }], 8)
    RepartitionExec: partitioning=RoundRobinBatch(8)
      MemoryExec: partitions=0, partition_sizes=[]

[info] === Query Stage 1 ===
[info] Current function type: Lambda
[info] Next function name: Group(("q4-02", 16))
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "a_id", index: 0 }, Column { name: "category", index: 1 }], 8)
    HashAggregateExec: mode=Partial, gby=[a_id@0 as a_id, category@3 as category], aggr=[MAX(bid.price)]
      CoalesceBatchesExec: target_batch_size=4096
        FilterExec: b_date_time@6 >= a_date_time@1 AND b_date_time@6 <= expires@2
          CoalesceBatchesExec: target_batch_size=4096
            HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "a_id", index: 0 }, Column { name: "auction", index: 0 })]
              MemoryExec: partitions=0, partition_sizes=[]
              MemoryExec: partitions=0, partition_sizes=[]

[info] === Query Stage 2 ===
[info] Current function type: Group
[info] Next function name: Group(("q4-03", 16))
CoalesceBatchesExec: target_batch_size=4096
  RepartitionExec: partitioning=Hash([Column { name: "category", index: 0 }], 8)
    HashAggregateExec: mode=Partial, gby=[category@1 as category], aggr=[AVG(Q.final)]
      ProjectionExec: expr=[final@0 as final, category@1 as category]
        ProjectionExec: expr=[MAX(bid.price)@2 as final, category@1 as category]
          HashAggregateExec: mode=FinalPartitioned, gby=[a_id@0 as a_id, category@1 as category], aggr=[MAX(bid.price)]
            MemoryExec: partitions=0, partition_sizes=[]

[info] === Query Stage 3 ===
[info] Current function type: Group
[info] Next function name: Sink(Blackhole)
ProjectionExec: expr=[category@0 as category, AVG(Q.final)@1 as AVG(Q.final)]
  HashAggregateExec: mode=FinalPartitioned, gby=[category@0 as category], aggr=[AVG(Q.final)]
    MemoryExec: partitions=0, partition_sizes=[]
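
Stages 2 and 3 above split AVG(Q.final) into a Partial aggregate and a FinalPartitioned aggregate. The sketch below illustrates the general two-phase idea in plain Python, assuming the partial state is a (sum, count) pair per category. It is not Flock or DataFusion code: the function names and sample rows are made up for illustration, and the final phase merges all partials in one place, whereas the plan first hash-repartitions on category.

```python
from collections import defaultdict


def partial_avg(rows):
    """Partial phase: build a (sum, count) state per category for one partition."""
    state = defaultdict(lambda: [0.0, 0])
    for category, final in rows:
        state[category][0] += final
        state[category][1] += 1
    return {category: tuple(acc) for category, acc in state.items()}


def final_avg(partials):
    """Final phase: merge the partial states, then divide sum by count."""
    merged = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for category, (total, count) in partial.items():
            merged[category][0] += total
            merged[category][1] += count
    return {category: total / count for category, (total, count) in merged.items()}


# Hypothetical (category, final) rows held by two different partitions.
partition_a = [(1, 10.0), (2, 30.0), (1, 20.0)]
partition_b = [(1, 60.0), (2, 50.0)]

averages = final_avg([partial_avg(partition_a), partial_avg(partition_b)])
```

Shipping (sum, count) rather than a per-partition average keeps the result exact when partitions hold different numbers of rows for the same category.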

[info] Created lambda function: q4-00
[info] Created lambda function: q4-01
[info] Creating lambda function group: (q4-02, 16)
[info] Created function member: q4-02-13
[info] Created function member: q4-02-01
[info] Created function member: q4-02-10
[info] Created function member: q4-02-00
[info] Created function member: q4-02-06
[info] Created function member: q4-02-11
[info] Created function member: q4-02-15
[info] Created function member: q4-02-04
[info] Created function member: q4-02-07
[info] Created function member: q4-02-05
[info] Created function member: q4-02-02
[info] Created function member: q4-02-03
[info] Created function member: q4-02-14
[info] Created function member: q4-02-08
[info] Created function member: q4-02-12
[info] Created function member: q4-02-09
[info] Creating lambda function group: (q4-03, 16)
[info] Created function member: q4-03-11
[info] Created function member: q4-03-06
[info] Created function member: q4-03-03
[info] Created function member: q4-03-09
[info] Created function member: q4-03-04
[info] Created function member: q4-03-01
[info] Created function member: q4-03-02
[info] Created function member: q4-03-05
[info] Created function member: q4-03-12
[info] Created function member: q4-03-13
[info] Created function member: q4-03-14
[info] Created function member: q4-03-08
[info] Created function member: q4-03-10
[info] Created function member: q4-03-07
[info] Created function member: q4-03-15
[info] Created function member: q4-03-00
[info] [OK] Invoking NEXMark source function: q4-00 by generator 0
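
For readers unfamiliar with the shuffle in Stage 0, here is a minimal sketch of hash repartitioning in plain Python. It assumes a stable CRC32 hash and a modulo routing rule from partition index to group member. Both are illustrative assumptions, as are the helper names and sample rows. The log does not show how Flock hashes keys or assigns partitions to members of a function group.

```python
import zlib
from collections import defaultdict

TARGET_PARTITIONS = 8  # target_partitions from the benchmark options above
GROUP_SIZE = 16        # size of the q4-02 function group in the log above


def partition_of(key, num_partitions=TARGET_PARTITIONS):
    """Map a key to a partition with a stable hash (CRC32 is an assumption)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def hash_repartition(rows, key, num_partitions=TARGET_PARTITIONS):
    """Group rows by the partition of their key column."""
    parts = defaultdict(list)
    for row in rows:
        parts[partition_of(row[key], num_partitions)].append(row)
    return dict(parts)


def member_name(group, partition, group_size=GROUP_SIZE):
    """Hypothetical routing rule: partition index modulo the group size."""
    return f"{group}-{partition % group_size:02d}"


# Hypothetical sample rows; column names follow the SQL query above.
auctions = [{"a_id": i, "category": i % 3} for i in range(6)]
bids = [{"auction": i % 6, "price": 10 * i} for i in range(12)]

# Both join inputs are partitioned on the join key, so matching rows
# land in the same partition and can be joined without further exchange.
auction_parts = hash_repartition(auctions, "a_id")
bid_parts = hash_repartition(bids, "auction")
```

Because both sides use the same hash on the join key, each partition can be joined independently, which is what lets a partitioned hash join run across separate function instances.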