apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0
13.53k stars 3.71k forks source link

[Proposal] Use bitmap iteration for filtered aggregators #6889

Open peferron opened 5 years ago

peferron commented 5 years ago

Problem summary

Filtered aggregators are slower than query root filters because they don't leverage bitmap iteration. This is a problem because:

Problem details

In Druid parlance, filters are partitioned into pre-filters and post-filters. Pre-filters contain conditions that can use bitmap iteration to jump to the next matching row, such as my_dimension = my_value. Post-filters contain conditions that must be evaluated on every row, such as my_metric > my_value.

Query root filters are efficient because they indeed use bitmap iteration to jump to the next row that matches the root pre-filters.

Filtered aggregators are less efficient because they evaluate their pre-filters on every row that matches the root filter.

The result is that this sample query written using a root filter:

{
   "filter": {"type": "selector", "dimension": "my_dim", "value": "my_val"},
   "aggregations": [
       {"type": "count", "name": "my_agg"}
   ]
}

Can be several times faster than the equivalent query written using a filtered aggregator:

{
   "filter": null,
   "aggregations": [
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim", "value": "my_val"},
           "aggregator" : {"type": "count", "name": "my_agg"}
       }
   ]
}

The obvious workaround is to rewrite the second query above into the first before sending it to Druid. But this only works on a subset of queries. For example, the following query cannot be rewritten:

{
   "filter": null,
   "aggregations": [
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim_1", "value": "my_val_1"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg_1"}
       },
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim_2", "value": "my_val_2"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg_2"}
       }
   ]
}

The next workaround may be to add a unioned root filter:

{
   "filter": {
       "type": "or",
       "fields": [
           {"type": "selector", "dimension": "my_dim_1", "value": "my_val_1"},
           {"type": "selector", "dimension": "my_dim_2", "value": "my_val_2"}
       ]
   },
   "aggregations": [
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim_1", "value": "my_val_1"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg_1"}
       },
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim_2", "value": "my_val_2"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg_2"}
       }
   ]
}

But again, this only performs well on the subset of queries where the resulting union has low selectivity. The worst case is if the query contains an unfiltered aggregation:

{
   "filter": null,
   "aggregations": [
       {
           "type": "count",
           "name": "my_unfiltered_agg"
       },
       {
           "type" : "filtered",
           "filter" : {"type": "selector", "dimension": "my_dim", "value": "my_val"},
           "aggregator" : {"type": "count", "name": "my_filtered_agg"}
       }
   ]
}

The next workaround is to split the query into sub-queries that are sent to Druid separately and processed in parallel:

// Sub-query #1
{
   "filter": null,
   "aggregations": [
       {"type": "count", "name": "my_unfiltered_agg"}
   ]
}

// Sub-query #2
{
   "filter": {"type": "selector", "dimension": "my_dim", "value": "my_val"},
   "aggregations": [
       {"type": "count", "name": "my_filtered_agg"}
   ]
}

But this is complicated for users to implement, adds processing overhead, and can introduce inconsistency since the sub-queries may not scan the exact same set of rows especially in a streaming ingestion scenario.

Overall this means there's no good workaround. Please let me know if I missed one.

Potential solutions

