facebookincubator / velox

A composable and fully extensible C++ execution engine library for data management systems.
https://velox-lib.io/
Apache License 2.0

Add multiple table writer driver support #5546

Open xiaoxmeng opened 1 year ago

xiaoxmeng commented 1 year ago

Description

Currently Velox doesn't support multiple drivers for table write, so table write processing can't be parallelized on a worker even though file write is a very CPU-intensive operation. It also leads to other limitations, such as a cap on the total number of partitions a worker can handle (by default 100 (bucketed) partitions per worker). To support this, we need to add the following:

  1. Velox needs to respect two session properties: "task_writer_count", the number of table writer drivers for a non-bucketed table, and "task_partitioned_writer_count", the number of table writer drivers for a bucketed table. These two properties determine the number of table write drivers, instead of "max_drivers_per_task" in Velox or "task_concurrency" in Presto Java. Also note that in Presto Java, "task_partitioned_writer_count" is optional; if it is not set, it falls back to "task_writer_count". To support these two session properties, we need to update maxDrivers() in LocalPlanner in Velox to set the number of drivers for TableWriteNode accordingly (see the first sketch after this list).
  2. Extend TableWriteNode to store the materialized partitioningScheme, or just a flag indicating whether the table is bucketed (the bucketed flag used in the first sketch after this list). The Presto coordinator only sets partitioningScheme for a bucketed table. Alternatively, we could derive this from ConnectorInsertTableHandle by adding an API such as isBucketed, but that would not align with the Presto Java side processing and would be too specific to the current implementation.
  3. Bucketed table write needs remote data shuffling between the table write stage and its preceding stage or source stage. This ensures that data with the same bucket id goes to the same Presto worker or Velox runtime. The partitioned output operator needs to respect the Hive bucket function for data shuffling; this part is already supported. Note that for non-bucketed table write, we just need round-robin data shuffling or repartitioning.
  4. Bucketed table write needs local data shuffling between the remote exchange and the table writer operators. This requires a local partition in between that uses the Hive bucket function as the partition function, ensuring that data with the same bucket id goes to the same table writer driver (operator). Note that for non-bucketed table write, we just need local round-robin data shuffling or repartitioning. We need to add Hive bucket partition function support in Prestissimo when creating LocalPartitionNode (in toVeloxQueryPlan for the sources of ExchangeNode). We also need to extend HivePartitionFunctionSpec to handle the case where the bucketToPartition map is unknown until we create a Hive partition function instance: when we convert the Presto query plan to a Velox plan, we don't yet know the actual number of table write drivers, which is the number of partitions. Correspondingly, we need to build a map from bucket ids to partitions when we create a Hive partition function; the partition function creation method will pass the number of partitions, which is the actual number of table write drivers (see the bucket-to-partition sketch after this list).
  5. HiveDataSink needs to switch from HashPartitionFunction to HivePartitionFunction to calculate the bucket id, as the two have slightly different implementations and would produce different bucket ids. We need to make this consistent.
  6. Add TableWriteMerger node support. The Presto coordinator adds a table writer merge node whenever a local exchange is added in the local table writer plan to support multiple table writers: (1) for a bucketed table writer, or (2) when the number of remote exchange drivers differs from the number of table writer drivers (see the last sketch after this list). Case (2) is based on the difference between "task_writer_count"/"task_partitioned_writer_count" and "task_concurrency", so we might need to make "max_drivers_per_task" the same as "task_concurrency" to keep this consistent between the Velox runtime and Java eval when the Presto coordinator makes the plan (see visitTableWriter in AddLocalExchanges.java). Note that in case (2), we don't need special handling because the local exchange uses a round-robin repartitioning function, but this needs to be verified end to end.
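
A minimal sketch of the driver-count selection from items 1 and 2, not tied to actual Velox internals: the names WriterDriverConfig, pickTableWriterDriverCount and hasPartitioningScheme are hypothetical and only illustrate the intended fallback logic; the real change would live in LocalPlanner's maxDrivers() and in TableWriteNode.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical view of the session properties relevant to table write; in
// practice these values would come from the query config. The field names
// mirror the Presto session properties discussed in item 1.
struct WriterDriverConfig {
  uint32_t taskWriterCount{1};                        // "task_writer_count"
  std::optional<uint32_t> taskPartitionedWriterCount; // "task_partitioned_writer_count"
};

// Picks the number of table writer drivers for a table write pipeline.
// - Bucketed table write uses "task_partitioned_writer_count", falling back
//   to "task_writer_count" when it is not set (Presto Java behavior, item 1).
// - Non-bucketed table write always uses "task_writer_count".
// 'hasPartitioningScheme' corresponds to TableWriteNode carrying a
// materialized partitioning scheme or a bucketed flag (item 2).
uint32_t pickTableWriterDriverCount(
    const WriterDriverConfig& config,
    bool hasPartitioningScheme) {
  if (hasPartitioningScheme) {
    return config.taskPartitionedWriterCount.value_or(config.taskWriterCount);
  }
  return config.taskWriterCount;
}
```

For example, with "task_writer_count" set to 4 and "task_partitioned_writer_count" unset, both bucketed and non-bucketed table writes would get 4 writer drivers under this fallback.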
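
For item 4, a sketch of building the bucket-to-partition map only when the partition function is created, i.e. once the actual number of table write drivers is known. The names makeBucketToPartition and LazyBucketPartitionSpec are hypothetical, and the modulo assignment is just one reasonable way to spread buckets across drivers, not necessarily what the extended HivePartitionFunctionSpec would do.

```cpp
#include <cstdint>
#include <vector>

// Builds a bucket-id -> partition (writer driver) map. 'numBuckets' is the
// Hive table's bucket count; 'numPartitions' is the number of table writer
// drivers, which is only known when the partition function instance is
// created, not when the Presto plan is translated to a Velox plan.
std::vector<uint32_t> makeBucketToPartition(
    uint32_t numBuckets,
    uint32_t numPartitions) {
  std::vector<uint32_t> bucketToPartition(numBuckets);
  for (uint32_t bucket = 0; bucket < numBuckets; ++bucket) {
    // Spread buckets round-robin across the writer drivers.
    bucketToPartition[bucket] = bucket % numPartitions;
  }
  return bucketToPartition;
}

// Hypothetical spec mirroring the proposed extension of
// HivePartitionFunctionSpec: the spec is carried in the plan without a
// bucket-to-partition map and builds one lazily in create(), once the
// partition count (the actual number of table write drivers) is passed in.
class LazyBucketPartitionSpec {
 public:
  explicit LazyBucketPartitionSpec(uint32_t numBuckets)
      : numBuckets_(numBuckets) {}

  std::vector<uint32_t> create(uint32_t numPartitions) const {
    return makeBucketToPartition(numBuckets_, numPartitions);
  }

 private:
  const uint32_t numBuckets_;
};
```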
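
And for item 6, a sketch of the condition under which a table writer merge node would be needed. The helper name needsTableWriteMerge is hypothetical; the actual decision is made by the Presto coordinator in AddLocalExchanges.visitTableWriter.

```cpp
#include <cstdint>

// A table writer merge node is needed whenever a local exchange is inserted
// in front of the table writers: either the table is bucketed (case 1), or
// the number of exchange drivers ("task_concurrency") differs from the
// number of table writer drivers (case 2).
bool needsTableWriteMerge(
    bool isBucketed,
    uint32_t tableWriterDriverCount,
    uint32_t taskConcurrency) {
  return isBucketed || tableWriterDriverCount != taskConcurrency;
}
```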
xiaoxmeng commented 1 year ago

cc @mbasmanova @kewang1024