Open gianm opened 1 month ago
I am still reviewing the changes, so it might be moot once I complete the review:
> they do not need meaningful CPU or memory resources beyond what is needed to gather partition statistics for global sorts
The controller can consume up to a maximum of 300 MB of heap space for the collection of statistics, and this maximum seems like a lot given that it will be per query, if compared with the default value of something like `maxSubqueryBytes`.

For reference, a Broker with 20 GB of heap space, 50 max concurrent queries, and no lookups would allocate 0.5 × 1/50 × 20 GB of heap space per query for inlining results, which is 200 MB, while each Dart query can theoretically take up to 300 MB. This disparity will be greater for Brokers with smaller heap sizes. Do we require some limiting for the Brokers at the moment (like we do with subqueries), or would we take this up once we start tuning concurrency?
The heap space used by partition statistics is capped to 15% of the overall Broker heap, and is split across all possible controllers based on the maximum Dart concurrency. So a Broker with 20 GB of heap and 50 max concurrent Dart queries would use at most 60 MB for partition statistics per controller. Check out `DartControllerMemoryManagementModule` for where this is set up.
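To make the arithmetic concrete, here is a minimal sketch of that calculation. It is only illustrative and is not the actual `DartControllerMemoryManagementModule` code; the class and variable names are invented for this example.

```java
// Illustrative only: per-controller partition-statistics budget, assuming a fixed
// fraction of Broker heap split evenly across the maximum number of Dart controllers.
public class DartStatsHeapSketch
{
  public static void main(String[] args)
  {
    long brokerHeapBytes = 20_000_000_000L; // 20 GB Broker heap, as in the example above
    double statsHeapFraction = 0.15;        // 15% of heap reserved for partition statistics
    int maxConcurrentDartQueries = 50;      // maximum concurrent Dart controllers on this Broker

    long perControllerBytes = (long) (brokerHeapBytes * statsHeapFraction) / maxConcurrentDartQueries;
    System.out.println(perControllerBytes / 1_000_000 + " MB per controller"); // prints "60 MB per controller"
  }
}
```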
Another thing is that I expect we won't be gathering partition statistics for most queries for very long. When the following two future-work items are complete, then `globalSort` (and the statistics gathering) will only be needed when a query has an `ORDER BY` without a `LIMIT` at the outer level. Anything else would be able to use hash partitioning with local sorting, skipping stats gathering (see the example after this list):

- Multithread `hashLocalSort` shuffles. Currently only one partition is sorted at a time, even on a multithreaded worker. This is the main reason the initial version is using `globalSort` so much, even though `globalSort` involves more overhead on the controller.
- Use `hashLocalSort` for aggregation rather than `globalSort`, once it's multithreaded, to reduce dependency on the controller and on statistics gathering.
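For illustration (my example, not from the comment above), here are two queries against the `trips` table used in the benchmarks later in this thread. Assuming the two items above are complete, the first would still need `globalSort` and statistics gathering, while the second could use hash partitioning with local sorting:

```sql
-- Outer ORDER BY with no LIMIT: would still need globalSort (and statistics gathering).
SELECT cab_type, COUNT(*) FROM trips GROUP BY cab_type ORDER BY COUNT(*) DESC

-- Outer ORDER BY with a LIMIT: could use hash partitioning with local sorting,
-- skipping statistics gathering.
SELECT cab_type, COUNT(*) FROM trips GROUP BY cab_type ORDER BY COUNT(*) DESC LIMIT 100
```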
There are a number of web console changes that are spread out between https://github.com/apache/druid/pull/17132, https://github.com/apache/druid/pull/17135, and https://github.com/apache/druid/pull/17147 but that were all developed together with the Dart engine as the prime motivation. Primarily the stage detail view has been updated.
Notably:
I've been doing some benchmarking of a version of this similar to the one that will be included in Druid 31 (as the experimental Dart engine). So far, it looks promising; the bottom line from what I've seen is:

These results are in line with the initial focus on more complex, multi-stage queries. The performance on lightweight queries is better than I expected, and I think it shows that if we put some work into it, the new engine could handle the whole spectrum of workloads.
These findings are from some test queries on the NYC Taxi dataset. There are benchmarks floating around that use the following four queries, which are all pretty lightweight:
Q1: `SELECT cab_type, COUNT(*) FROM trips GROUP BY cab_type`
Q2: `SELECT passenger_count, AVG(total_amount) FROM trips GROUP BY passenger_count`
Q3: `SELECT passenger_count, FLOOR(__time TO YEAR), COUNT(*) FROM trips GROUP BY passenger_count, FLOOR(__time TO YEAR)`
Q4: `SELECT passenger_count, FLOOR(__time TO YEAR), trip_distance, COUNT(*) FROM trips GROUP BY passenger_count, FLOOR(__time TO YEAR), trip_distance ORDER BY FLOOR(__time TO YEAR), COUNT(*) DESC`
I added two more, exercising high-cardinality groupbys and count-distincts:
Q5: `SELECT ROUND(pickup_latitude, 3), ROUND(pickup_longitude, 3), ROUND(dropoff_latitude, 3), ROUND(dropoff_longitude, 3), COUNT(*) FROM trips GROUP BY 1, 2, 3, 4 ORDER BY 5 DESC LIMIT 1000`
Q6: `SELECT ROUND(pickup_latitude, 3), ROUND(pickup_longitude, 3), ROUND(dropoff_latitude, 3), ROUND(dropoff_longitude, 3), COUNT(DISTINCT trip_id) FROM trips GROUP BY 1, 2, 3, 4 ORDER BY 5 DESC LIMIT 1000`
I also added a "needle-haystack" query, which selects just two rows matching a highly selective filter:
Q7: `SELECT COUNT(*) from trips_60m where dropoff = '0101000020E6100000000000E0F87E52C00000008090564440'`
I loaded an abbreviated version of this dataset on my laptop (60 million rows) and ran the test queries with both the native engine and Dart. The results were (in ms):
I looked into the factors driving the different performance on Q3, Q4, Q5, and Q6:

- The `timestampResultField` + `timestampResultFieldGranularity` optimization that native has, which swaps out `timestamp_floor` for `granularity`. This is activated for native for the `FLOOR(__time TO YEAR)`.
- The same `FLOOR(__time TO YEAR)` optimization as Q3 applies. A smaller factor comes from the slightly larger resultset here (82,418 final results; 508,581 intermediate results before cross-segment merge). With the native engine, the cross-segment merge uses a single concurrent hash table. With Dart, the cross-segment merge uses a sort. The concurrent hash table performs better and wins over the sort. [One of the "future work" items is relevant to this.]
- In addition to the `COUNT(DISTINCT trip_id)`, there's also a large subquery. Dart parallelizes through the entire query and handles it nearly 7x faster. Native gets bottlenecked at processing the subquery.

Finally, I ran a concurrency test with the "needle-haystack" Q7, using 50 threads issuing 500 queries each, back-to-back as fast as they could. The system was configured with `druid.msq.dart.controller.concurrentQueries = 25`, `druid.msq.dart.worker.concurrentQueries = 25`, and `druid.query.scheduler.numThreads = 25`, so both native and Dart got to run 25 concurrent queries. The results were 1529 QPS (native), 730 QPS (Dart). There are various areas of potential improvement for this kind of workload, such as reducing the number of controller<->worker messages transferred during a query, processing multiple segments in a single frame processor, logging more quietly, etc.
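For reference, here are those settings written out as server properties (my own rendering of the values above, with the controller-side settings on the Broker and the worker setting on the Historicals):

```properties
# Broker runtime.properties used for the test
druid.msq.dart.controller.concurrentQueries=25
druid.query.scheduler.numThreads=25

# Historical runtime.properties used for the test
druid.msq.dart.worker.concurrentQueries=25
```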
Motivation
Druid 24 included a task-based multi-stage query engine proposed in #12262. This has proved useful for DML (REPLACE, INSERT) and querying directly from deep storage.
This proposal is to introduce the natural next evolution: an interactive "profile" of the engine. The same engine is configured to run interactively, including changes such as:
The main purpose of this engine is to provide a way to run queries that are too lightweight for the task-based MSQ engine to make sense, but too heavyweight for the standard native query engine to make sense. A good example would be a `GROUP BY` with an intermediate resultset of hundreds of millions of rows. In general, this engine would specialize in the sort of midweight, ad-hoc queries that are common in the data warehousing world. I believe with some additional work it would also be possible to run lightweight, high-QPS queries competitively with the standard native query engine.

Proposed changes
Name
In the initial PR I used the name dart for this profile of the engine. Darts are lightweight and go fast, which are good qualities in an interactive query engine. It even has a possible backronym: "Distributed Asynchronous Runtime Topology".
API
Initially I'm proposing an API that is compatible with the SQL query API, to make it easy to try out the new engine.
- To issue a query, `POST /druid/v2/sql/dart/` the same form of JSON payload that would be accepted by `/druid/v2/sql/`. Results are also in the same format. This is a synchronous API, although internally the engine is asynchronous, so it is definitely possible to introduce an asynchronous API later on.
- To issue a query and also return a report with stages, counters, etc., `POST /druid/v2/sql/dart/?fullReport`. This is like an `EXPLAIN ANALYZE`. The report is in the same format as the reports generated by the task-based engine.
- To see a list of running queries (a feature that the native engine does not have), `GET /druid/v2/sql/dart/`.
- To cancel a query, `DELETE /druid/v2/sql/dart/{sqlQueryId}`.
- To check if the engine is enabled, `GET /druid/v2/sql/dart/enabled` (returns 200 or 404).
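As a usage sketch (the SQL in the body is a made-up example; the endpoint and payload format are the ones described above, and 8082 is the default Broker port), issuing a Dart query looks like a regular SQL API call against a Broker:

```
POST /druid/v2/sql/dart/ HTTP/1.1
Host: <broker-host>:8082
Content-Type: application/json

{"query": "SELECT cab_type, COUNT(*) FROM trips GROUP BY cab_type"}
```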
Servers and resource management
Controllers run on Brokers (one per query) and the workers run on Historicals. Resource management would be bare-bones in the initial version, limited to simple controls on the number of concurrent queries that can execute on each server.
On Brokers, there are three configs:

- `druid.msq.dart.enabled = true` to enable Dart.
- `druid.msq.dart.controller.concurrentQueries` provides a limit to the number of query controllers that can run concurrently on that Broker. Additional controllers beyond this number queue up. Default is 1.
- `druid.msq.dart.query.context.targetPartitionsPerWorker` sets the number of partitions per worker to create during a shuffle. Generally this should be set to the number of threads available on workers, so they can process shuffled data fully multithreaded.

Brokers only run controllers, so they do not need meaningful CPU or memory resources beyond what is needed to gather partition statistics for global sorts. (And anyway, I'd like to use fewer global sorts in the future; see "Future work" around `hashLocalSort`.)

On Historicals, there are three configs:

- `druid.msq.dart.enabled = true` to enable Dart.
- `druid.msq.dart.worker.concurrentQueries` provides a limit to the number of query workers that can run concurrently on that Historical. Default is equal to the number of merge buffers, because each query needs one merge buffer. Ideally this should be set to something equal to, or larger than, the sum of the `concurrentQueries` setting on all Brokers.
- `druid.msq.dart.worker.heapFraction` provides a limit to the amount of heap used across all Dart queries. The default is 0.35, or 35% of heap.

The initial version does not run on realtime tasks, meaning realtime data is not included in queries.
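As a sketch of how these might be set in `runtime.properties` (the values here are arbitrary examples for illustration, not recommendations from the proposal):

```properties
# Broker runtime.properties
druid.msq.dart.enabled=true
druid.msq.dart.controller.concurrentQueries=4
# Roughly the number of processing threads on the Historicals:
druid.msq.dart.query.context.targetPartitionsPerWorker=16

# Historical runtime.properties
druid.msq.dart.enabled=true
# Ideally >= the sum of controller.concurrentQueries across all Brokers:
druid.msq.dart.worker.concurrentQueries=8
druid.msq.dart.worker.heapFraction=0.35
```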
Resource management is very simple in the initial version. It works like this in the version that is headed for Druid 31:
I expect this will evolve over time. The "Future work" section includes thoughts on how resource management could evolve.
Operational impact
None if the engine is disabled or if queries are not being issued. If queries are being issued: on Historicals, Dart queries use the same merge buffers and processing pool as regular native queries, so they could potentially conflict with other queries that need those resources. They also use up to 35% of heap space while actually running.
On Brokers, Dart queries use the same HTTP threads as regular native queries, and could conflict there as well.
The API and all configuration parameters should be considered experimental and subject to breaking changes in upcoming Druid releases as the initial version of the feature evolves. The ability for Dart queries to function properly in a mixed-version environment (such as during a rolling update) is also not guaranteed for these initial experimental releases. Nevertheless, this would have no impact on regular queries.
Future work
Some thoughts on future work items.
System:

API:

- Asynchronous API, along the lines of `/druid/v2/sql/statements`.

Resource management:

Performance items:

- Multithread `hashLocalSort` shuffles. Currently only one partition is sorted at a time, even on a multithreaded worker. This is the main reason the initial version is using `globalSort` so much, even though `globalSort` involves more overhead on the controller.
- Use `hashLocalSort` for aggregation rather than `globalSort`, once it's multithreaded, to reduce dependency on the controller and on statistics gathering.
- Improve performance on very lightweight queries, such as `SELECT COUNT(*) FROM tbl`.