prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

Query Execution Optimization for Broadcast Join by Replicated-Reads Strategy #17619

Open fgwang7w opened 2 years ago

fgwang7w commented 2 years ago

This GitHub issue describes the design of the PrestoDB query execution optimization for broadcast join using the replicated-reads strategy. The original design document can be found here. For the reader's convenience, its content is summarized in this issue below.

1. Background

A common best practice when building a parallel data warehouse is to design a multidimensional schema, e.g. a star schema or a snowflake schema, in which a central table is known as the fact table and the other tables are known as dimension tables. Queries typically have WHERE conditions on columns of the dimension tables, grouping on columns of the dimension tables, and aggregations on columns of the fact table.

An MPP distributed system offers several distributed join methods. Here we consider the join distribution methods supported in Presto: collocated join, directed join, repartitioned join, and broadcast join.

In this issue we focus on broadcast join. For a broadcast join, all records of table T2 are read by a single worker, which is a single network data transfer. The data is then redistributed to all the other available N-1 worker nodes, which hold the other table, before the join is performed. A broadcast join can only be chosen when the build-side table is smaller than join_max_broadcast_table_size. Otherwise, the optimizer chooses to repartition both sides of the join.
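The size rule above can be sketched as a small decision function. This is a simplified illustration, not Presto's optimizer code: the function name, the byte-based comparison, and the 100 MB threshold are assumptions made for the example, and the real planner works from cost estimates.

```python
def choose_join_distribution(build_side_bytes: int, max_broadcast_bytes: int) -> str:
    """Pick a join distribution from the estimated build-side size.

    Hypothetical helper: mirrors the rule that broadcast is only chosen
    when the build side is smaller than join_max_broadcast_table_size.
    """
    if build_side_bytes < max_broadcast_bytes:
        return "BROADCAST"
    return "PARTITIONED"


MB = 1024 * 1024
threshold = 100 * MB  # assumed value for join_max_broadcast_table_size

small_dim = choose_join_distribution(20 * MB, threshold)      # small dimension table
large_table = choose_join_distribution(5_000 * MB, threshold)  # too large to broadcast
```

With these assumed sizes, the small dimension table is broadcast and the large table falls back to repartitioning both sides.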

image

Note that this is a total of N+1 data transfers. Moreover, only the results of the table scan are potentially cached. Furthermore, the worker that is selected to do the caching is not necessarily chosen with any cache affinity.

In typical cases, dimension tables store supporting information for the fact table in a star schema, or in a snowflake schema where multiple dimension tables are involved in a single query. Low latency for many OLAP queries is crucial for business success.

In today’s implementation, the Presto scheduler instantiates a single stage in which one worker extracts the data, and then redistributes the entire dataset to all workers in a separate stage, which builds a hash table in a set of tasks. This is not ideal for resource management. With this optimization, scheduling tasks for a fact table joined with multiple dimension tables in a snowflake schema may result in better resource utilization, because each plan fragment (stage) can use the same workers to run tasks in parallel within a single stage, which reduces data shuffling.

2. Proposal

In today’s Presto, collocated join is allowed only as a grouped execution when joining two bucketed tables for which the bucket properties are compatible. We propose to add a new collocated join strategy for broadcast join.

2.1 Optimization scenarios:

This feature introduces a new query execution strategy by enabling a collocated broadcast join, where source data from small tables is pulled directly from remote cloud storage by all workers, instead of having one worker read the table once and then send the entire data over the network to all other workers. Allowing all workers to pull data directly from source storage is called replicated reads. Replicated reads of dimension tables across workers, executed in parallel, may shorten query runtime and response time. Furthermore, because all N worker nodes now participate in scanning the data from the remote source, the table scan is cacheable on every worker node regardless of the node selection affinity used for caching splits.
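A rough way to compare the two strategies is to count full copies of the build-side table that cross the network. The sketch below is a toy model, not a measurement: it assumes the accounting used in the Background note (one storage read plus one exchange transfer per join task, giving N+1 for broadcast), and the class and function names are made up for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TransferCount:
    storage_reads: int       # full reads of the table from remote storage
    exchange_transfers: int  # worker-to-worker copies of the table

    @property
    def total(self) -> int:
        return self.storage_reads + self.exchange_transfers


def broadcast_join_transfers(num_workers: int) -> TransferCount:
    # One worker reads the table once, then each join task receives a copy.
    return TransferCount(storage_reads=1, exchange_transfers=num_workers)


def replicated_reads_transfers(num_workers: int) -> TransferCount:
    # Every worker reads the table itself; nothing is shuffled between workers.
    return TransferCount(storage_reads=num_workers, exchange_transfers=0)


n = 16  # worker count, chosen to match the benchmark cluster size
broadcast = broadcast_join_transfers(n)
replicated = replicated_reads_transfers(n)
```

In this model the replicated-reads strategy removes all worker-to-worker transfers and moves the load to the storage layer, which is why it depends on storage that tolerates many parallel readers.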

image

This optimization is feasible for data lake analytics in a cloud-native environment, where the raw data is stored on high-bandwidth storage that supports many parallel connections and can handle highly concurrent requests for remote data extraction, e.g. AWS S3, Azure Blob Storage, or Google Cloud Storage. For example, Amazon S3 is a good fit for this kind of optimization because it uses a scalable storage infrastructure that supports at least 5,500 GET/HEAD requests per second per prefix in an S3 bucket. S3 is designed to scale its request capacity in a controlled way as load increases.

This optimization will NOT fit a use case where the data storage has a limit on the number of connections it can handle. That is, any remote data source that only accepts requests from multithreaded clients through a managed connection pool, with connections created via JDBC/ODBC, may not fit, because scaling up the connection requests risks overloading the server. For example, MySQL's default maximum number of simultaneous client connections is 151, so opening more connections to fetch data from a MySQL database may result in connection handlers not being closed properly, a “too many connections” error, or an unresponsive server due to sudden load spikes. This optimization is not ideal for remote storage that can only serve a limited number of parallel connections, because most data pull tasks may be queued, which increases queue wait time as well as overall latency.
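The connection concern can be made concrete with a back-of-the-envelope check. This is only a sketch: the helper names and the figure of 32 concurrent readers per worker are assumptions for illustration, while 151 is the MySQL default mentioned above and 16 matches the worker count of the benchmark cluster.

```python
def peak_connections(num_workers: int, readers_per_worker: int) -> int:
    """Worst-case number of simultaneous connections to the data source."""
    return num_workers * readers_per_worker


def fits_connection_limit(num_workers: int, readers_per_worker: int,
                          max_connections: int) -> bool:
    """True if every reader could hold a connection at the same time."""
    return peak_connections(num_workers, readers_per_worker) <= max_connections


MYSQL_DEFAULT_MAX_CONNECTIONS = 151

# Classic broadcast: a single worker scans the table.
single_reader_ok = fits_connection_limit(1, 32, MYSQL_DEFAULT_MAX_CONNECTIONS)

# Replicated reads: all 16 workers scan the table concurrently.
replicated_ok = fits_connection_limit(16, 32, MYSQL_DEFAULT_MAX_CONNECTIONS)
```

Under these assumed numbers, a single scanning worker stays within the limit while replicated reads would need 512 connections, far above the default of 151.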

2.2 Impact to the workload:

This optimization reduces data broadcast, lowering the network cost of data shuffle in exchange for a higher number of concurrent connections used by workers to scan data in parallel. The replicated join strategy now includes both "broadcast join" and "replicated-read join": a broadcast join replicates the table through a shuffle, whereas a replicated-read join replicates it by reading it multiple times. Furthermore, the scheduling policy is optimized to allow multiple dimension data sources in a single stage for lower query latency.

In a later section, a benchmark using Amazon S3 as the remote data source in a cloud data lake shows that opening hundreds of connections to read the data source in parallel is more efficient than using a single connection and broadcasting the data across the cluster. Query performance improves by 12% to 57% with extensive caching mechanisms.

3. Externals

3.1 Tuning knobs:

This optimization introduces the following properties:

  • This property is a system level knob that allows the optimizer to properly structure the physical plan for qualified replicated reads tables to use replicated reads.
  • Default is FALSE to disable replicated reads optimization. As mentioned above, this optimization profiles for all broadcast joins. This is a non-hidden user facing property.

3.2 Plan comparison:

3.3 Other recommendations

4. Design

4.1 Design Overview

The essence of this feature is to reduce data shuffle cost in order to improve overall query runtime. In today’s Presto, a join by data replication broadcasts the build side of the join: the scheduler assigns splits to tasks on a single worker, and tasks in intermediate stages then redistribute the data from upstream tasks to the other workers.

As shown in Case 1 below, the scheduler sends the splits of unpartitioned data to a single driver on worker 5, which redistributes the T2 data to the other workers. To reduce the network cost with the replicated-reads join method, the scheduler in Case 1 now sends the splits of table T2 to multiple drivers running in parallel on all 5 workers, which makes the join collocated. The same applies to Case 2: T2 builds hash tables without an extra data shuffle stage, which would create data redundancy and unnecessary network cost.
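The difference in split placement between the two strategies can be illustrated with a toy assignment function. This is not the FixedSourcePartitionedScheduler logic: the function names, worker names, and split names are hypothetical, and the sketch only shows which worker scans which T2 split.

```python
from typing import Dict, List


def assign_broadcast(splits: List[str], workers: List[str],
                     scan_worker: str) -> Dict[str, List[str]]:
    # All build-side splits go to one worker, which later redistributes the rows.
    return {w: (list(splits) if w == scan_worker else []) for w in workers}


def assign_replicated_reads(splits: List[str],
                            workers: List[str]) -> Dict[str, List[str]]:
    # Every worker scans every build-side split, so no redistribution is needed.
    return {w: list(splits) for w in workers}


workers = [f"worker{i}" for i in range(1, 6)]           # 5 workers, as in Case 1
t2_splits = ["t2-split-0", "t2-split-1", "t2-split-2"]  # hypothetical T2 splits

broadcast_plan = assign_broadcast(t2_splits, workers, scan_worker="worker5")
replicated_plan = assign_replicated_reads(t2_splits, workers)
```

In the broadcast plan only worker5 holds T2 splits, while in the replicated-reads plan each of the five workers holds the full split list and can build its hash table locally.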

image

4.2 Planning Optimization / ConnectorMetadata:

  • hive.overwrite-high-bandwidth-storage-for-replicated-reads is enabled.
  • Table is stored in S3 FS or Caching FS that extends S3 FS

4.3 Optimizer

image

image

4.4 Scheduler (FixedSourcePartitionedScheduler)

4.4.1 Scheduling flow

image

4.4.2 Schedule splits

Schedule splits for RRT table

5. Performance

In this section, we present performance results that demonstrate the impact of this optimization.

5.1 Environment

  • r5.8xlarge configuration: 32 vCPU | 256 GB RAM | 10 Gbps | 6800 Mbps EBS
  • Presto coordinator: r5.8xlarge x 1 instance
  • Presto worker: r5.8xlarge x 16 instances
  • Hive Metastore: m5.xlarge
  • IO cache (if present): gp2 SSD (384 GB | IOPS 1152 Mbps), powered by RaptorX
  • Workload: TPC-DS sf10000, no partitioning, on S3

5.2 Microbenchmark

5.2.1 Overall Performance

  • In cold runs, there is no significant difference with or without cache; performance is mostly on par.
  • In warm runs, the performance gain is between 10% and 31% without cache.
  • In warm runs, the performance gain is between 48% and 57% with cache enabled on top of RaptorX.
image image

5.3 Sample 10TB TPC-DS runs

5.3.1 Overall Performance

image

6. Operation Considerations

presto-catalog:

hive.split-loader-concurrency=64 <-- this may need to be changed dynamically

presto-coordinator-etc:

task.concurrency=32
query.min-schedule-split-batch-size=4000
node-scheduler.max-splits-per-node=4000
node-scheduler.max-pending-splits-per-task=4000

7. Future Work

8. Implementation

The work-in-progress PR will be listed here. Note that it is not ready for review yet.

fgwang7w commented 2 years ago

Hello @pettyjamesm @tdcmeehan @rschlussel @kaikalur. There is a new query optimization feature developed at Ahana that we plan to contribute to the community. This issue was opened to provide some context for the project, including the design and implementation strategy. I plan to have a design review at the upcoming TSC meeting (04/12/2022). Please help review the doc if possible. Thank you! cc @simmend @yingsu00