To keep things simple, let's assume that all aggregators are filtered and that only pre-filters exist (no post-filters). Let's also ignore any potential optimizations around shared filters (#4267).

Current implementation (for reference): test each aggregator on each row

for row in segment:
  for aggregator in aggregators:
    if aggregator.bitmap.has(row):
      aggregator.aggregate(row)

Pros: simple to implement, which means fewer bugs and easier maintenance.

Cons: as mentioned earlier, the bitmap value is checked for every single row in the segment. This is inefficient, especially for low-selectivity filters.

Solution A: compute each aggregator separately

for aggregator in aggregators:
  for row in aggregator.bitmap:
      aggregator.aggregate(row)

Pros: uses bitmap iteration to skip non-matching rows.

Cons: scans through the data multiple times. Since Druid storage is columnar, this may not be a problem if the filtered aggregators read mutually disjoint sets of columns. But if there's overlap, then data locality suffers, since the same data is decompressed and read multiple times.

Solution B: repeatedly find the aggregator with the lowest next matched row

while not done:
  aggregator, row = find_aggregator_with_lowest_next_matched_row(aggregators)
  aggregator.aggregate(row)

This can be implemented in multiple ways. A typical implementation is via a priority queue holding the next matched row of each aggregator's bitmap.

Pros: uses bitmap iteration to skip non-matching rows, while maintaining single-pass in-order data access.

Cons: increased complexity, and the extra overhead makes it difficult to beat the current implementation in all scenarios.

Early results

I benchmarked a few different implementations of solution B. I'm not linking to the code because I'd rather get feedback about the overall idea first, and the code will get thrown away anyway.

A min-heap implementation yielded the following results on a set of arbitrary test queries (each number is the ratio of avg ns/op compared to unpatched for that test query, so < 1 is faster and > 1 is slower):

1.02, 1.00, 0.11, 0.15, 0.47, 1.22, 1.09, 0.03, 0.45, 0.99, 0.68

The massive speed-ups such as 0.11 and 0.03 happen for test queries that contain low-selectivity filtered aggregators, which makes sense.

Another somewhat weird implementation on top of Roaring yielded less extreme speed-ups, but also smaller regressions:

1.02, 1.02, 0.12, 0.15, 0.48, 1.01, 0.96, 0.09, 0.95, 0.91, 0.79

The last two test queries mimic real queries that we run in production, and get 10-30% speed-ups.

I cut a few corners hacking this in, and I'm not sure what the final performance would look like. I'm especially concerned about extra garbage and memory pressure for some implementations which may not show up in benchmarks. But at least it seems worth looking into.

Next steps

I'm not that familiar with the Druid code base, so is there any obvious solution or blocker that I missed? Otherwise I'd love to get some general feedback about this idea.

Indexes differentiate Druid from its competition—which is usually faster on raw scan speed—so I think it makes sense to try to squeeze as much out of indexes as possible, especially when implementations as good as Roaring are available.

peferron commented 5 years ago

I need to re-evaluate this in the context of the query vectorization proposed in #6794.

Solution B yields good improvements with the current one-row-at-a-time query engine,but will likely not work as well with the vectorized query engine, since even low-selectivity filters are likely to have at least one matching row per batch (the default batch size in #6794 is 512 rows).

On the other hand, it becomes possible to implement solution A on a per-batch basis. Right now, it looks like #6794 does not use bitmap iteration to build the list of matching rows, but checks the value of each row instead (see SingleValueStringVectorValueMatcher), so there might be room for improving filtered aggregator performance there as well.

I'm going to try implementing solution A on top of the vectoreyes (what a pun @gianm) branch of #6794 to see how it goes.

gianm commented 5 years ago

An aside: right now, for query level filters, Druid always uses indexes when possible. But this is not always optimal. Consider a query-level filter like countryName = 'UK' AND comment LIKE '%foo%'. Here countryName is low cardinality and comment is high cardinality (possibly nearly-unique). If "UK" is, let's say, 15% of the total row count of countryName, then it's probably better to resolve this filter by resolving countryName = 'UK' with the index and comment LIKE '%foo%' while scanning. The rationale here is that filters resolved via indexes cannot short-circuit, but filters resolved via scanning can short-circuit. So ideally Druid should potentially move some filters to scan-time and even reorder them based on factors like this.

Coming back to the proposal at hand: @peferron you point out that filtered aggregators, today, never use indexes. By the rationale above, this is probably sometimes better and sometimes worse than using indexes. (And it seems like your benchmarks show the same thing.)

IMO we should be working to make the behavior here consistent: it shouldn't matter whether a filter is query-level or in an aggregator. In either case Druid should resolve them using some logic that tries to take advantage of the factors mentioned above.

But, inevitably, it won't always make the best decision. So IMO I also think we should have some way of 'hinting' Druid to use, or not use, the index for a particular filter.

Putting these two suggestions together: how about we build the hinting system first, which should let users that "know better" tell Druid what to do, and then build the automatic decisionmaking system afterwards? (With the goal that most users shouldn't have to provide hints.)

Indexes differentiate Druid from its competition—which is usually faster on raw scan speed—so I think it makes sense to try to squeeze as much out of indexes as possible, especially when implementations as good as Roaring are available.

Hopefully the gap narrows, closes, or even inverts with vectorization (#6794) and adaptive compression (#6016).

clintropolis commented 5 years ago

Another thing to consider is that using bitmap indexes isn't always better - see #3878 and related issues, which describes the case where very high cardinality dimensions can negatively impact query speed when using bitmap indexes first.

I'm currently working on a solution to this problem, I still have some experiments to do, but we should definitely try to coordinate so that the system I propose for deciding to evaluate a filter as a pre or post filter would also be usable for aggregator filters.

clintropolis commented 5 years ago

Oof, @gianm beat me to a response, there is some overlap between our comments 😜

gianm commented 5 years ago

On the other hand, it becomes possible to implement solution A on a per-batch basis. Right now, it looks like #6794 does not use bitmap iteration to build the list of matching rows, but checks the value of each row instead (see SingleValueStringVectorValueMatcher), so there might be room for improving filtered aggregator performance there as well.

Doing solution A on a per-batch basis sounds like a good way to go with vectorized filtered aggregators.

peferron commented 5 years ago

To sum up:

  1. The current implementation always uses indexes for query filters, and never uses indexes for aggregator filters.
  2. Always using indexes for both may be better overall, but would perform worse for some queries.
  3. We can't force regressions on Druid users, even on a small subset of queries (correct me if I'm wrong here, but given Druid's maturity and production use, that seems reasonable).
  4. This means we need to let users override which strategy to use, so they can at least maintain the previous performance.
  5. Ideally, we would also have an "auto" strategy, which @clintropolis is working on; but we still need the manual override unless that strategy is always right (see point 3).

Is that correct?

peferron commented 5 years ago

@gianm: I experimented with removing VectorValueMatcher from FilteredVectorAggregator and replacing it with a VectorOffset based on the aggregator filter.

Speed-up is about 10% on SQL query 7, which is better than nothing but not groundbreaking.

I believe that the speed-up is small because the VectorValueSelector still reads column values for the entire batch, even if the aggregator only needs 1 row. I'm referring to this line in LongSumVectorAggregator:

final long[] vector = selector.getLongVector();

If I understand correctly, depending on compression, encoding and so on, it may be faster to read only a small subset of rows from the underlying columnar storage, rather than the full batch. For example, reading the entire batch may involve decompressing an extra block that doesn't even contain any of the rows that the aggregator needs. But the current design of VectorValueSelector doesn't allow aggregators to take advantage of that finer-grained access. Do you think that we should consider amending the vectorization design to enable that?

BTW, let me know if you'd like to take this discussion to #6794 instead, since it's starting to sound like a more general discussion of the vectorization system.

gianm commented 5 years ago

To sum up:

  1. The current implementation always uses indexes for query filters, and never uses indexes for aggregator filters.
  2. Always using indexes for both may be better overall, but would perform worse for some queries.
  3. We can't force regressions on Druid users, even on a small subset of queries (correct me if I'm wrong here, but given Druid's maturity and production use, that seems reasonable).
  4. This means we need to let users override which strategy to use, so they can at least maintain the previous performance.
  5. Ideally, we would also have an "auto" strategy, which @clintropolis is working on; but we still need the manual override unless that strategy is always right (see point 3).

Is that correct?

1 & 2 are definitely correct. 3 & 4 are ideals we try to hold ourselves to whenever possible. Sometimes offering options to preserve old behavior isn't feasible for some reason -- maybe the old and new behavior cannot both exist due to a fundamental incompatibility. How to handle that kind of thing tends to be case by case. 5 accurately reflects my opinion at least :)

If I understand correctly, depending on compression, encoding and so on, it may be faster to read only a small subset of rows from the underlying columnar storage, rather than the full batch. For example, reading the entire batch may involve decompressing an extra block that doesn't even contain any of the rows that the aggregator needs. But the current design of VectorValueSelector doesn't allow aggregators to take advantage of that finer-grained access. Do you think that we should consider amending the vectorization design to enable that?

That's true. In general it's best if we only read what we need. One tricky thing here happens with a query like this:

SELECT
  SUM(x) FILTER(WHERE y = 1) as agg1
  SUM(x) FILTER(WHERE y = 2) as agg2
FROM tbl

If y = 1 OR y = 2 accounts for a high proportion of rows, then the current approach of #6974 works well. It avoids wasting time building separate partial vectors of x for each aggregator. But if y = 1 OR y = 2 accounts for a small proportion of rows, it doesn't work as well. You'd probably prefer to either generate separate masked x vectors for each aggregator, or somehow OR together all the filters that apply to things reading from x and then use that when reading the x column. Either approach would definitely need to involve some rework of the APIs.

BTW, let me know if you'd like to take this discussion to #6794 instead, since it's starting to sound like a more general discussion of the vectorization system.

Yeah, I think so -- if you reply to vectorization-specific stuff then let's do it over there. (But keep the discussion about harmonizing filtered aggregators & query-level filters, here, I think, assuming they don't become too intertwined.)

peferron commented 5 years ago

I'm going to put this on hold until vectorization (#6794) is merged or at least close to its final form., since it's quite clear from this discussion that an alternative index-based strategy for filtered aggregators would be implemented very differently with or without vectorization.

The configuration system to let users pick which strategy to use could be implemented now for query filters, though. I'm going to think a bit about that and open a new issue/proposal if needed.

github-actions[bot] commented 1 year ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.