
[Design] Recoverable Grouped Execution #12124

Closed: wenleix closed this issue 2 years ago

wenleix commented 5 years ago

(A comment-friendly version of this design doc can be found at https://docs.google.com/document/d/1YhibgfzxtkjeJoYtty7R_AdBjTQdqf2nTtwQHk3kGLA/edit?usp=sharing)

Introduction

Grouped execution was introduced to Presto in https://github.com/prestodb/presto/pull/8951 to support the huge joins and aggregations that arise in ETL pipelines.

When the input tables are already partitioned on the join key or aggregation key (e.g., bucketed tables in Hive), Presto can process a subset (group) of the data partitions at a time. This reduces the amount of memory needed to hold the hash table. Implementation-wise, a stage with grouped execution enabled is further split into many "lifespans", where each lifespan corresponds to a table partition (e.g., a bucket in a Hive table). Only a subset of lifespans is processed simultaneously during stage execution, controlled by concurrent-lifespans-per-task.
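As a rough illustration of this execution model (all names here are hypothetical, not the actual Presto classes), the sketch below treats each bucket as one lifespan and caps how many lifespans run at a time, analogous to concurrent-lifespans-per-task:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: each bucket of the partitioned input becomes one
// "lifespan", and at most concurrentLifespansPerTask lifespans run at a time.
public class LifespanSchedulingSketch
{
    public static void main(String[] args)
            throws InterruptedException
    {
        int bucketCount = 16;
        int concurrentLifespansPerTask = 4;   // analogous to concurrent-lifespans-per-task

        ExecutorService executor = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(concurrentLifespansPerTask);

        for (int i = 0; i < bucketCount; i++) {
            int bucket = i;
            permits.acquire();                 // wait until a lifespan slot frees up
            executor.submit(() -> {
                try {
                    processLifespan(bucket);   // join/aggregate this bucket only
                }
                finally {
                    permits.release();
                }
            });
        }
        executor.shutdown();
    }

    private static void processLifespan(int bucket)
    {
        // In the real engine this would run the operators for a single bucket,
        // so only that bucket's build side needs to fit in memory.
        System.out.println("processed lifespan for bucket " + bucket);
    }
}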

Besides breaking the memory barrier, grouped execution also enables partial query failure recovery: each lifespan can be retried independently when the output of the stage is written to persistent storage. Note that output from failed tasks needs to be cleaned up, as discussed later.

Preliminaries

Consider the following query, where A and B are already bucketed on custkey:

SELECT ...
FROM A JOIN B
    USING (custkey)

Without grouped execution, the workers will load all the data on the build side (table B):

[figure f1: without grouped execution, workers hold the entire build side]

However, since A and B are already bucketed, Presto can schedule the query execution more intelligently to reduce peak memory consumption: for each bucket i, joining bucket i of table A with bucket i of table B can be done independently. In the Presto engine, this computation unit is called a "lifespan":

[figure f2: with grouped execution, each bucket is joined as an independent lifespan]

Besides scaling Presto to more memory-intensive queries, grouped execution also opens the opportunity for partial query failure recovery, since each lifespan in the query is now independent and can be retried on its own, as illustrated in the following figure:

[figure f3: a failed lifespan is retried independently]

Design

A prototype can be found in https://github.com/wenleix/presto/commits/tankbbb

1. Dynamic Lifespan Scheduling

This is done in https://github.com/prestodb/presto/pull/11693. Before this change, lifespans were pre-allocated to tasks in a fixed way, which doesn't work for a restarted lifespan, since it needs to be allocated to a different task.

Note that dynamic lifespan scheduling only works when there is no remote source in the stage. In the future, we also want to add support for remote sources with replicated distribution (i.e., for broadcast joins).
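A minimal sketch of the difference between fixed and dynamic assignment (hypothetical names only; see the linked PR for the real implementation): pending lifespans sit in a queue and are handed to whichever task is alive, so a lifespan that failed on one task can be re-queued and run elsewhere.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of dynamic (vs. fixed) lifespan-to-task assignment.
public class DynamicLifespanAssignmentSketch
{
    private final Queue<Integer> pendingBuckets = new ArrayDeque<>();
    private final List<String> aliveTasks = new ArrayList<>();
    private int nextTask;

    public DynamicLifespanAssignmentSketch(int bucketCount, List<String> tasks)
    {
        for (int bucket = 0; bucket < bucketCount; bucket++) {
            pendingBuckets.add(bucket);
        }
        aliveTasks.addAll(tasks);
    }

    // Fixed assignment would be: bucket i always goes to task (i % taskCount).
    // Dynamic assignment hands the next pending bucket to any alive task,
    // so a bucket that failed on one task can be re-queued and run elsewhere.
    public String assignNextLifespan()
    {
        Integer bucket = pendingBuckets.poll();
        if (bucket == null) {
            return null;
        }
        String task = aliveTasks.get(nextTask % aliveTasks.size());
        nextTask++;
        return "lifespan for bucket " + bucket + " -> " + task;
    }

    // On task failure: drop the task and put its unfinished buckets back in the queue.
    public void onTaskFailed(String task, List<Integer> unfinishedBuckets)
    {
        aliveTasks.remove(task);
        pendingBuckets.addAll(unfinishedBuckets);
    }
}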

2. Track Persistent Lifespan

In the current code, SqlTaskExecution reports a lifespan as completed once all of its drivers are done and there is no more input. However, it doesn't check whether the lifespan's output has been delivered. In the failure-recovery scenario, we want to track such "persistent" lifespans, which don't need to be restarted after a task failure.

There are two options to track whether a lifespan’s output is delivered:

Another problem is how to clean up output from failed lifespans, which is discussed in the next section.

A POC based on the first option can be found at https://github.com/wenleix/presto/commit/a6e89a1ff6985bc4692c8fcc21753ae9e2028085.
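As a rough sketch of the general idea (hypothetical names, and not necessarily either of the options above or what the POC does): a lifespan is only marked persistent once both conditions hold, i.e. its drivers are finished and its output has been delivered.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a lifespan only counts as "persistent" (safe to skip on
// restart) once its drivers are done AND its output has been delivered.
public class PersistentLifespanTrackerSketch
{
    private final Set<Integer> driversDone = ConcurrentHashMap.newKeySet();
    private final Set<Integer> outputDelivered = ConcurrentHashMap.newKeySet();
    private final Set<Integer> persistentLifespans = ConcurrentHashMap.newKeySet();

    public void onDriversFinished(int bucket)
    {
        driversDone.add(bucket);
        maybeMarkPersistent(bucket);
    }

    public void onOutputDelivered(int bucket)
    {
        outputDelivered.add(bucket);
        maybeMarkPersistent(bucket);
    }

    private void maybeMarkPersistent(int bucket)
    {
        if (driversDone.contains(bucket) && outputDelivered.contains(bucket)) {
            persistentLifespans.add(bucket);
        }
    }

    // After a task failure, only lifespans that are NOT persistent need to be restarted.
    public boolean needsRestart(int bucket)
    {
        return !persistentLifespans.contains(bucket);
    }
}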

3. Support Lifespan Granularity Commit

Failed lifespans may generate temporary output, which cannot be included in the final output. Thus the ConnectorPageSink has to support partial commit.

We can support partial commit in HiveConnector in the following way:

This protocol should work with STAGE_AND_MOVE_TO_TARGET_DIRECTORY write mode.

This commit protocol only requires the underlying filesystem to implement atomic rename. This is also the approach used by MapReduce/Spark.

Note that when more than one task tries to do the rename (e.g., a task the coordinator already considers failed may still be able to talk to the filesystem), we cannot determine which task ultimately wins the rename race. Thus stats cannot be updated with recoverable grouped execution for now.
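A minimal sketch of the rename-based commit, assuming a per-attempt staging directory and a deterministic committed name per bucket (hypothetical paths and names; the real connector goes through the Hadoop FileSystem API rather than java.nio):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch of lifespan-granularity commit via atomic rename.
// Each attempt writes to its own staging directory; the commit is a single
// rename to a deterministic target name, so only one attempt can win.
public class LifespanCommitSketch
{
    // Returns true if this attempt won the commit; losers clean up their staging output.
    public static boolean commitLifespan(Path stagingDir, Path targetDir, int bucket, String attemptId)
            throws IOException
    {
        Path attemptOutput = stagingDir.resolve("bucket_" + bucket + "_" + attemptId);
        Path committedOutput = targetDir.resolve("bucket_" + bucket);
        try {
            // Atomic on a single filesystem: either the whole directory appears
            // under the committed name, or nothing does.
            Files.move(attemptOutput, committedOutput, StandardCopyOption.ATOMIC_MOVE);
            return true;
        }
        catch (IOException e) {
            // Another attempt (e.g., one the coordinator already considered failed)
            // committed first; discard this attempt's output.
            deleteRecursively(attemptOutput);
            return false;
        }
    }

    private static void deleteRecursively(Path path)
            throws IOException
    {
        if (Files.notExists(path)) {
            return;
        }
        try (Stream<Path> stream = Files.walk(path)) {
            stream.sorted(Comparator.reverseOrder())   // delete children before parents
                    .forEach(p -> p.toFile().delete());
        }
    }
}

Because the committed name is deterministic, at most one attempt's rename can succeed, which is exactly the property the rename race described above relies on.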

4. Allow Removing Remote Source from ExchangeClient

For a failed task, its receiving stage (i.e., the stage containing TableFinishOperator) needs to stop waiting for output from it.

Note that adding this support to n-to-n exchange is inherently hard, since sending the cancellation requests to every receiver task introduces too many coordinator-to-worker HTTP requests in a bursty manner (see the discussion in https://github.com/prestodb/presto/pull/11065). However, this is fine for the purpose of supporting recovery, since only n-to-1 exchange needs to be supported.

POC: https://github.com/wenleix/presto/commit/79363c1cf689e87c4f791d4e72e17ce0ea7c78d6
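For illustration only (this is not the ExchangeClient API; see the POC above for the real change), an n-to-1 exchange conceptually just needs to be able to drop a pending upstream location:

import java.net.URI;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: an n-to-1 exchange tracks the upstream task output
// locations it still expects; removing a failed task's location lets the
// exchange finish without waiting for data that will never arrive.
public class RemovableRemoteSourceSketch
{
    private final Set<URI> pendingLocations = ConcurrentHashMap.newKeySet();
    private volatile boolean noMoreLocations;

    public void addLocation(URI taskOutputLocation)
    {
        pendingLocations.add(taskOutputLocation);
    }

    // Called when the coordinator restarts a failed task: stop waiting for its output.
    public void removeLocation(URI failedTaskOutputLocation)
    {
        pendingLocations.remove(failedTaskOutputLocation);
    }

    public void noMoreLocations()
    {
        noMoreLocations = true;
    }

    public boolean isFinished()
    {
        return noMoreLocations && pendingLocations.isEmpty();
    }
}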

5. Reschedule Splits for Failed Task

For restarted tasks, splits need to be re-scheduled. A rewind() API will be added to the SplitSource interface. This is trivial for FixedSplitSource, since all splits are pre-loaded. It's more sophisticated for HiveSplitSource, since query execution starts while splits are still being discovered.

For now we decided to keep all splits in memory for HiveSplitSource when running in recovery mode. Note that even with plain grouped execution, we are likely already buffering all of the HiveInternalSplits, since split discovery in bucketed mode does not block on "offer": https://github.com/prestodb/presto/blob/0bfe80b2540727341c0a33207b3f5dc58ca30aa2/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplitSource.java#L203-L211

We have two implementation options:

POC based on the first option: https://github.com/wenleix/presto/commit/37ef4a4ec66ca1bae6212356aabc0e6ffeb4bdc8
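For illustration, under the keep-all-splits-in-memory assumption described above, a rewindable split source boils down to remembering what was handed out per bucket (hypothetical names, not the actual SplitSource interface):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a rewindable split source: every split handed out is
// remembered per bucket, so a restarted lifespan can replay the same splits.
public class RewindableSplitSourceSketch<S>
{
    private final Map<Integer, List<S>> splitsByBucket = new HashMap<>();
    private final Map<Integer, Integer> positionByBucket = new HashMap<>();

    // Record a newly discovered split for a bucket (all splits stay in memory).
    public void addSplit(int bucket, S split)
    {
        splitsByBucket.computeIfAbsent(bucket, ignored -> new ArrayList<>()).add(split);
    }

    // Hand out the next batch of splits for a bucket's lifespan.
    public List<S> getNextBatch(int bucket, int maxSize)
    {
        List<S> splits = splitsByBucket.getOrDefault(bucket, List.of());
        int position = positionByBucket.getOrDefault(bucket, 0);
        int end = Math.min(position + maxSize, splits.size());
        positionByBucket.put(bucket, end);
        return splits.subList(position, end);
    }

    // rewind(): restart the bucket's lifespan from the beginning after a task failure.
    public void rewind(int bucket)
    {
        positionByBucket.put(bucket, 0);
    }
}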

6. Restart Failed Tasks

SqlStageExecution should coordinate the task restart, e.g., asking the StageScheduler to restart the task, removing the failed task as a source from the TableFinishOperator stage, etc.

POC: https://github.com/wenleix/presto/commit/0af6291b5b36d8292dd9b348403117424d3e1228
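Putting the pieces together, here is a rough sketch of the coordinator-side flow when a task fails (it reuses the hypothetical helper types from the sketches in the previous sections; none of them are real Presto classes):

import java.net.URI;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the coordinator-side recovery flow on task failure.
public class TaskRestartSketch
{
    private final PersistentLifespanTrackerSketch lifespanTracker;
    private final RewindableSplitSourceSketch<Object> splitSource;
    private final RemovableRemoteSourceSketch downstreamExchange;
    private final DynamicLifespanAssignmentSketch lifespanScheduler;

    public TaskRestartSketch(
            PersistentLifespanTrackerSketch lifespanTracker,
            RewindableSplitSourceSketch<Object> splitSource,
            RemovableRemoteSourceSketch downstreamExchange,
            DynamicLifespanAssignmentSketch lifespanScheduler)
    {
        this.lifespanTracker = lifespanTracker;
        this.splitSource = splitSource;
        this.downstreamExchange = downstreamExchange;
        this.lifespanScheduler = lifespanScheduler;
    }

    public void onTaskFailed(String failedTask, URI failedTaskOutput, List<Integer> bucketsOnTask)
    {
        // 1. Stop the downstream (TableFinish) stage from waiting for the failed task's output.
        downstreamExchange.removeLocation(failedTaskOutput);

        // 2. Figure out which of the task's lifespans actually need to run again.
        List<Integer> toRestart = bucketsOnTask.stream()
                .filter(lifespanTracker::needsRestart)
                .collect(Collectors.toList());

        // 3. Rewind their splits and hand them back to the dynamic scheduler,
        //    which will assign them to a different, still-alive task.
        toRestart.forEach(splitSource::rewind);
        lifespanScheduler.onTaskFailed(failedTask, toRestart);
    }
}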

Loose Ends

wenleix commented 5 years ago

cc @dain , @kokosing , @findepi

sopel39 commented 5 years ago

Facebook and S3 currently use the DIRECT_TO_TARGET_NEW_DIRECTORY write mode, which doesn't fit well into this model. We might want to introduce another write mode for this.

I was thinking that we could use some kind of "sub-transactions" for the failure cleanup. This would delegate the cleanup to the connector (e.g., we could use Hive 3 MVCC for S3). Then the coordinator calls either:

TransactionManager#asyncCommit
or
TransactionManager#asyncAbort

If any of these fails, then the whole query fails. This would also remove the need for cleanup in ConnectorMetadata.finishInsert/finishCreateTable.
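One way to read this suggestion, as a rough sketch (hypothetical interfaces; the real TransactionManager methods take transaction IDs): each lifespan attempt owns a sub-transaction, and at the end the coordinator commits the winning attempts and aborts the rest, so cleanup is delegated to the connector.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "sub-transaction per lifespan attempt" idea:
// the connector owns the cleanup; the coordinator just commits or aborts.
public class SubTransactionSketch
{
    public interface SubTransaction
    {
        CompletableFuture<?> asyncCommit();

        CompletableFuture<?> asyncAbort();
    }

    // Commit the attempts whose output is part of the final result, abort the rest.
    // If any commit or abort fails, the returned future fails and so does the query.
    public static CompletableFuture<Void> finish(List<SubTransaction> toCommit, List<SubTransaction> toAbort)
    {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        toCommit.forEach(tx -> futures.add(tx.asyncCommit()));
        toAbort.forEach(tx -> futures.add(tx.asyncAbort()));
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}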

stale[bot] commented 2 years ago

This issue has been automatically marked as stale because it has not had any activity in the last 2 years. If you feel that this issue is important, just comment and the stale tag will be removed; otherwise it will be closed in 7 days. This is an attempt to ensure that our open issues remain valuable and relevant so that we can keep track of what needs to be done and prioritize the right things.