flock-lab / flock

Flock: A Low-Cost Streaming Query Engine on FaaS Platforms
https://flock-lab.github.io/flock/
GNU Affero General Public License v3.0

RFC: SQL on cloud function services #8

Closed: gangliao closed this issue 3 years ago

gangliao commented 4 years ago

Continuous query using SQL is one requirement of real-time stream processing [1].

This project has great potential to make another contribution to simplifying the serverless programming model by directly recasting stream queries into dataflow models, where each operator or node in the graph is a cloud function managed by AWS Step Functions [3]. In this way, we can support cyclic dataflow graphs and iterations on streams.

Materialize does all of this by recasting SQL92 queries as dataflows. We can go further and directly convert SQL into FaaS dataflows. It is quite easy to parse SQL and support custom dialects [2].

References

  1. The 8 Requirements of Real-Time Stream Processing. Michael Stonebraker, Uğur Çetintemel and Stan Zdonik
  2. https://github.com/ballista-compute/sqlparser-rs
  3. https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html
gangliao commented 4 years ago

Step Functions is a serverless orchestration service (or state machine) that lets us combine AWS Lambda functions and other AWS services to build a dataflow stream processing application.

ServerlessCQ parses continuous queries, generates abstract syntax trees, and transforms optimized logical plans into state machines [1] on AWS, in which each state or task is a Lambda function.

  1. Amazon States Language: https://states-language.net/spec.html#example
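To make the mapping concrete, here is a hedged sketch of what a generated state machine definition might look like for a simple two-stage pipeline. The stage names and the Lambda function ARNs are hypothetical placeholders, not the project's actual output; the point is only that each operator becomes a `Task` state chained via `Next`/`End`, per the Amazon States Language spec [1].

```python
import json

# Hypothetical sketch: build an Amazon States Language (ASL) definition
# for a linear query pipeline, where each stage is a Lambda Task state.
# The function ARNs below are placeholders, not real functions.
def query_to_state_machine(stages):
    states = {}
    for i, stage in enumerate(stages):
        name = f"{stage}-{i}"
        state = {
            "Type": "Task",
            "Resource": f"arn:aws:lambda:us-east-1:123456789012:function:{stage}",
        }
        if i + 1 < len(stages):
            state["Next"] = f"{stages[i + 1]}-{i + 1}"
        else:
            state["End"] = True
        states[name] = state
    return {
        "Comment": "Auto-generated dataflow for a continuous query",
        "StartAt": f"{stages[0]}-0",
        "States": states,
    }

definition = query_to_state_machine(["filter", "aggregate"])
print(json.dumps(definition, indent=2))
```

A real transpiler would also emit `Parallel` and `Map` states for fan-out stages; the linear chain above is the minimal case.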
gangliao commented 3 years ago

Our Contribution #1

Amazon States Language

For FaaS, users have to provide function implementations for their applications. Moreover, for complex applications that require multiple stages of functions, there are often many valid evaluation strategies and execution orders. AWS Step Functions (powered by the Amazon States Language) is an emerging approach that lets users orchestrate functions as they need. However, the Amazon States Language's workflow model has its own set of limitations. For example, to reap the benefits of FaaS (pay-as-you-go and auto-scaling), users are forced to manually map each distinct query to a brand new dataflow execution model.

This mapping is unnatural for continuous queries. For each query, having users orchestrate the relevant cloud functions through a vendor-specific language is equivalent to asking users to specify physical execution plans directly in a database system.

Solution: Cloud-native SQL for Cloud Function Services

ServerlessCQ builds a transpiler, a special type of compiler, into the client CLI, so that customers and engineers can get out of the business of manually translating SQL to the Amazon States Language.

ServerlessCQ parses the input query using a formal grammar-based lexer and parser (ANTLR), then translates it into an intermediate representation. This allows us to translate SQL queries directly from the client side into dataflow models (e.g., AWS Step Functions) on the cloud.
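The query-to-IR step can be illustrated with a toy sketch. A real implementation uses a grammar-based parser; the keyword scan below is only a stand-in to show the shape of the intermediate representation (an ordered list of operators) that a later pass maps onto cloud-function states. The operator names are illustrative, not the project's actual IR.

```python
# Toy sketch of the transpilation front end: reduce a query to an
# ordered intermediate representation (IR) of operators. A real system
# would walk a proper AST; this keyword scan is only an illustration.
def sql_to_ir(sql):
    ir = ["scan"]
    upper = sql.upper()
    if " WHERE " in upper:
        ir.append("filter")
    if " GROUP BY " in upper:
        ir.append("aggregate")
    if " ORDER BY " in upper:
        ir.append("sort")
    ir.append("sink")
    return ir

print(sql_to_ir("SELECT k, SUM(v) FROM t WHERE v > 0 GROUP BY k"))
# -> ['scan', 'filter', 'aggregate', 'sink']
```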

This opens new opportunities for optimizing adaptive query execution based on cloud provisioning. Unlike traditional systems, ServerlessCQ has the potential to be the first cloud-native streaming system that supports SQL on serverless functions.

gangliao commented 3 years ago

SCOPE is a very interesting declarative language. It allows users to focus on the data transformations required to solve the problem at hand and hides the complexity of the underlying platform and implementation details. The SCOPE compiler and optimizer are responsible for generating an efficient execution plan (MapReduce) and the runtime for executing the plan with minimal overhead.

ServerlessCQ shares a similar goal but in a new context: FaaS. For example, the output of compiling a SCOPE script consists of three components:

  1. the algebra file that enumerates all super vertices and the data flow relationships among them.
    • Gang:: in ServerlessCQ, SQL queries are directly translated to AWS Step Functions (a data flow graph) for pipeline execution.
  2. the super vertex definition file, which contains the specification and entry points in the generated code for all super vertices.
    • Gang:: in ServerlessCQ, we use the Amazon States Language to describe the specification and entry points in a JSON file.
  3. the assembly itself, which contains the generated code. This package is sent to the cluster for execution.
    • Gang:: in ServerlessCQ, we compile individual operators into isolated Lambda functions and then upload them to AWS.

Many optimizations from SCOPE can be applied to ServerlessCQ. Furthermore, ServerlessCQ has more optimization strategies available on the cloud.

What differs from the previous system is that ServerlessCQ completely abandons its own job scheduler and instead delegates job scheduling to the cloud provider.

Reference

  1. SCOPE: Easy and Efficient Parallel Processing of Massive Data Sets. PVLDB'08
  2. SCOPE: Parallel Databases Meet MapReduce. VLDBJ'12
  3. Incorporating partitioning and parallel plans into the SCOPE optimizer. ICDE'10
  4. Efficient exploitation of similar subexpressions for query processing. SIGMOD'07
gangliao commented 3 years ago

System Design:

Client:

  1. ServerlessCQ transforms continuous queries into logical plans through Arrow's DataFusion [1], and then further into physical plans.
      +----------------------------------------------------------------------+
      |                   Client    (CLI, Java, C++, Go, Rust...)            |
      |----------------------------------------------------------------------|
      |                                                                      |
      |                                                                      |
      |   +--------+     +-------------------------+      +---------------+  |
      |   | SQL    +---->| Logical Plan(Optimized) +----->| Physical Plan |  |
      |   +--------+     +-------------------------+      +---------------+  |
      |                                                                      |
      +----------------------------------------------------------------------+

Physical plans are typically described as a hierarchical structure where each node has an explicit way to execute a particular operation. Any machine that imports the DataFusion library [1] and obtains the physical plan can directly perform the corresponding computations. This means that each operator (max, min, sum, order, aggregate, possibly join) does not need to be written by hand; it directly calls the operations that Arrow provides. Arrow itself defines an in-memory columnar format, and each operation is optimized with SIMD.
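The "hierarchical structure where each node executes one operation" idea can be sketched minimally. This is not DataFusion's API; real systems pull Arrow columnar batches through the tree, and plain Python lists stand in for them here.

```python
# Minimal sketch of a hierarchical physical plan: each node executes one
# operation and pulls its input from its child. Plain lists stand in for
# the columnar batches a real engine (e.g. DataFusion) would use.
class Scan:
    def __init__(self, rows):
        self.rows = rows
    def execute(self):
        return self.rows

class Filter:
    def __init__(self, child, predicate):
        self.child, self.predicate = child, predicate
    def execute(self):
        return [r for r in self.child.execute() if self.predicate(r)]

class Sum:
    def __init__(self, child):
        self.child = child
    def execute(self):
        return sum(self.child.execute())

# Sum(Filter(Scan(...))) mirrors the plan tree: leaf produces data,
# each parent consumes its child's output.
plan = Sum(Filter(Scan([1, -2, 3, 4]), lambda x: x > 0))
print(plan.execute())  # -> 8
```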

AWS:

After serializing the physical plan [2], ServerlessCQ splits it into fragments and delivers each fragment to a Lambda function through the function's payload. After the initialization phase completes, ServerlessCQ triggers the workflow according to the time window specified by the SQL query. Each Lambda function executes the operators in the partial physical plan it holds.
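A hedged sketch of that deployment step, assuming JSON serialization per reference [2] (the actual system serializes with Serde in Rust): the plan is cut into per-function fragments, and each fragment becomes the payload of a Lambda invocation. The function names and fragment format below are placeholders.

```python
import json

# Hypothetical sketch: split a physical plan into per-function fragments
# and serialize each fragment as a Lambda invocation payload. Function
# names and the fragment schema are illustrative placeholders.
plan = [
    {"function": "scan-project-filter", "ops": ["scan", "project", "filter"]},
    {"function": "partial-join", "ops": ["join"]},
    {"function": "sort-limit", "ops": ["sort", "limit"]},
]

payloads = {
    stage["function"]: json.dumps({"plan_fragment": stage["ops"]})
    for stage in plan
}
# Each serialized fragment would be passed as the invocation payload,
# roughly: lambda_client.invoke(FunctionName=name, Payload=payloads[name])
print(payloads["partial-join"])  # -> {"plan_fragment": ["join"]}
```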

Step 1: Initialization

  1. The first input: a payload containing the serialized physical plan.

After part of the plan is deployed in the current Lambda function, the remainder of the plan propagates downstream.

       +-------------------------------------------------------------------------------+
       |                                     AWS                                       |
       |-------------------------------------------------------------------------------|
       |                                                                               |
       |     xxxxxxxxxx                      lambda                                    |
       |     x        xxx                   +--------+                                 |
       |     x Source   x                   |        |                                 |
       |     xxxxxxxxx+xx      lambda   +-->|        |------+                          |
       |        x     |      +-------+  |   +--------+      |                          |
       |              +----->|       |  |                   |                          |
       |                     |       |--|    lambda         v lambda                   |
       |                     |       |  |   +--------+   +-------+                     |
       |                     +-------+  |-->|        |   |       |+--------+           |
       |                                |   |        |-->|       |         |           |
       |                                |   +--------+   +-------+         |           |
       |                                |    lambda         ^              v           |
       |                                |   +--------+      |         xxxxxxxxxxxxx    |
       |                                |   |        |      |       xxx            x   |
       |                                +-->|        |------+         xxx  Sink    x   |
       |                                    +--------+                  xxxxx    xx    |
       |                                                                     xxxxx     |
       |                                                                               |
       |                                                                               |
       +-------------------------------------------------------------------------------+

After the initialization phase, each Lambda function holds a part of the execution plan.

       +-------------------------------------------------------------------------------+
       |                                     AWS                                       |
       |-------------------------------------------------------------------------------|
       |                                                                               |
       |     xxxxxxxxxx                      lambda                                    |
       |     x        xxx                   +--------+                                 |
       |     x Source   x                   |partial |                                 |
       |     xxxxxxxxx+xx      lambda   +-->|join    |------+                          |
       |        x     |      +-------+  |   +--------+      |                          |
       |              +----->|scan   |  |                   |                          |
       |                     |project|--|    lambda         v lambda                   |
       |                     |filter |  |   +--------+   +-------+                     |
       |                     +-------+  |-->|partial |   |sort   |+--------+           |
       |                                |   |join    |-->|limit  |         |           |
       |                                |   +--------+   +-------+         |           |
       |                                |    lambda         ^              v           |
       |                                |   +--------+      |         xxxxxxxxxxxxx    |
       |                                |   |partial |      |       xxx            x   |
       |                                +-->|join    |------+         xxx  Sink    x   |
       |                                    +--------+                  xxxxx    xx    |
       |                                                                     xxxxx     |
       |                                                                               |
       |                                                                               |
       +-------------------------------------------------------------------------------+

The introduction of the initialization phase has two benefits: (1) it deploys the execution plan, and (2) it keeps Lambda functions warm, avoiding cold starts.

Step 2: Dataflow

The SQL time window generates a cron expression in CloudWatch [3], which triggers the workflow; all the data is then read from the source, and the distributed dataflow computation starts.
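As a sketch of that mapping, a tumbling window of N minutes could be turned into a `rate(...)` schedule expression (the simpler of the two forms CloudWatch/EventBridge accepts [3]; a `cron(...)` expression works for calendar-aligned windows). The helper below is an illustrative assumption, not the project's actual code.

```python
# Hypothetical sketch: derive a CloudWatch schedule expression from a
# SQL tumbling-window size in minutes. "rate(...)" is the AWS
# schedule-expression syntax; note the singular/plural unit rule.
def window_to_schedule(minutes):
    unit = "minute" if minutes == 1 else "minutes"
    return f"rate({minutes} {unit})"

print(window_to_schedule(1))   # -> rate(1 minute)
print(window_to_schedule(5))   # -> rate(5 minutes)
```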

References

  1. Arrow Datafusion: https://github.com/apache/arrow/tree/master/rust/datafusion
  2. Serde JSON: https://docs.serde.rs/serde_json/
  3. Schedule expressions using rate or cron: https://docs.aws.amazon.com/lambda/latest/dg/services-cloudwatchevents-expressions.html