
Multi-stage distributed queries #12262

Closed gianm closed 1 year ago

gianm commented 2 years ago

This proposal is about extending Druid to support multi-stage distributed queries, something that I think will be really exciting and amp up what Druid is capable of.

Motivation

Today Druid's distributed query stack is single-stage: the Broker receives a query, slices it up into pieces, and sends one piece to each Historical. The Historicals produce partial results for their piece of the query, send them back to the Broker, and the Broker merges them into the final result set.

This venerable design works well when the bulk of processing can be done at the leaf servers that store the actual data. It scales well: Druid can handle thousands of queries per second, very CPU- and memory-efficiently, so long as there are enough Historicals running and the amount of data that makes it to the Broker stays relatively small.

Druid has a lot of functionality that is designed to make this single-stage distributed query stack as useful as possible. Whenever possible, we push down projections, filters, aggregations, limits, and joins. We also have a suite of builtin approximations, like approximate topN and various sketches, that people can use to minimize the amount of state that must be tracked beyond the leaf servers.

But there are some cases where this query stack doesn't work well, and which provide the motivation for multi-stage queries:

  1. Queries with very large result sets. Imagine a GROUP BY query with no LIMIT that would return billions of rows, because it groups on something with very high cardinality. Today these result sets must flow through the Broker, which creates a bottleneck.
  2. Additional SQL support. We'd like to keep expanding Druid's ability to support SQL until we can do it all. Some operations that we'd like to support, like joining together two distributed tables, cannot be done in the single-stage design.
  3. Complex query structures. Today, a query with lots of JOINs and subqueries will execute certain queries in a fully distributed manner -- basically, anything that can be represented as a base distributed table joined with broadcasted tables -- and then finish the remainder of query processing on the Broker. In some cases this works OK, because most of the work can be done as part of the distributed queries. But in other cases it results in a lot of data being collected on the Broker, which leads to an error like "Subquery generated results beyond maximum".
  4. Ingestion. Some databases use their query stack to handle ingestion. It makes sense if you think about an ingestion as a query that writes its results to a table. It would be nice if we could do this too, so we wouldn't need to maintain an ingestion execution stack that is separate from the query execution stack.

Design

There are a lot of interesting pieces here, so I just want to touch on each one in this main proposal. Each one should then be fleshed out separately.

Query representation and SQL planning

Background

Druid models native queries as "fat" query types that represent a full distributed query. There are four query types used by the SQL engine: scan, timeseries, topN, and groupBy. Each query type represents an entire single-stage distributed computation: there is a first piece that can run distributed, and a second piece that must run on the Broker. They all internally handle filtering, projection, sorting, limiting, etc. They all have a "dataSource" field that describes where they should get their data from: "table", representing an actual Druid datasource; "query", representing a subquery, which can itself be any other query type; "join", representing two other datasources joined together; or a handful of other less-common datasource types.

The SQL planner's main job is to morph a tree of relational operators into either a single native query or a tree of native queries. In the latter case, it uses "query" or "join" datasources to link the native queries together. It has limited flexibility in how it does this, because native queries have a rigid computational structure: they always do broadcast join first, then projection, then filter, then aggregation, then sort.

Proposal

To support ever-more complex query structures, we will need a way of representing data flow that is more flexible than native queries. The tried and true DAG approach will work well here. So I propose that the multi-stage engine should model queries as a DAG of "stages". Each stage:

The stages would be finer-grained than native queries. For example, a stage might do scan-filter-project, or aggregate, or sort, or limit. This is a common approach in relational databases.

As a first step, I suggest we keep native queries in the picture by translating SQL -> native query -> DAG-based query. This can be done with minimal changes to the SQL planner, and would enable classic native queries to exist at the same time as DAG-based queries without much code duplication. But at some point, we'll want to go directly from SQL -> DAG-based query, since that will give the SQL planner more flexibility to reorder operations.

Here's an example of how a query would look. This SQL query:

SELECT
  session,
  COUNT(*) AS cnt
FROM tbl
WHERE browser = 'Firefox'
GROUP BY session
ORDER BY cnt DESC
LIMIT 10

Would have five stages:

  1. Scan: Read tbl, apply the filter browser = 'Firefox', and project out session. No shuffle.
  2. Aggregate I: Locally group by session and compute count(*) for each. Shuffle by session.
  3. Aggregate II: Continue grouping by session within each partition, and sum the partial counts to get full counts. No shuffle. This produces a fully grouped resultset, still partitioned by session.
  4. Sort: Locally order by cnt DESC. Shuffle everything into a single partition.
  5. Limit: Take the first 10 rows from the single partition generated by the prior stage.
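
To make this concrete, here is one way those five stages could be sketched as data. This is a hypothetical illustration only: StageDefinition, ShuffleSpec, and the partition counts are made-up names and values, not existing Druid classes or settings.

import java.util.List;

// Hypothetical sketch: illustrative names, not existing Druid APIs.
record ShuffleSpec(List<String> partitionKey, int targetPartitions) {}

record StageDefinition(
    String name,          // e.g. "scan", "aggregate1"
    List<String> inputs,  // upstream stages; "tbl" marks the base table for the leaf stage
    String operation,     // what the stage computes, described informally here
    ShuffleSpec shuffle   // null = no shuffle; output stays local to the worker
) {}

class ExampleQueryDag {
  // The five stages for the GROUP BY session / ORDER BY cnt DESC / LIMIT 10 query above.
  static List<StageDefinition> stages() {
    return List.of(
        new StageDefinition("scan", List.of("tbl"),
            "filter browser = 'Firefox', project session", null),
        new StageDefinition("aggregate1", List.of("scan"),
            "partial COUNT(*) per session",
            new ShuffleSpec(List.of("session"), 100)),  // 100 is an arbitrary illustrative count
        new StageDefinition("aggregate2", List.of("aggregate1"),
            "merge partial counts within each partition", null),
        new StageDefinition("sort", List.of("aggregate2"),
            "order by cnt DESC", new ShuffleSpec(List.of(), 1)),  // everything into one partition
        new StageDefinition("limit", List.of("sort"),
            "take the first 10 rows", null)
    );
  }
}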

When stages connect without shuffling, we can pipeline execution locally on the same servers, so there is no need for buffering or cross-server traffic. When stages connect with shuffling, we'll need to exchange data across servers. Depending on the needs of the query, the producing stage could stream to the consuming stage, or the producing stage could buffer up all results before the consuming stage starts.

For the query above, if we know (or are willing to bet) that there are not too many distinct session values then we can run it just as efficiently as the current single-stage approach. First, we'll make sure that Scan and Aggregate I are scheduled on the same set of workers, so the output of Scan can be pipelined into Aggregate I locally in memory. Then, we'll configure the shuffle in Aggregate I to shuffle everything down to a single partition. Next, we'll make sure that Aggregate II, Sort, and Limit all run on the same server. That server would be responsible for gathering all the partitioned Aggregate I outputs and preparing the final result. Finally, we'll set things up so all stages run concurrently, and so Aggregate II streams from Aggregate I. Put together, this is exactly what the Historicals and Brokers do today.

Ingestion

In #11929 there is a proposal for adding a SQL INSERT statement. It could be implemented on top of our existing batch ingestion tasks: the SQL layer could convert relational operators to indexing tasks just as well as it can convert them to native queries. But I think it would be nicer to implement it on top of a multi-stage query stack.

We'd just need to do two things:

  1. Provide a way for the query stack to read external data. There's a natural way to do this through an "external" DataSource that maps onto RowBasedSegments. It would be similar to how we plug lookups into the query engines, except a little more complicated because we'll need to split the external datasource before sending it down to various servers.
  2. Provide a way for the query stack to generate segments. Once we have this multi-stage structure this is also natural: we can add a final stage to any query that shuffles data to match the target segment size and then generates and publishes segments.

So I propose that we work towards this instead of having the SQL planner generate batch indexing tasks. It has a nice side benefit: part (1) alone means that we'd be able to query external data in a regular query too. I don't think that'll be a core use case for Druid, but it has some usefulness, like previewing what an INSERT might do, or doing an ad-hoc join of Druid datasources with some external data.
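
To make the "external data" part of (1) slightly more concrete, here is a rough sketch of what an external datasource description and its splitting might look like. The names here are hypothetical; the real design would plug into Druid's existing input and segment abstractions (such as RowBasedSegments, mentioned above) rather than these toy classes.

import java.net.URI;
import java.util.List;

// Hypothetical sketch: illustrative names only, not existing Druid classes.
// Declares where the external rows live, how to parse them, and the columns they expose.
record ExternalDataSource(List<URI> files, String format, List<String> columns) {

  // Simplest splitting scheme: one split per file. Each worker reads only its assigned
  // splits and presents the rows to the query engine as if they were segments.
  List<ExternalDataSource> split() {
    return files.stream()
        .map(f -> new ExternalDataSource(List.of(f), format, columns))
        .toList();
  }
}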

We'll also need to figure out what to do about streaming ingest at some point. I'm not sure what to do there but I think there are a few options that make sense. Even if the multi-stage query stack doesn't have builtin support for streaming queries, we can layer streamingness on top in the same way that we do today with indexing tasks: there is a supervisor that manages a series of tasks, each of which reads a chunk of data from Kafka and then publishes it.

Server setup

There is an obvious question: where will this query engine run? Today we have Brokers, Historicals, and indexing stuff (MiddleManager or Indexers). I don't want to add a net new service or process type in the long run, because we have a lot already. But I think in the short run we should add a new process type.

I think at first the Broker should remain in charge of SQL planning, and may route certain queries to these new multi-stage query processes if they are running in that particular cluster. It could do this based on user request (like a context parameter) or based on aspects of the query (like presence of INSERT, or external data, or a "complex enough" query).

However, like I said, in the long run I don't think it's a good plan to have an extra process type on top of all the ones we already have. So I think in the long run we should shoot for this process type actually being able to serve as a replacement for the Broker, Historicals, and MMs/Indexers. By that I mean that in the fullness of time, it should be able to:

At that point, a small Druid cluster would be pretty simple: just a Coordinator and this new process type (or a few of them for scale-out). And a large cluster could still specialize. Imagine setting up a tier of these new processes that cache segments but don't perform ingestion, and another tier that doesn't cache segments but does perform ingestion. That's similar to setting up Historicals and MMs today, but more configuration-driven. It would allow simple clusters to be simple and complex clusters to be complex.

Code changes

A sketch of the code changes I think we'll need to make the above work out:

  1. Low-level mechanism for shuffling data between servers. This should include an efficient way to transfer large amounts of data over the network. Currently we use Smile-over-HTTP for this, which works well for relatively small amounts of data, but I think we can do better. It should also include both a pipelined implementation (producing stage streams to the consuming stage) and a buffered implementation (producing stage generates all data before the consuming stage starts) for operations that cannot be pipelined; a sketch of what these channels might look like follows this list.
  2. Cluster-level execution coordinator for a multi-stage query.
  3. Server-local execution coordinator for a single stage of a multi-stage query.
  4. An actual process type for the above three things to run in.
  5. Converter that generates multi-stage (DAG-based) queries from today's native query types. (This enables hooking into the existing SQL layer, since it generates native queries.)
  6. To make ingestion-through-query work: adapters from external data into the query stack, and from the query stack to the segment-generation code.
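
As a rough illustration of item (1), the shuffle layer might expose something like the interfaces below. These are hypothetical names and shapes, not an actual implementation.

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: illustrative interfaces, not existing Druid APIs.

// A batch of rows in whatever binary transfer format ends up replacing Smile-over-HTTP.
interface Frame {}

// Writer side of one output partition of a stage.
interface OutputChannel extends Closeable {
  void write(Frame frame);  // hand a frame to the transport (or to a local spill file)
  void doneWriting();       // signal that the producing stage has finished
}

// Reader side used by the consuming stage.
interface InputChannel extends Closeable {
  CompletableFuture<Frame> nextFrame();  // completes with null once the channel is exhausted
}

// Two possible wirings between producer and consumer:
//  - pipelined: nextFrame() futures complete as the producer writes, so both stages run
//    concurrently (for stages that can stream into each other);
//  - buffered: write() lands in storage and nextFrame() only returns data after
//    doneWriting(), for operations that cannot be pipelined.
interface ExchangeFactory {
  OutputChannel openOutput(String queryId, int stageNumber, int partition);
  InputChannel openInput(String queryId, int stageNumber, int partition);
}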

At Imply we've started to prototype a multi-stage query engine with the above design. We've implemented some of the pieces: not all of them, but enough to perform some basic queries. At this point I thought it would be a good time to open up a discussion with the wider community, since as we continue working on this stuff, we'd like to integrate it into Apache Druid.

Rationale

A.K.A. the "why not" section.

Why not integrate with an existing open-source query engine?

I wouldn't want to stop anyone from integrating with another open-source query engine. Some people might prefer to deploy that way.

But I think we will want this kind of functionality as something native in Druid, for two reasons. First: I think the user experience will be nicer if we don't require a dependency on another system. Second: I expect there will be opportunities for optimization that we can use if everything is built in to Druid, and that would be tough to implement with an external query stack.

Besides, the work we'll need to do in order to build a multi-stage query engine will also benefit integrations with other engines.

Why not gradually add multi-stage capabilities to the existing query stack?

I think it's a question of complexity.

The core query stack has three main hook points: individual segment scan, result merge on Historicals, and result merge on the Broker. The implementations of the native query types, especially groupBy, have grown quite complex over the years as new features have been added and needed to be mapped onto these hook points. For example: subtotals, ordering, limiting, and having are all typically done as part of "result merge on the Broker". The hook points are doing more work than originally envisioned, which makes the code more difficult to follow and extend.

So I think it's time to try a different approach with the query stack, rather than continuing to add new capabilities to the existing one. As we create a DAG-based query stack, we can split queries up into stages and move more of the "fancy stuff" to the framework, which will simplify the query-specific logic.

We can still share code, though. It won't be 100% new. We can share all of the StorageAdapter stuff (cursors, etc), the ColumnSelectorFactory stuff, and the single-segment processing code for groupBy and topN. This lower-level code is super optimized and works well. It's more the mid/high level stuff that IMO would benefit from a different approach.

samarthjain commented 2 years ago

Thank you for the proposal, Gian! Building a DAG based query execution model seems like the next logical thing to do in Druid. I am excited to see progress on that front.

Below are some ad hoc comments:

1) Build for resilience - as queries get more and more complex, chances of them running into failures because of network blips or bad hardware go up. As we are building these new capabilities, we should think about building resilience including capabilities to restart stages, exponential backoffs in case of network partitions, speculative execution of a certain percentage of tasks, etc.

2) Resource fairness - currently Druid has limited support for ensuring resource fairness outside of query laning. As the queries Druid supports get more complex, they will push boundaries on memory, disk, CPU, and network. Ensuring fair resource usage, so that queries don't starve each other and affect overall system stability, will be critical, IMHO.

3) Scaling and decoupling of shuffle servers - considering shuffle servers will be mostly stateless other than storing intermediate query state, to me it makes sense to have them as independent servers not serving any other Druid functionality. This would make it easier to scale them up.

4) A UI to show the query DAG and plan would be good to have.

julianhyde commented 2 years ago

Gian, Are there any algorithms that would be made possible by this architecture? (By algorithms, I mean distributed implementations of relational operators, e.g. shuffle-sort, shuffle-join, broadcast-join, hybrid-hash-join, use of bloom filters.) If so, can you list the algorithms that you have in mind?

gianm commented 2 years ago

@samarthjain Thanks for your comments. I totally agree about the importance of fault tolerance and good resource management. It'll be important as queries get more complex and longer-running.

Resource fairness

For resource management, I was thinking that for really big clusters, tiering (i.e. isolation of different workloads onto different servers) is going to be important. That way you can avoid sharing caches, etc between your interactive and your noninteractive workloads. I was thinking that in a dynamic cloudy sort of environment, you'd want the servers running interactive queries to run persistently, whereas you'd want the servers running multi-stage queries to be spun up on demand.

In smaller or less dynamic clusters, where you're likely to run mixed workloads on the same servers, I agree we'll need to improve the current resource management system to make that work better. I have some fuzzy ideas about this but it really deserves to be fleshed out into its own proposal. I think it'll need to involve some degree of dynamism, like observing how much resource queries are actually using and adjusting their priorities appropriately.

Scaling and decoupling of shuffle servers

I'd like to provide a smooth experience for both small clusters and large ones. I was thinking that for small clusters, or even single servers, you'd want the same servers doing all kinds of functionality, to keep management simple. For larger clusters I was thinking you'd want more isolation and independent scaling. I think we can make this possible with a server type that can handle a wide array of functionality, coupled with the ability to configure specific instances to only do one specific thing if you want independence.

A UI to show the query DAG and plan would be good to have.

Yes!!

gianm commented 2 years ago

Gian, Are there any algorithms that would be made possible by this architecture? (By algorithms, I mean distributed implementations of relational operators, e.g. shuffle-sort, shuffle-join, broadcast-join, hybrid-hash-join, use of bloom filters.) If so, can you list the algorithms that you have in mind?

@julianhyde I had in mind that a "partition-sort-shuffle-merge" operation would be the basic building block. The "execution coordinator" I mentioned should understand those, and for each stage should accept parameters like:

With that building block I think the most useful algorithms to implement at first would be shuffle sort (trivial given the building block), sort-based aggregation, sort-merge join, broadcast hash join, and shuffle hash join.

Btw: I think certain of these algorithms would be hybrid in some way if you consider the flow end to end. For example: by "sort-based aggregation" I just mean the distributed part. I had imagined the part that happens local to the leaf server being hash-based, followed by a sort-based distributed step.
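
For illustration only, here is what the local mechanics of that building block look like if rows are plain strings keyed by themselves. The cross-server shuffle itself is elided, and none of these names come from the prototype.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

class PartitionSortMerge {
  // Producer side: hash-partition rows by key, then sort each partition.
  // (Each partition would then be shuffled to the worker responsible for it.)
  static List<List<String>> partitionAndSort(List<String> rows, int numPartitions) {
    Map<Integer, List<String>> byPartition = rows.stream()
        .collect(Collectors.groupingBy(r -> Math.floorMod(r.hashCode(), numPartitions)));
    byPartition.values().forEach(Collections::sort);
    return new ArrayList<>(byPartition.values());
  }

  // Consumer side: n-way merge of the sorted runs received from upstream workers.
  static List<String> merge(List<List<String>> sortedRuns) {
    record Entry(String value, Iterator<String> rest) {}
    PriorityQueue<Entry> heap = new PriorityQueue<>(Comparator.comparing(Entry::value));
    for (List<String> run : sortedRuns) {
      Iterator<String> it = run.iterator();
      if (it.hasNext()) {
        heap.add(new Entry(it.next(), it));
      }
    }
    List<String> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Entry head = heap.poll();
      merged.add(head.value());
      if (head.rest().hasNext()) {
        heap.add(new Entry(head.rest().next(), head.rest()));
      }
    }
    return merged;
  }
}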

julianhyde commented 2 years ago

That makes sense. I mentioned Bloom filters because they (like correlated restarts) need to travel in the opposite direction to the flow of data, which complicates execution. It's wise to start off with something simple. Other algorithms can be added later.

gianm commented 2 years ago

@julianhyde Do you have a reference for what sort of algorithm you had in mind that uses such opposite-direction-flowing Bloom filters? I'd like to read more about it.

FrankChen021 commented 2 years ago

At that point, a small Druid cluster would be pretty simple: just a Coordinator and this new process type (or a few of them for scale-out)

This is a big advance for Druid that addresses the pain point of its current complex concepts and deployment. One of the reasons many people now prefer ClickHouse as their analytics DBMS is its simplicity: they only have to download one process, start it, and then they can import data and query.

I'm also wondering: if the core of Druid is combined into one process, is it possible to make Druid a built-in component, like an embedded database system, for other systems to integrate? I have this idea because we're using Druid to store application metrics, but Druid's own metrics are emitted into another time-series database like Prometheus. If Druid could store its own emitted metrics using its own storage and query capability, it would greatly simplify operations.

julianhyde commented 2 years ago

@gianm When you are doing a big join with filters on both sides - e.g. orders from customers in california for red products - then you only want to read customers who may have bought a red product, and also only want to read orders placed by a customer in california. Thus you want a filter implied by the join to travel both ways, and you can handle a few false positives. A good way is to generate a bloom filter as you are scanning customers and pass it to the scan of products, and vice versa.

In short, approximate semi-joins pushed down to each side of a join as pre-filters.

The paper "Sideways Information Passing for Push-Style Query Processing" from 2008 describes the idea pretty well. But we were doing it in Broadbase in around 1998.
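
A toy sketch of that idea using Guava's BloomFilter; the table and column names are just the customers/orders scenario from this thread, not anything in Druid, and only one direction of the filter passing is shown.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

class BloomPrefilterExample {
  record Customer(String customerId, String state) {}
  record Order(String customerId, String productId) {}

  // Build a Bloom filter while scanning customers WHERE state = 'CA', then use it as an
  // approximate pre-filter on the orders scan. False positives are fine: the real join
  // still runs afterward, but most non-matching orders are skipped early. The filter for
  // the other side (red products) would flow in the opposite direction the same way.
  static List<Order> ordersProbablyFromCaliforniaCustomers(List<Customer> customers, List<Order> orders) {
    BloomFilter<String> caCustomers =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);
    customers.stream()
        .filter(c -> "CA".equals(c.state()))
        .forEach(c -> caCustomers.put(c.customerId()));

    return orders.stream()
        .filter(o -> caCustomers.mightContain(o.customerId()))
        .collect(Collectors.toList());
  }
}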

gianm commented 2 years ago

I'm also wondering: if the core of Druid is combined into one process, is it possible to make Druid a built-in component, like an embedded database system, for other systems to integrate? I have this idea because we're using Druid to store application metrics, but Druid's own metrics are emitted into another time-series database like Prometheus. If Druid could store its own emitted metrics using its own storage and query capability, it would greatly simplify operations.

@FrankChen021 I think that would be great, and it is indeed a goal I have in mind with this work. It should be possible to have a pretty simple, self-contained single-server deployment. You'd need to worry more about backups, though, if you care about that sort of thing…

gianm commented 2 years ago

@gianm When you are doing a big join with filters on both sides - e.g. orders from customers in california for red products - then you only want to read customers who may have bought a red product, and also only want to read orders placed by a customer in california. Thus you want a filter implied by the join to travel both ways, and you can handle a few false positives. A good way is to generate a bloom filter as you are scanning customers and pass it to the scan of products, and vice versa.

Ah, that's a cool idea. I hadn't been thinking about it before, but now I will be, which is good 🙂. Thanks for bringing it up.

julianhyde commented 2 years ago

@FrankChen021 When I was building the Druid adapter for Calcite I would have loved an embedded Druid option. It would have saved me lots of effort provisioning a VM (with enough memory for the 4 or 5 JVMs that Druid requires) in which to run the unit tests. I suspect that an embedded Druid (even - if I could dare to hope - one that is single-threaded for common tasks like preparing and executing queries) would be very helpful for Druid developers too. It's so nice to be able to start up and stop the whole server in your IDE.

atris commented 2 years ago

Hi @gianm ! I recently started exploring Druid and was wondering if I could help with this issue. Please let me know, thanks!

gianm commented 2 years ago

An update: we're planning to start opening PRs for the batch-query version of the feature in the near future. This batch-query version runs via the indexing service, and is suitable for ingestion and data management queries (INSERT, REPLACE as described in #11929). Each PR is meant to explore one new area: they're split up this way to aid review and discussion. Here's the planned sequence.

  1. A more reliable RPC client for task -> task and task -> Overlord communication. This one is important for ensuring the communication between servers involved in a query is really robust.

  2. A binary format for transferring data between servers, and for spilling data to disk, that is more efficient than the Smile format we currently use. This one is important because of the large amount of data being transferred between servers. This would be in core. At first it would only be used by the query task (see point 3 below), but I do think it would make sense to use it for more things over time, like Historical -> Broker communication and spilling for GroupBy queries.

  3. An indexing service task that can run Druid queries ("query task"), that can accept a target datasource describing where to put the results, and that can handle "external" typed datasources. This would be able to run the kinds of queries generated by the functionality in #11929 (i.e., the kind currently being generated in CalciteInsertDmlTest and CalciteReplaceDmlTest). This would likely be in an extension.

  4. SQL bindings for the query task. This would involve adding an endpoint that creates a query task rather than running a query using the Broker-and-Historical-based stack. This change would enable an INSERT or REPLACE query to actually execute as a query task. This would likely be in that same extension.

  5. Web console support for the new SQL syntax and endpoint.

After (3) the functionality will be usable for ingestion and data management, but kind of obscure, because you'll need to embed a native Druid query into an indexing service task. After (4) it will be easier to use, since you can write SQL. After (5) it will be VERY easy to use! Then we can focus on making it more robust, more performant, adding features, and improving ease of use even further.

I mentioned that we've been prototyping some of the pieces of a multi-stage query at Imply. This sequence of PRs represents a contribution of everything we've prototyped so far. Of course, even after doing all of this, the functionality wouldn't be well suited for low-latency queries. The indexing service task system is not really designed for that. But there is a path towards making low-latency queries happen in a way that shares substantial code with this initial, task-oriented functionality. We'll be able to walk that path in the Apache Druid codebase using these contributions as a base.

Very much looking forward to feedback on this work, and to integrating it into Druid!

gianm commented 2 years ago

Hi @gianm ! I recently started exploring Druid and was wondering if I could help with this issue. Please let me know, thanks!

Hey @atris — I think one of the best things you can do is take a look at the PRs I mentioned above, try the functionality out, and provide some feedback on the design.

gianm commented 2 years ago

First PR (RPC): https://github.com/apache/druid/pull/12696

nishantmonu51 commented 2 years ago

I have observed the Smile data transfer to be a bottleneck for large result sets as well. @gianm, can you provide more details on your thoughts around the alternative to the Smile format? I wonder if something like Arrow Flight could be a useful standard to adopt here.

gianm commented 2 years ago

Next patch (frame format): https://github.com/apache/druid/pull/12745

@nishantmonu51 this is responsive to your question earlier! I thought about using Arrow, but didn't end up using it for a couple reasons:

gianm commented 2 years ago

Even though the current work is focused on batch ingestion, I wanted to write down some thoughts about low-latency queries. I expect two pieces of the current work will be especially useful: frame format/processors, and the modeling of multi-stage queries as DAGs of processors.

First, goals. We want to retain the good properties of the current low-latency query stack, and improve in areas where we can improve.

Good properties we should keep:

Things we can improve:

Second, form factors. There are four reasonably natural form factors the low-latency stuff can take: (1) the Historicals do the leaf stages [the ones that directly process segments] and Brokers do intermediate and final stages; (2) the Historicals do all work [leaf and intermediate], and the Broker only receives and returns final results; (3) the Historicals do the leaf stages, Brokers receive and return final results, and we introduce a third service type for intermediate stages; and (4) we introduce a completely new service type that is neither a Historical nor a Broker.

We should choose the path that gets us to the goals quickest and makes life easiest for users. (1) or (2) seem ideal because they provide a clean migration path. I don't like (3), because two service types seems like enough already, and adding a third would add significant operational complexity. (4) seems like a second-choice option if (1) or (2) don't work out.

Welcome any additional thoughts.

FrankChen021 commented 2 years ago

Wow, DAG-based processors, it's good to hear that.

abhishekagarwal87 commented 2 years ago

Awesome. With this capability, we can do shuffle joins, window aggregations, etc. without bottlenecking on the broker. Have you also thought about elasticity? Right now, scaling historicals up and down is tricky for large clusters as segment balancing takes time and historicals need to download segments from deep storage. With elasticity in mind, (1) makes much more sense than (2) because

clintropolis commented 2 years ago
  • No, or reduced, need for server thread pool size and connection pool size tuning. Today the server thread pool sizes on Brokers and data servers, and the Broker-to-data-server connection pool size, dictate the number of queries that can run concurrently. Each query needs to acquire a connection from the Broker-to-data-server pool, and a server thread from the Broker and all relevant data servers, in order to run. High QPS workloads require tuning these three parameters. Too low and the hardware isn't maxed out; too high and the system can suffer from excessive memory use or context switching. It'd be better to arrive at good throughput and stability without needing to adjust these parameters.
  • In the out-of-box configuration, query priorities apply only to the processing thread pools. They don't apply to the resources mentioned in the prior bullet. So, it's possible for low priority long-running queries to starve out high priority queries. Query laning helps with this, but it isn't configured out of the box, and the 429 error codes make it more difficult to write clients since retries are necessary. I think we can come up with a better solution. I suspect we'll want to decouple server-to-server queries from the http request/response structure.

I think these are both artifacts of not using async http handling so that we could manage the queue of query requests ourselves? I believe async http request handling for queries would be actually required as long as we want to maintain the current HTTP request API model for interactive queries. Async http handling was a longer-term follow-up goal from the laning stuff that I sort of forgot about and never got back to 😅, but is something I had in mind - the 429 would be replaced instead with feeding query requests into a lane-specific queue to wait for a processing slot, with the lanes themselves prioritized. Likewise, I imagine the broker -> historical pools could be growable if we didn't also need to use them to control overall load, since the historicals would be controlling their own load and open requests wouldn't be blocking anything anywhere.
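
For what it's worth, here's a rough sketch of that lane-queue idea in plain java.util.concurrent terms. All names are made up, and it glosses over timeouts, cancellation, and anything beyond a simple per-lane bound.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Made-up sketch: instead of rejecting with HTTP 429, a query waits in its lane's bounded
// queue until a processing slot frees up; lanes are drained in priority order.
class LanedQueryQueue {
  record PendingQuery(String queryId, Runnable task) {}

  private final Map<String, BlockingQueue<PendingQuery>> lanes = new LinkedHashMap<>();
  private final Semaphore processingSlots;

  LanedQueryQueue(int slots, List<String> lanesInPriorityOrder, int perLaneQueueLimit) {
    this.processingSlots = new Semaphore(slots);
    for (String lane : lanesInPriorityOrder) {
      lanes.put(lane, new LinkedBlockingQueue<>(perLaneQueueLimit));
    }
  }

  // Returns false only if the lane's queue is full -- the one case where we still shed load.
  boolean submit(String lane, PendingQuery query) {
    return lanes.get(lane).offer(query);
  }

  // Called by worker threads: take a slot, then run the highest-priority waiting query.
  void runOne() throws InterruptedException {
    processingSlots.acquire();
    try {
      for (BlockingQueue<PendingQuery> lane : lanes.values()) {  // iteration order = priority
        PendingQuery next = lane.poll();
        if (next != null) {
          next.task().run();
          return;
        }
      }
    } finally {
      processingSlots.release();
    }
  }
}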

  • Query types today are monolithic and handle many functions internally (like aggregation, ordering, limiting) in a fixed structure. Splitting these up into smaller logical operators would simplify planning and make execution more flexible. It will also allow us to factor the execution code into more modular units. Some discussion in https://github.com/apache/druid/pull/12641.

What is the migration path to something like that? Build the new thing side-by-side I guess? There is a fair bit of discussion in the proposal and some indication of doing it in-place, but unless I missed it, details on how it might actually be done for the more optimized/complicated engines incrementally are a bit light, and it doesn't really seem settled in the comments. A lot of the discussion and the prototype are in terms of the scan query, which is a bit too simple to prove anything imo.

Given the heavy differences with how stuff currently works, the only non-dangerous way to do this seems to be the side-by-side rebuilding of basically everything above the segment and segment-like reading stuff (selectors/filters/indexes/cursors/etc), one query type at a time? Otherwise the amount of proving necessary to ensure, for example, that branches caused by feature flags in potentially performance-critical paths aren't impacting the existing query processing feels like it would be too much. Maybe I'm being too cautious though?

All that said, it sounds nice if we can work it out to be the same or better performance than the existing engines.

paul-rogers commented 2 years ago

@gianm, sounds like a great set of improvements! The way we achieved these goals in other systems (Drill, Presto, Impala, Hive, etc.) is to use the classic optimizer + operator DAG architecture. The optimizer allows queries to handle a wide variety of use cases (anything that can be expressed in SQL): scans, groupings, joins, windowing, grouping over joins over scans topped by windowing, etc. The optimizer (AKA "planner") handles distribution: whether that be classic Druid scatter-gather or a more general multi-stage plan. The optimizer also handles tasks such as data locality, resource balancing, etc. This is how Drill, Impala and Presto handle those monster 1000-line SQL statements with optimal performance (given the data format they work with).

Druid may never need to handle complex BI queries. Even in our domain, the occasional shuffle join, cold-tier query, union, wide merge and other operations would benefit from the optimizer/operator architecture. While, for Druid, these are stretch goals, for those other engines they are warm-up exercises because the architecture is designed for flexibility.

The reason for the optimizer/operator architecture is to enable the software (the planner) to make decisions about query execution, resources, distribution and so on. Query engines do that so the user doesn't have to. For example, with Druid, we need experts such as yourself to optimize queries and the cluster. But in most other products, the software itself does the query optimization work via the optimizer. Both approaches work, of course. Many products see automation as a win to lower costs, promote ease-of-use, handle a range of use cases, etc. The usual argument about automation vs. manual tasks.

The planner needs a way to execute, which is typically a set of operators that the planner can rearrange as needed to produce any given query. Think of Calcite with its built-in linq4j solution. Drill, Impala and Presto are all based on "lego block" operators which the optimizer arranges as needed for a given query. In the MSQ work, frame processors can be seen as heavy-weight operators: they can be composed, but not quite with the flexibility and nuance of lighter-weight operators.

Operators need a way to process data. Frames seem like a great solution. DAGs need a way to shuffle data. Both HTTP and more sophisticated solutions can work and are isolated in the "exchange" operators.

Druid, of course, has classically used its ad-hoc query runner/sequence/native query stack, which has been great for a decade but is optimized for (and limited to) the scatter-gather architecture. It can be extended with new native queries, more ad-hoc logic, and so on. This extension, however, cannot address the fundamental limitation: that every SQL query is mapped into a set of one or more native queries, and the Broker combines the results. That's the pattern we want to generalize if we want to expand use cases.

The good news is that the classic Druid architecture does, obviously, have the key data pipeline operations, just wrapped in query runner/sequence code. Some prototypes were done to show how we could evolve this into the optimizer/operator architecture. See this issue. The "cache-aware work assignment" mentioned in the above post is one part of what an optimizer normally does, and would make a great contribution toward an optimizer.

So, a good question for the team to consider is if we can get where we want by continuing to roll our own approach, extended with the features outlined above (frames, better exchanges), or if we actually need the flexibility of the industry-standard optimizer/operator architecture. If we do, there is a path to get there, based on the team's excellent work to date, including the mechanisms discussed above.

paul-rogers commented 2 years ago

@clintropolis asked:

What is the migration path to something like that? Build the new thing side-by-side I guess?

For the optimizer/operator prototype, the approach taken was to evolve the current code. First, replace sequences with operators in a way that the two interoperate. Then, we'd find that query runners do nothing other than call a "planner function" that produces an operator, so we can gather up that code into a low-level "native query planner" that is similar to the controller work manager @gianm created.

Once we've converted the query runner/sequence stack to native query planner/operators, we can, if we're confident, just ship that. (Operators are designed to be easy to test, unlike query runners and sequences, so we can more easily ensure quality.) Or, if we are cautious, we offer both. The QueryLifecycle either goes down the current path, or the new path, returning results the same way regardless of engine used.

From there, we can extend the planner for more use cases: multi-tier distribution, distributed joins, etc. We can also combine planner levels. Currently we have Calcite, the native query maker, and the query runners. With MSQ, we also have the controller's work allocation code. After the above step we have Calcite, the native query maker, and the native query planner in one path, and Calcite, the MSQ query maker, and the controller work allocator in the ingest path. We can begin to merge the planners so we plan the SQL query, not the native query. That removes barriers to what we can offer users. "Classic" queries are still fast, but BI queries are possible without overloading the Broker. Calcite knows about windowed aggregations; we just need to add a windowing operator and we're (mostly) good to go.

Over time, we could, if there was reason, end up with the common solution: Calcite for query parsing, analysis and query optimization. A Druid-specific layer to convert Calcite rels into the equivalent Druid operators. Finally, execution frameworks optimized for low-latency (in-memory processing) or ingest (disk-based processing). The operators are the same, only the exchanges differ on the data pipeline path. (The execution management, resource management, error handling and other aspects would be different, but the data pipeline is mostly blissfully ignorant of these aspects.)

While rearranging the planner, we should be able to move items one at a time: they become operators to Calcite, with the native query planner remaining as the big, complex operator we have today in the DruidQueryRel, where a native query is a Calcite relational operator.

We could also merge the MSQ efforts. Once sequences are replaced with operators, the operators can change to use frames (rather than Java arrays) as their storage format. Since operators are very flexible, during the transition we can have some operators use frames while others continue to use arrays. "Shim" operators handle translation, the same way that, in the sequence-to-operator transition, shims handle the sequence-to-operator and operator-to-sequence conversions. We'd want to remove the shims in the end, but they do allow an incremental approach to development.

Similarly, if we want to change how we do shuffles, we can more easily do so with operators. In an operator DAG, an "exchange operator" is physically comprised of two parts: a sender and a receiver. The query stack doesn't really care how these work, as long as the planner inserts matched pairs. We can start with the existing HTTP scatter/gather. Then, we add a new exchange operator based on frames, or async frames, or a connection-based solution, or multiplexed, connection-based frames. Or, for batch MSQ, based on external files. The "Lego" nature of operators means that the rest of the DAG doesn't care as long as the sender consumes rows from "downstream" and the receiver provides them "upstream".

Experience with other tools has shown that, with the operator-based approach, each operator can be evolved and tested in isolation, allowing us to safely incorporate MSQ functionality step-by-step.

Similarly, as long as two planner versions produce the same plans for the same queries, we can be sure that we've not broken anything. If version X is supposed to add new features, we can easily verify that only the target plans changed, and everything else remains the same. We don't need to actually execute the queries to verify the planner: we just compare "physical plans". There was a PR that started down this path: #12545, though it was closed for now to focus efforts elsewhere. This PR was based on a technique which both Drill and Impala used with great success.

The point is, others have worked out solutions for safely evolving an optimizer/operator-based query engine. We can shamelessly borrow those ideas where they are useful.

gianm commented 1 year ago

Awesome. With this capability, we can do shuffle joins, window aggregations, etc. without bottlenecking on the broker. Have you also thought about elasticity? Right now, scaling historicals up and down is tricky for large clusters as segment balancing takes time and historicals need to download segments from deep storage. With elasticity in mind, (1) makes much more sense than (2)

@abhishekagarwal87 Good point. This analysis makes sense and I agree (1) makes more sense from an elasticity point of view. As you point out, the historicals are generally going to be more gradually scaled out due to their loaded-up caches.

gianm commented 1 year ago

I think these are both artifacts of not using async http handling so that we could manage the queue of query requests ourselves? I believe async http request handling for queries would be actually required as long as we want to maintain the current HTTP request API model for interactive queries.

@clintropolis I agree that server stuff does need to be "more async", although it may take a different form than converting our existing protocol to use the Jetty async stuff. There are two things I'd keep in mind when designing what to do next:

clintropolis commented 1 year ago

We don't have to keep the current request/response structure. If it helps, we can change to a protocol that allows multiplexing of multiple requests over the same connection.

Sure, though that is probably an involved migration for current applications that use Druid (if we are talking about the user-facing API; +1 for doing this internally, though, at least). The simplicity of the current API is attractive for interactive-speed queries at least, so I'd be sad to see it leave completely.

Queues can't grow forever. Async handling allows the queue to grow bigger than request-per-thread handling, but there is still a limit, and we still need to have some reasonable behavior around those limits.

Totally, the Jetty request queue under the covers of what we are currently using is limited too; we just aren't in direct control of it or its contents, and it sits before we've parsed request bodies into queries so that we can understand and categorize them. That is why the only place where we can enforce our limits right now is on the active thread pool size, which is dramatically smaller than the queue size. If we control the queue (or queues), then we could limit based on queue size instead of the approximate active running query count, which I think is what we want.

gianm commented 1 year ago

Sure, though that is probably an involved migration for current applications that use Druid. The simplicity of the current API is attractive for interactive-speed queries at least, so I'd be sad to see it leave completely.

It's also an option to keep the same request/response protocol at the outer layer, but use a different protocol for the inner layer. In my experience the inner layer (Broker -> data servers) is the one that is tougher to tune, so that's the one that could most use improvement.

clintropolis commented 1 year ago

It's also an option to keep the same request/response protocol at the outer layer, but use a different protocol for the inner layer. In my experience the inner layer (Broker -> data servers) is the one that is tougher to tune, so that's the one that could most use improvement.

Hah, edited my comment to say more or less the same thing, good thing you quoted the original 😜

gianm commented 1 year ago

The way we achieved these goals in other systems (Drill, Presto, Impala, Hive, etc.) is to use the classic optimizer + operator DAG architecture. The optimizer allows queries to handle a wide variety of use cases (anything that can be expressed in SQL): scans, groupings, joins, windowing, grouping over joins over scans topped by windowing, etc. The optimizer (AKA "planner") handles distribution: whether that be classic Druid scatter-gather or a more general multi-stage plan. The optimizer also handles tasks such as data locality, resource balancing, etc.

@paul-rogers imo, this is absolutely where we want to go. The monolithic native queries will not serve us well in a world where queries and dataflow can get more complex. It seems clear that the ideal structure is, as you say, the standard approach of a tree of fine-grained operators that can be manipulated by the planner and then distributed and run by the execution engine. The main question to me is how we get there.

It's helpful to zoom out and look at the query stack as a whole. Today's query stack has four main pieces.

First: SQL-to-native-query translation, which happens in a Calcite-based planner using a collection of Calcite rules that convert logical operators into native queries. They use a "collapsing" strategy, where a tree full of Calcite's fine-grained builtin logical operators is collapsed into monolithic native query nodes one-by-one. (One rule turns a Scan operator into a Druid query; the next rule collapses that Druid query node with a Filter node into a new Druid query node; etc.)

Second: Scatter on the Broker. This is managed mainly by CachingClusteredClient. It scatters the query, by creating subqueries for each data server referencing the specific segments it wants those data servers to process.

Third: Segment fan-out on the data servers. Each data server uses QueryRunnerFactory.createRunner to create a runner for each segment, and QueryRunnerFactory.mergeRunners to create a merged result stream from those runners.

Fourth: Gather on the Broker. This is also managed mainly by CachingClusteredClient. It collects sorted streams of results from each data server, then merges them into a single stream using Query.getResultOrdering (to do an n-way merge of sorted results) and QueryToolChest.mergeResults (to combine rows when appropriate, and do follow-on processing such as projections, having filters, post-aggregation ordering, and limit).

To me the key question is: what here can we keep as-is, what can be evolved, and what should be implemented fresh?

🔥 takes:

FrankChen021 commented 1 year ago
  • We don't have to keep the current request/response structure. If it helps, we can change to a protocol that allows multiplexing of multiple requests over the same connection.

I agree with that. For inner connections, we can use a highly efficient data structure to exchange data (maybe in binary). And we can even consider using TCP instead of HTTP for these connections.

  • Queues can't grow forever. Async handling allows the queue to grow bigger than request-per-thread handling, but there is still a limit, and we still need to have some reasonable behavior around those limits.

If there's such a queue limit, I would like to propose a per-user limit, since Druid is a multi-tenant DBMS.

paul-rogers commented 1 year ago

To me the key question is: what here can we keep as-is, what can be evolved, and what should be implemented fresh?

@gianm, thanks for the breakdown of areas! To answer the above question, I'd ask a different one: what are we trying to solve? We only need to change that part.

To answer that, let's do a thought experiment. Suppose we are Pinot, and we use Presto to do non-trivial queries. What would that look like? We'd give Presto enough information that Presto could distribute segment reads to its workers. Each worker would reach out to a historical with a native query. Presto would try to push as much work as possible into the native query so that Presto receives the minimal amount of data from each historical. That requires work in the Presto planner to work out distribution (which historicals have which segments, how to distribute that work to Presto nodes, and what can be pushed down to the historicals).

The Presto leaf operators would send native queries to historicals to do the "scan" work. From there, Presto would implement the rest of the query: grouping, joins, windowing, and so on.

Apache Drill (and I think Presto?) have Druid connectors, but the Drill one uses JDBC, which single-threads the reads through the Broker. Would be cool to add the above logic to the Druid connectors for Drill and Presto so we could see how the leading query tools would handle Druid queries. Spark could do the same, but for batch operations.

The Druid solution, of course, will do it all, but we can use the same approach. The planner works out the distribution model, and distributes work to historicals. Rather than having the Broker work with historicals directly, we can have a tier of worker nodes do that work, aggregate or merge results, shuffle to other nodes, etc. We'd do that new work if we want to support queries beyond scatter/gather. (Simple queries reduce to scatter/gather: the broker does the work when it is a simple merge.)

The key point here is that, even with a DAG, ultimately we want to push as much work as possible onto the historical: filtering, per-segment aggregation, etc. We already have native query code that does that. So, expediency says we just create a physical plan that has leaf nodes that hold the per-historical (or per-segment) native query that we then ship to the historical (using REST or some fancy new networking solution).

This means that the part we'd want to change is the distributed portion of the query engine, not the part that runs on data nodes. Since the query runner/sequence structure isn't a good fit for a distributed DAG, we can instead refactor that code into an optimizer and operators, with a distribution framework. Crucially, the data nodes need not change: they don't need to care if they are talking to the "classic" scatter/gather model, Presto, Drill, Spark, or the new multi-tier engine that happens to use an optimizer and operator DAG.

The above was validated in the operator prototype mentioned earlier: one variation refactored the Broker to be operator based, and sent native queries to the historicals as a form of pushing operations into the input source, just like we do in Drill.

paul-rogers commented 1 year ago

We can't drop support for the current monolithic native queries, which means ... we need to be able to convert them to operator trees that can run just-as-well.

Exactly. That's what the operator prototype does: it uses the native query as the "plan" and generates operators for the parts of the pipeline outside of segment processing. That is, the merges, limits, offsets, sorts, exchanges, distributions, retries, etc. This proof-of-concept shows that a DAG model works just as well for SQL as for native queries.

The planner could be of the form:

native query --> native query planner \
                                       --> logical plan --> optimizer --> physical plan --> operators
sql          --> sql planner ----------/

The above is a pretty typical way that query engines handle this kind of thing. The prototype uses an abbreviated path to keep things simple:

native query --> native query planner --> operators

But, the prototype isn't distributed (yet), so could omit the physical plan stage. The prototype does some optimization (omit a merge if there is only one input, say, or omit steps which don't apply to the query). A "real" optimizer would also do the distribution planning of the kind done in the present ingest MSQE.

paul-rogers commented 1 year ago

My guess is the QueryToolChests and QueryRunnerFactories interfaces also don't get carried through to the operator-tree execution system, but that a lot of the stuff they do internally does get carried through.

Exactly right. This is what was discovered in the prototype. The essence of each operation (sort, merge) is retained. The "packaging" (Sequence, QueryRunner, etc.) is replaced. The "planning" logic in the QueryRunner (choose merge X or Y) is retained.

The QueryToolChest basically provides query-specific planning (that is, it chooses which Sequences to insert). We'd want to retain that flexibility in the native query planner, but in a different form: a QueryPlannerHelper that does the planning specific to each query type. The output is still a DAG; each query type has influence over what operators are needed, the same way that the QueryToolChest today says what Sequences are required.

paul-rogers commented 1 year ago

To me the key question is: what here can we keep as-is, what can be evolved, and what should be implemented fresh?

Here's a crack at it, based on experience with other query engines and the Druid operator prototype:

Once we have the basic distributed structure, evolved from current code, we are in position to extend it:

Somewhere around here, we then merge the current ingest-oriented MSQE path with the now-operator-based low-latency path:

In short: on the low latency side, the thought is to evolve what we have to put us in a position to incrementally improve those bits we want to improve, and to add pipeline features we want to add. That is, we remodel the house while living in it: refactor, don't rewrite, wherever possible. Recognize, as your note does, that what we have works, but is limited. Refactor the good parts, then add the new stuff. Move from one working system to another, never take it all apart since we'd find that reaching parity with the existing system would be a huge challenge if we try to start over.

paul-rogers commented 1 year ago

The monolithic native queries will not serve us well in a world where queries and dataflow can get more complex.

Just to provide a bit of context for others reading this, the "monolithic native queries" point is key. Today, Druid is a DAG of operations; those operations just tend to be rather large: let's call them "query blocks". We can name three: the native scan query block, the native group-by query block, and a third one we'll call the "broker query block". Every Druid query is a DAG of at least two of these:

broker query block
|
scan query block

Each block does a wide variety of tasks. The scan query block scans segments, pushing filters down through segment selection and segment indexes and bitmaps. The group-by query block does this as well, but also does first-stage aggregation. The broker query block is mostly about merging: it distributes work, merges results, handles missing segments, does sorts, applies offsets and limits, etc.

Since the broker block is used atop both the scan and group by blocks, it must handle operations and data formats specific to each of those blocks. That's what the QueryToolChest does: it says, "hey, when you do a merge, do it such-and-so way".

The SQL-to-native-query translation step that @gianm mentioned maps the fine-grain Calcite relational operations into groups of operations assigned to native or broker query blocks, by way of defining one or more native queries. Since Druid has only scatter/gather, when a query has more than two blocks, the broker handles all but the leaf blocks. A distributed model would push blocks to other nodes. This is, roughly, what the MSQE controller does: it distributes native query blocks, wrapped in frame processors.

We could build a distributed query just using query blocks. Suppose we wanted to do 11 10-way merges instead of a single 100-way merge. We can use two levels of broker blocks:

broker query block (root, 1 of these)
|
broker query block (10 of these)
|
scan query block (100 of these)

Query blocks do "just-in-time planning." The broker says, "hey, top-level query runner, execute this query." That query runner says, "well, I know how to limit, so I'll do that, and I'll ask the next query runner to run the rest of the query." The next guy knows how to merge but not get data. The next can handle retries, but not distribute. The guy below that knows how to distribute, and so on. This is a very clever and unusual approach: it attaches planning to each "operator". It avoids the need for a big Calcite-like planner. But, it means we can only make decisions based on local information: we can't, say, choose to reorder a join tree based on metadata. If we need global-level planning, we need a planner/optimizer to do the work. (Though, it would be cool if we could keep the local planning where it makes sense.)

So, one model going forward is to keep the existing native query blocks, plan based on those blocks, distribute these blocks. This is, in fact, what Druid does today. However, as @gianm observed, that "will not serve us well in a world where queries and dataflow can get more complex." The reason is that, it becomes increasingly hard to map a Calcite DAG of relational operators to an efficient set of native query blocks: it is a force-fit. The blocks do more than we need (adding overhead) and limit expressiveness. In short, we're trying to execute the classic DAG of relational operators with query blocks designed for a different use case (the original Druid native queries.)

The discussion about operators points out that the native blocks (aside from segment scans) are, in fact, made up of operations implemented as query runners and sequences. We simply disaggregate those blocks into their component operators, wrap that code in a composable form (operators), and then map more directly from the Calcite logical plan to a distributed physical plan made up of operators. We know this works well because dozens of DBs (including Oracle, SqlServer, Teradata, Spark and other industry leaders) work this way.
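
For readers who haven't worked in this style, here is a minimal sketch of the kind of composable operator shape being described. The names are illustrative only, not the prototype's actual classes.

import java.util.Iterator;
import java.util.function.Predicate;

// Each operator pulls rows from its children and yields rows to its parent.
interface Operator<T> {
  Iterator<T> open();   // start execution, returning a row cursor
  void close();         // release resources (buffers, network channels, etc.)
}

// A filter is one "Lego block": it wraps any child operator, regardless of whether the
// child is a segment scan, a merge, an exchange receiver, and so on.
class FilterOperator<T> implements Operator<T> {
  private final Operator<T> child;
  private final Predicate<T> predicate;

  FilterOperator(Operator<T> child, Predicate<T> predicate) {
    this.child = child;
    this.predicate = predicate;
  }

  @Override public Iterator<T> open() {
    Iterator<T> in = child.open();
    return new Iterator<T>() {
      private T next = advance();

      private T advance() {
        while (in.hasNext()) {
          T row = in.next();
          if (predicate.test(row)) {
            return row;
          }
        }
        return null;
      }

      @Override public boolean hasNext() { return next != null; }
      @Override public T next() { T row = next; next = advance(); return row; }
    };
  }

  @Override public void close() { child.close(); }
}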

The key question is: for the goals we want to hit, do we actually need the flexibility of the "standard" approach? Or, can we achieve our goals with less cost and risk by adding a shuffle layer between native query blocks? Is there some optimal middle ground?

paul-rogers commented 1 year ago

A summary of the notes above might be that the multi-stage engine is a great addition. The existing "low-latency" queries are Druid's defining feature. We'd like to drag them both, kicking and screaming, into a standard optimizer/operator DAG structure so we can exploit the abundant industry knowledge about how to optimize queries, and how to implement the Next Favorite Feature. The notes above are mostly about how we do this with low latency queries.

The question we're discussing is whether the current batch multi-stage engine is a good foundation to expand into the low-latency query space. It is great, except we have to replace the code that does exchanges (AKA shuffles: streaming rather than disk based), task management (in-process rather than Overlord based), failure strategy (fail & retry instead of fail & recover), and so on. There are similarities, of course, but the details differ. Low latency queries are optimized for performance, batch queries for durability. Many of the code bits above the basic row operations will differ. (Spark differs from Presto, for example.)

So, we need two execution frameworks, but they should share a common foundation. To evolve the multi-stage engine toward that common foundation, we'd want something like:

The key notion is that if we plan based on operators, we can leverage the vast industry knowledge (of things like relational algebra, how to apply cost estimations, etc.) to improve our planning process -- something which is very hard to do with an ad-hoc data structure such as the one Druid uses for native queries.

The above sketches paths to converge both low-latency and multi-stage batch frameworks toward the industry-standard optimizer/operator-DAG architecture. As we do, we can look for points of synergy. For example both should use frames, should use the same operator implementations, etc. Moreover, effort invested in the planner (metadata, statistics, etc.) benefits both low-latency and batch multi-stage queries.

In short, the answer to the question "how do we use multi-stage for low latency" is that both frameworks evolve toward the desired end state of the optimizer/operator-DAG architecture. Most other engines are already there; we've just got us some catching up to do.

gianm commented 1 year ago

Patch for frame processors and channels: #12848

gianm commented 1 year ago

Initial work here is released, follow-ups will be in other PRs / issues 🚀