apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Online aggregation / Interactive Query Session Protocol #7087

Open leventov opened 5 years ago

leventov commented 5 years ago

This issue discusses two proposals: "Online aggregation" and "Interactive Query Session Protocol" because they are very closely related to each other.

Online aggregation

The "online aggregation" concept was previously proposed in the context of Druid as "incremental query results". To avoid a clash with the term "incremental" as used in IncrementalIndexes, and because "online aggregation" seems to be the term most often used for this concept in scientific papers and elsewhere (possibly starting with the Online aggregation paper by JM Hellerstein et al, 1997), I suggest sticking to "online aggregation" from now on.

To facilitate human interaction with analytics and data exploration interfaces backed by Druid, and also to reduce Druid's perceived error rates, online aggregation must be implemented at the level of Druid's Broker nodes. See The case for interactive data exploration accelerators (IDEAs) by A Crotty et al, 2016, and related research for some data and ideas related to this topic.

To help readers better understand online aggregation, I first describe "offline" aggregation, i.e. what is already implemented in Druid:

"Offline" (default) aggregation

Simple HTTP request-response between a Druid Broker node and a client (possibly proxied through a Router node).

Broker determines all segments needed to compute results for the query and the historical nodes on which those segments are loaded (except for segments for which results of the current query are found in cache). Broker sends requests to all those historicals to execute the query over all those segments. When historicals send the results back, Broker aggregates them all and sends the results back to the client (or to a Router, if the request is proxied through it). This logic is implemented mainly in the CachingClusteredClient class.

Online aggregation

Works on top of an HTTP streaming or WebSocket connection between a Druid Broker node and a client (possibly proxied through a Router node).

Broker determines all segments needed to compute results for the query and the historicals on which those segments are loaded, but may not send requests to historicals to execute the query over all those segments right away. Instead, if, for example, there are 10 partitions in every interval covered by the query, Broker sends requests to historicals to execute the query over 3 segments in each interval (not necessarily those with partition numbers 0, 1, and 2; they may be chosen randomly and independently for each interval). As soon as results for at least two segments in each interval arrive at Broker, Broker sends requests to historicals to execute the query over the next portion of segments in each interval. Broker aggregates incoming segment query results from historicals in the background. From time to time, Broker sends progressively more complete aggregation results (partial aggregation results) back to the client.
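The throttling described above can be sketched as follows. This is a minimal illustration in Python, not Druid code: `ThrottledScheduler`, `batch_size`, and `refill_after` are hypothetical names, and the defaults of 3 and 2 simply mirror the numbers in the example.

```python
import random
from dataclasses import dataclass, field


@dataclass
class IntervalState:
    pending: list                                  # segments not yet requested
    in_flight: set = field(default_factory=set)    # requested, no result yet
    done: set = field(default_factory=set)         # results received


class ThrottledScheduler:
    """Hypothetical Broker-side scheduler: requests a few segments per interval at a time."""

    def __init__(self, segments_by_interval, batch_size=3, refill_after=2, seed=None):
        rng = random.Random(seed)
        self.batch_size = batch_size
        self.refill_after = refill_after
        self.intervals = {}
        for interval, segments in segments_by_interval.items():
            shuffled = list(segments)
            rng.shuffle(shuffled)                  # random choice, independent per interval
            self.intervals[interval] = IntervalState(pending=shuffled)
        self._since_refill = {interval: 0 for interval in self.intervals}

    def _issue(self, interval):
        state = self.intervals[interval]
        batch = state.pending[:self.batch_size]
        state.pending = state.pending[self.batch_size:]
        state.in_flight.update(batch)
        return [(interval, segment) for segment in batch]

    def start(self):
        """Return the first portion of (interval, segment) requests."""
        requests = []
        for interval in self.intervals:
            requests.extend(self._issue(interval))
        return requests

    def on_result(self, interval, segment):
        """Record a segment result; return the next portion of requests, if it is due."""
        state = self.intervals[interval]
        state.in_flight.discard(segment)
        state.done.add(segment)
        self._since_refill[interval] += 1
        # The next portion is due once every interval has returned enough results
        # (or has nothing left in flight).
        ready = all(
            self._since_refill[i] >= self.refill_after or not s.in_flight
            for i, s in self.intervals.items()
        )
        if not ready:
            return []
        requests = []
        for i in self.intervals:
            self._since_refill[i] = 0
            requests.extend(self._issue(i))
        return requests
```

With two intervals of 10 segments each, `start()` yields 6 requests, and a further 6 are released only after both intervals have returned two results.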

Fault tolerance: if some historical fails to return results for some segment (or times out), Broker tries to send this request to another historical node on which the segment is loaded. It may also avoid sending more requests to that historical node for other segments in the course of this online aggregation session.
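A rough sketch of this failover logic, in Python. The names `query_with_failover`, `run_on`, and `avoided` are hypothetical, and treating `ConnectionError` and `TimeoutError` as the failure signals is an assumption made for illustration.

```python
class SegmentUnavailable(Exception):
    """Raised when no replica could serve the segment."""


def query_with_failover(segment, replicas, run_on, avoided):
    """Run a per-segment query, retrying on other historicals that hold the segment.

    `run_on(host, segment)` is a caller-supplied function (hypothetical).
    `avoided` is a session-wide set of hosts that already failed; it is updated in place.
    """
    # Prefer hosts not yet marked as failing; marked hosts are tried only as a last resort.
    ordered = sorted(replicas, key=lambda host: host in avoided)
    for host in ordered:
        try:
            return run_on(host, segment)
        except (ConnectionError, TimeoutError):
            avoided.add(host)
    raise SegmentUnavailable(segment)
```

Sharing one `avoided` set across all segments of a session is what lets the Broker steer later requests away from a historical that has already failed once.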

The reasoning and the principle behind the throttling of Broker -> historical requests: a user interacting with the analytical interface may quickly narrow down the query or start executing another, entirely different query, no longer interested in the results of the current query (e.g. after closing the browser tab or going to "Home" in the interface). In this case, an attempt to compute the most precise results with the maximum possible speed (bound by the throughput of the Druid cluster) will turn out to be a waste of resources.

In the example above, Broker sends requests for 3 segments out of 10 in each interval. A possible general principle is to not have more than 1 or 2 outstanding segment query executions on any historical at any time. This is because if historicals use HDDs or network-attached disks, and the segments are not in the page cache (reminder: they are all memory-mapped, but there is an idea to move away from that), additional segment query executions will likely queue up inside the historical anyway, contending for I/O. So that "1 or 2" (or more? configurable?) may depend on the recency of the interval, because more recent data is more likely to be in the page cache.

Results format: along with ordinary aggregation results (on each iteration), Broker sends the client information about the fraction of the total data over which the current results are aggregated. It may also send the estimated error / confidence intervals of the partial aggregation, if it is able to compute them for the given aggregation function, and if the user opts to receive such data.

Broker also sends a flag indicating that the partial aggregation results may be skewed, e.g. if the partial aggregation contains no data from some interval(s) covered by the query (because of availability problems, slowness, errors, etc.), or if there are disproportionate amounts of rows from different intervals and it's not possible to adjust for that in the aggregation function(s) being computed, or if rows are partitioned between segments within intervals based on some key rather than randomly.
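One possible shape for such a partial result envelope, as a Python sketch. The field names are hypothetical, and measuring the "fraction of total data" in segments per interval is an assumption (rows could be used instead). The check for disproportionate row counts between intervals is not modeled here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PartialResult:
    rows: list                                   # ordinary aggregation results so far
    fraction_processed: float                    # share of total data already aggregated
    may_be_skewed: bool                          # results may not be representative
    confidence_interval: Optional[tuple] = None  # only if computable and requested


def make_partial_result(rows, scanned_by_interval, total_by_interval,
                        randomly_partitioned=True):
    """Build the envelope from per-interval counts of scanned and total segments."""
    scanned = sum(scanned_by_interval.get(i, 0) for i in total_by_interval)
    total = sum(total_by_interval.values())
    # Skew is flagged when some interval has contributed nothing yet, or when
    # segments within an interval are partitioned by key rather than randomly.
    missing_interval = any(
        scanned_by_interval.get(i, 0) == 0 for i in total_by_interval
    )
    return PartialResult(
        rows=rows,
        fraction_processed=scanned / total if total else 1.0,
        may_be_skewed=missing_interval or not randomly_partitioned,
    )
```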

When (how often) broker sends partial results back to the client: it may be a combination of the following factors:

Aggregation: for simple Sum and Count aggregators, interpolation of partial results to projected final values is pretty trivial, if historicals send the numbers of rows in segments and (for Sum aggregators) the numbers of aggregated rows (i.e. Count) along with the segment results themselves. For some other aggregators, approximation of final results from partial results is non-trivial and may require using tools such as sketches (FYI @leerho, @edgan8) in Broker's memory, although neither the data nor the aggregation requested by the user has anything to do with sketches. This may be further complicated by the requirement to estimate errors / confidence intervals of the partial aggregation results (see above).
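For Sum and Count, the projection could be as simple as linear scaling per interval. The Python sketch below assumes the Broker knows, for each interval, the number of rows scanned so far and the total number of rows; the function names and the tuple layout are hypothetical. It also assumes the scanned segments are representative of their interval, which is exactly what the skew flag above is meant to warn about.

```python
def project_final(partial_value, rows_scanned, rows_total):
    """Scale a partial Sum or Count up to the whole interval."""
    if rows_scanned <= 0:
        raise ValueError("no rows scanned yet, nothing to project from")
    return partial_value * rows_total / rows_scanned


def project_query(per_interval):
    """Sum the per-interval projections.

    `per_interval` is an iterable of (partial_value, rows_scanned, rows_total).
    """
    return sum(project_final(v, scanned, total) for v, scanned, total in per_interval)
```

For example, a partial sum of 1200 over 3000 of 10000 rows projects to 4000 for that interval. Projecting each interval separately, rather than scaling the grand total, keeps the estimate reasonable when intervals have been sampled at different rates.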

However, even if no approximation / projection of final results from partial results is done during online aggregation, and partial aggregation results are returned to the client as if they covered all the data that needs to be aggregated, this is still much better than offline aggregation. In fact, from the user's perspective it may not be noticeably worse than a sophisticated approximation algorithm, because users may only be interested in the relative weights of different rows in topN and groupBy results, or in a general trend in timeseries results, not in the absolute values. Many visualisations don't even show users absolute values and only let them perceive relative weights (pie charts, bar charts).

JSON vs SQL query: it would be nice if online aggregation were agnostic to the query format and worked with JSON as well as SQL queries.

Interactive Query Session Protocol

Interactive Query Session Protocol works on top of online aggregation. It doesn't seem that it provides much benefit without online aggregation implemented.

It may be implemented as an actual protocol on top of a single WebSocket connection, or just be a loose agreement about the expectations between clients and Druid nodes (Brokers, Routers), with each query remaining a separate HTTP streaming / WebSocket interaction.

The idea is that human interaction with an analytics interface often appears to be a series of "drilling-down" queries: each following query is the previous query + more dimension filters. As soon as a user starts the next query, they are no longer interested in the previous query being computed further, but it's likely that the user will return to that less selective query soon. There are also "sibling" query transitions on the same level of selectivity: e.g. a user moves from a query with a filter dim1 = value1 to an otherwise identical query with a filter dim1 = value2, or dim1 IN {value1, value2}, or dim1 != value1.

Interactive Query Session Protocol (IQSP)

All queries within the human interaction session should end up on the same Broker. Unless IQSP is implemented as an actual protocol on top of WebSocket, the user (analytics interface) needs to make sure to query the same Broker within the session, or a Router node needs to identify a session heuristically or with the help of some extra headers in requests and route all queries within a session to the same Broker.

As soon as the next query in the session arrives, Broker stops online aggregation of the previous query, but doesn't drop the partial results and the query execution state until the end of the session. If a user returns to this query later in the session, Broker will resume its execution from where it stopped.
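The pause and resume behaviour might look roughly like the following Python sketch. `QuerySession`, its methods, and the use of a plain running total as the "partial result" are all hypothetical simplifications. A real Broker would keep richer per-query state.

```python
class QuerySession:
    """Hypothetical per-session state kept on a single Broker."""

    def __init__(self):
        self._states = {}    # query key -> {"remaining": [...], "partial": number}
        self.active = None

    def submit(self, query_key, all_segments):
        """Make `query_key` the active query; the previous one is paused, not dropped."""
        if query_key not in self._states:
            self._states[query_key] = {"remaining": list(all_segments), "partial": 0}
        self.active = query_key

    def step(self, run_segment):
        """Aggregate one more segment of the active query.

        Returns (partial_result, is_complete).
        """
        state = self._states[self.active]
        if state["remaining"]:
            segment = state["remaining"].pop(0)
            state["partial"] += run_segment(segment)
        return state["partial"], not state["remaining"]

    def close(self):
        """End of session: all partial results and execution state are dropped."""
        self._states.clear()
        self.active = None
```

Re-submitting a query key that is already known to the session continues from the segments that were still remaining, rather than starting over.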

After certain "sibling" transitions, Broker may be able to provide some results to the client before querying any historical nodes, by manipulating the cached results of previous less selective queries and of the previous "sibling" query. Examples are the dim1 = value1 -> dim1 != value1 transition, and the dim1 = value1 -> dim1 IN {value1, value2} transition if dim1 = value2 already appeared earlier in the session.
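For additive aggregators such as Sum and Count, these derivations reduce to simple arithmetic on cached results. The Python sketch below uses hypothetical function names and plain dicts of metric values. It assumes that all cached results being combined were aggregated over the same set of segments, otherwise the arithmetic is not meaningful. Non-additive aggregators (for example, distinct counts) cannot be derived this way.

```python
def derive_not_equal(parent, equal):
    """Result for dim1 != value1, from the less selective parent query
    and the cached result for dim1 = value1."""
    return {metric: parent[metric] - equal[metric] for metric in parent}


def derive_in(*per_value_results):
    """Result for dim1 IN {value1, value2, ...}, from the cached results of
    the individual dim1 = valueN queries (the values must be distinct)."""
    combined = {}
    for result in per_value_results:
        for metric, value in result.items():
            combined[metric] = combined.get(metric, 0) + value
    return combined
```

Such a derived result can be shown immediately and then refined as the Broker continues aggregating further segments.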

jihoonson commented 5 years ago

@leventov I'm not sure what problem you're trying to solve with online aggregation. Is it the problem of "heavy queries blocking light queries"? For this problem, I think https://github.com/apache/incubator-druid/issues/6993 is a better approach because it also considers what query is important. Sometimes, a heavy query is more important than others and should be finished quickly.

Also, as far as I remember, the online aggregation defines query interface as well as query processing method. The query interface of online aggregation shows the query result with an error rate before the entire query processing completes. As query processing progresses, the query result becomes more accurate.

I'm not sure where this interface is appropriate for. Usually people already know what error is allowed and want for their queries to be finished as soon as possible in the error bound. Do you have any particular example of this use case?

Finally, the query result accuracy is highly related to the input sampling. Do you have an idea of how good the accuracy is with the random segment selection?

jihoonson commented 5 years ago

Also, I think we have agreed on the proposal template. Would you please update the proposal based on the template?

leventov commented 5 years ago

Is it the problem of "heavy queries blocking light queries"?

No, that's an orthogonal issue. Online aggregation addresses two big things:

Also, as far as I remember, the online aggregation defines query interface as well as query processing method. The query interface of online aggregation shows the query result with an error rate before the entire query processing completes. As query processing progresses, the query result becomes more accurate.

I'm not sure where this interface is appropriate for. Usually people already know what error is allowed and want for their queries to be finished as soon as possible in the error bound. Do you have any particular example of this use case?

I don't see how the sentence "Usually people already know what error is allowed and want for their queries to be finished as soon as possible in the error bound." is connected with the rest of this part of your message. But I think that usually people don't know and/or don't care about error bounds. People want to see something as soon as possible. For navigational topN queries in the interface, for example, they often care only about top1 to filter on it, or filter on some value that they already have in mind (e. g. "country = US"), so they merely need to start to see dimension values to be able to click on one of them, not caring about the topN values themselves at all.

I'm not sure where this interface is appropriate for.

I think all interfaces such as Superset, Imply's interface and Metamarkets's Explore can adapt to online aggregation to improve user experience.

jihoonson commented 5 years ago

But I think that usually people don't know and/or don't care about error bounds. People want to see something as soon as possible.

@leventov thanks. I think this is the argument. I think people should know it. For an extreme example, no one wants the result of 1% of accuracy. They might not know the exact allowed error bound, but at least they should know a rough one. Then, I think updating the query result progressively is not necessary because, as you said, they want to see the query result as soon as possible. Once the query result shows up and the error bound is allowed, they will just want to do something else rather than waiting for the query result to be more accurate. If this makes sense, the query can be processed by just sampling some segments and returning an approximate result.

I think this is why the online aggregation hasn't been implemented in any production systems even though it's been almost 20 years since it was introduced.

gianm commented 5 years ago

I think this is why the online aggregation hasn't been implemented in any production systems even though it's been almost 20 years since it was introduced.

I think that's not quite true. Splunk does something like this. I couldn't find documentation on it, but I think the feature is called a "preview". It can show time charts and tables that sort of bounce around until they're done being computed, and at that point a checkmark is shown. I found it useful for search results, but annoying for aggregations (since the previewed results can easily be misleading, like a time chart in a totally different shape from how it will end up being).

gianm commented 5 years ago

In the context of how Druid is usually used, I could see this functionality being useful for topNs that are used for autocomplete or quick-filter widgets (they are sort-of like searches and partial searches can be useful). But I would shy away from it for most visualizations, for the reason above (it can easily be misleading).

jihoonson commented 5 years ago

@gianm thanks. Good to know Splunk has a similar one.

leventov commented 5 years ago

An additional idea here (from this paper by Eric Brewer, "Graceful degradation" section) is that as soon as there are first results ready for a query on a Broker, it can allow the next query to start in parallel. If the current limit of parallel queries on a Broker is X, we can make the limit of "parallel already partially computed queries" 3X or 5X, for example. This may help in case of Broker's overload so that it returns something useful to more clients faster (but they may wait for longer for complete results). This will also require different prioritization of per-segment queries on Historicals.
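A toy model of this two-tier limit, in Python. `BrokerAdmission` and its methods are hypothetical; the factor of 3 is just one of the example values mentioned above, and a real implementation would also need the changes to per-segment query prioritization on Historicals, which are not modeled here.

```python
class BrokerAdmission:
    """Hypothetical admission control: X slots for queries with no results yet,
    and a larger pool for queries that already have partial results."""

    def __init__(self, fresh_limit, partial_factor=3):
        self.fresh_limit = fresh_limit
        self.partial_limit = fresh_limit * partial_factor
        self.fresh = set()      # admitted, no results yet
        self.partial = set()    # first results already available

    def try_admit(self, query_id):
        if len(self.fresh) >= self.fresh_limit:
            return False
        self.fresh.add(query_id)
        return True

    def on_first_result(self, query_id):
        """Move the query to the partial pool, freeing a slot for the next query."""
        if query_id in self.fresh and len(self.partial) < self.partial_limit:
            self.fresh.remove(query_id)
            self.partial.add(query_id)
            return True
        return False

    def finish(self, query_id):
        self.fresh.discard(query_id)
        self.partial.discard(query_id)
```

With `fresh_limit=1`, a second query is rejected until the first one produces its first partial result, after which both run in parallel.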

The effectiveness of running some NCPU * 5 processing threads may be reduced, though, unless we move to async data processing (at least in the part of the system where Broker aggregates partial results from Historicals).

drcrallen commented 4 years ago

I think the interval chunking query runner is a partial solution for this. Since the json result is streamed back, with interval chunking some of this is implemented for time-series-like queries. But it would certainly be helpful for many interactive query tools and UIs.

leventov commented 3 years ago

In Big Data Visualization and Analytics: Future Research And Emerging Applications (a review from SIGMOD 2020), two scientists, Jean-Daniel Fekete and Tim Kraska, explicitly mentioned progressive analytics.

github-actions[bot] commented 1 year ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.