prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

Parallel read in jdbc-based connectors #10832

Closed kokosing closed 5 years ago

kokosing commented 6 years ago

Currently, reads from JDBC-based tables use a single connection, which can be slow. However, other data engines are able to read a table in parallel. See below.

In Sqoop - https://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html

7.2.4. Controlling Parallelism

When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. For example, if you had a table with a primary key column of id whose minimum value was 0 and maximum value was 1000, and Sqoop was directed to use 4 tasks, Sqoop would run four processes which each execute SQL statements of the form SELECT * FROM sometable WHERE id >= lo AND id < hi, with (lo, hi) set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks.

If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by argument. For example, --split-by employee_id. Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.
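
A minimal Python sketch of the range splitting described in the Sqoop excerpt above. It is not Sqoop's actual implementation: the function names are made up for illustration, and it assumes an integer splitting column whose minimum and maximum are already known.

```python
def split_ranges(min_value, max_value, num_tasks):
    """Split the closed interval [min_value, max_value] into half-open (lo, hi) ranges."""
    upper = max_value + 1  # exclusive upper bound, so the row with the max value is kept
    span = upper - min_value
    bounds = [min_value + (span * i) // num_tasks for i in range(num_tasks + 1)]
    # Drop empty ranges, which appear when there are more tasks than distinct values.
    return [(bounds[i], bounds[i + 1])
            for i in range(num_tasks)
            if bounds[i] < bounds[i + 1]]


def build_queries(table, column, ranges):
    """Render one SELECT per range (identifiers are assumed to be trusted)."""
    return [
        f"SELECT * FROM {table} WHERE {column} >= {lo} AND {column} < {hi}"
        for lo, hi in ranges
    ]


if __name__ == "__main__":
    for query in build_queries("sometable", "id", split_ranges(0, 1000, 4)):
        print(query)
```

With a minimum of 0, a maximum of 1000 and 4 tasks, this yields the same four ranges as the example in the excerpt. As the excerpt notes, the ranges are even in key space only, so a skewed key distribution still produces unbalanced tasks.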

In Spark - http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

partitionColumn, lowerBound, upperBound: These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.

numPartitions: The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

fetchsize: The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.

batchsize: The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to 1000.

Related: - https://www.percona.com/blog/2016/08/17/apache-spark-makes-slow-mysql-queries-10x-faster/

The idea is simple: Spark can read MySQL data via JDBC and can also execute SQL queries, so we can connect it directly to MySQL and run the queries. Why is this faster? For long running (i.e., reporting or BI) queries, it can be much faster as Spark is a massively parallel system. MySQL can only use one CPU core per query, whereas Spark can use all cores on all cluster nodes. In my examples below, MySQL queries are executed inside Spark and run 5-10 times faster (on top of the same MySQL data).

In addition, Spark can add “cluster” level parallelism. In the case of MySQL replication or Percona XtraDB Cluster, Spark can split the query into a set of smaller queries (in the case of a partitioned table it will run one query per each partition for example) and run those in parallel across multiple slave servers of multiple Percona XtraDB Cluster nodes. Finally, it will use map/reduce the type of processing to aggregate the results.

I like Spark's approach. Following this approach, we could have configuration (e.g. in mysql.properties) that specifies which column to use for partitioning, the number of partitions, and the lower and upper bounds of the partitioning column. Reads from that table could then be easily parallelized.
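
To make the proposal concrete, here is a rough Python sketch of a configuration-driven parallel read. Everything in it is an assumption for illustration: the setting names are hypothetical and are not existing Presto properties, SQLite stands in for the remote database, and a thread pool stands in for Presto workers. Each range is read over its own connection, and the first and last ranges are left open-ended so the configured bounds only shape the ranges and never drop rows.

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor

# Hypothetical settings, as they might appear in a catalog file such as
# mysql.properties. None of these names are existing Presto properties.
CONFIG = {
    "partition_column": "id",
    "num_partitions": 4,
    "lower_bound": 0,
    "upper_bound": 1000,
}


def range_bounds(cfg):
    """Return (lo, hi) pairs; None means "unbounded" on that side."""
    n = cfg["num_partitions"]
    lo, hi = cfg["lower_bound"], cfg["upper_bound"]
    cuts = [lo + (hi - lo) * i // n for i in range(1, n)]
    edges = [None] + cuts + [None]
    return list(zip(edges[:-1], edges[1:]))


def read_partition(db_path, table, column, lo, hi):
    """Read one range over its own connection (identifiers are trusted here)."""
    clauses, params = [], []
    if lo is not None:
        clauses.append(f"{column} >= ?")
        params.append(lo)
    if hi is not None:
        clauses.append(f"{column} < ?")
        params.append(hi)
    where = (" WHERE " + " AND ".join(clauses)) if clauses else ""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(f"SELECT * FROM {table}{where}", params).fetchall()
    finally:
        conn.close()


def parallel_read(db_path, table, cfg):
    """Fan the ranges out to a thread pool and concatenate the results."""
    bounds = range_bounds(cfg)
    with ThreadPoolExecutor(max_workers=len(bounds)) as pool:
        parts = pool.map(
            lambda b: read_partition(db_path, table, cfg["partition_column"], *b),
            bounds,
        )
        return [row for part in parts for row in part]


def demo():
    """Build a small SQLite file and read it back through four connections."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")
        conn = sqlite3.connect(db_path)
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)")
        conn.executemany("INSERT INTO t VALUES (?, ?)",
                         [(i, f"row{i}") for i in range(1200)])
        conn.commit()
        conn.close()
        return len(parallel_read(db_path, "t", CONFIG))
```

The demo table deliberately contains ids beyond the configured upper bound; they land in the last, open-ended range. The sketch assumes a non-null partitioning column, so rows with a null value in that column would need separate handling.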

electrum commented 6 years ago

The Spark way looks good. You can see the code it uses to build the queries here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Notes:

  1. No lower/upper bound for the first/last query (the bounds don't filter the table, as the docs say)
  2. One of the queries needs to include the null values
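
The two notes above can be illustrated with a small Python sketch that is loosely modeled on the Spark behaviour but is not a port of JDBCRelation.scala. The function name is hypothetical, and the choice to attach the null check to the first partition is an assumption of this sketch.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Return one WHERE predicate per partition; None means "no predicate"."""
    # Never create more partitions than there are integer values in the range.
    num_partitions = min(num_partitions, upper_bound - lower_bound)
    if num_partitions <= 1:
        return [None]  # a single unfiltered query reads the whole table

    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        # Note 1: the first query has no lower bound, the last has no upper bound.
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if lower is None:
            # Note 2: exactly one query must also pick up the NULL values.
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


if __name__ == "__main__":
    for predicate in partition_predicates("id", 0, 1000, 4):
        print(predicate)
```

Because the outer ranges are open-ended, rows below the lower bound or above the upper bound are still read; the bounds only decide the stride.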

findepi commented 6 years ago

Following this approach, we could have configuration (e.g. in mysql.properties) that specifies which column to use for partitioning, the number of partitions, and the lower and upper bounds of the partitioning column.

Potentially, those things can be determined automatically. Manual configuration should be considered fine-tuning, and things should work out of the box, if possible. Following Sqoop's example, we could use the primary key for partitioning and pull the low/high values from the target DB's statistics (we want to do this anyway, for the CBO).
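
As a rough illustration of the automatic path, the Python sketch below discovers a single-column primary key and its low/high values. It makes several assumptions: SQLite stands in for the target database, the function name is hypothetical, and it runs a MIN/MAX query instead of reading database statistics. The metadata lookup (here `PRAGMA table_info`) is the part that would differ per RDBMS.

```python
import sqlite3


def discover_split(conn, table):
    """Return (column, low, high) for a single-column primary key, else None."""
    # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk);
    # pk > 0 marks columns that belong to the primary key.
    columns = conn.execute(f"PRAGMA table_info({table})").fetchall()
    pk_columns = [row[1] for row in columns if row[5] > 0]
    if len(pk_columns) != 1:
        return None  # no key or a multi-column key: needs manual configuration

    column = pk_columns[0]
    low, high = conn.execute(
        f"SELECT MIN({column}), MAX({column}) FROM {table}"
    ).fetchone()
    if low is None:
        return None  # empty table, nothing to split
    return column, low, high


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL)")
    conn.executemany("INSERT INTO orders VALUES (?, ?)",
                     [(i, i * 1.5) for i in range(5, 43)])
    print(discover_split(conn, "orders"))
```

When discovery returns None, falling back to an explicitly configured column and bounds (or to a single connection) keeps the automatic path optional.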

kokosing commented 6 years ago

Potentially, those things can be determined automatically. Manual configuration should be considered fine-tuning, and things should work out of the box, if possible. Following Sqoop's example, we could use the primary key for partitioning and pull the low/high values from the target DB's statistics (we want to do this anyway, for the CBO).

I was thinking about that too. I am only a bit concerned that pulling information about low/high values might not be trivial, depending on the RDBMS. This could be a great extension, but let's start with something simpler and more straightforward.

findepi commented 6 years ago

I have no doubts this needs to be RDBMS-specific. But we need those extension points anyway, quite soon.