apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

[Proposal] Simple join support #4032

Closed jihoonson closed 5 years ago

jihoonson commented 7 years ago

Motivation

Join is a very popular and useful query type for OLAP. Druid currently doesn't support join due to performance and complexity issues. However, one of the most popular join patterns in OLAP is the star join, i.e., joining a large fact table with many small dimension tables. Supporting this kind of workload with reasonable performance would be very useful for Druid users.

Proposed Join implementation

In this proposal, I'm focusing on the star join. In addition, only inner equi-joins are considered here.

Join query

Unlike Union, Join is regarded as a query type rather than a data source. Since a query can be used as a data source in Druid, join can be used as both a general query type and a data source.

A join query contains a JoinSpec which represents a join relationship between two sources. It can have another JoinSpec as its child to represent joins of multiple data sources. Unlike other query types, the join query's data sources are specified in JoinSpec. The predicate in a JoinSpec can include logical operators and arithmetic operators. dimensions and metrics specify output column names.

Here is an example.

{
  "queryType": "join",
  "joinSpec" : {
    "type" : "INNER",
    "predicate" : {
      "type" : "equal",
      "left" : {
        "dataSource" : "foo",
        "dimension" : { "type" : "default", "dimension" : "dim1" }
      },
      "right" : {
        "dataSource" : "bar",
        "dimension" : { "type" : "default", "dimension" : "dim1" }
      }
    },
    "left" : {
      "type" : "dataSource",
      "dataSource" : "bar"
    },
    "right" : {
      "type" : "joinSource",
      "joinSpec" : {
        "type" : "INNER",
        "predicate" : {
          "type" : "and",
          "left" : {
            "predicate" : {
              "type" : "equal",
              "left" : {
                "dataSource" : "foo",
                "dimension" : { "type" : "default", "dimension" : "dim2" }
              },
              "right" : {
                "dataSource" : "baz",
                "dimension" : { "type" : "default", "dimension" : "dim1" }
              }
            }
          },
          "right" : {
            "predicate" : {
              "type" : "equal",
              "left" : {
                "dataSource" : "baz",
                "dimension" : { "type" : "default", "dimension" : "dim2" }
              },
              "right" : {
                "predicate" : {
                  "type" : "add",
                  "left" : {
                    "dataSource" : "foo",
                    "dimension" : { "type" : "default", "dimension" : "dim3" }
                  },
                  "right" : {
                    "dataSource" : "foo",
                    "dimension" : { "type" : "default", "dimension" : "dim4" }
                  }
                }
              }
            }
          }
        },
        "left" : {
          "type" : "dataSource",
          "dataSource" : "foo"
        },
        "right" : {
          "type" : "dataSource",
          "dataSource" : "baz"
        }
      }
    }
  },
  "dimensions" : [ "foo.dim1", "foo.dim2", "bar.dim5", "baz.dim5"],
  "metrics" : [ "foo.met1", "bar.met2" ],
  "filter": { "type": "selector", "dimension": "baz.dim1", "value": "some" },
  "granularity" : ...,
  "intervals" : ...,
  "virtualColumns" : ...,
  "context" : ...
}

This query can be expressed in SQL as

SELECT
  foo.dim1, foo.dim2, bar.dim5, baz.dim5, foo.met1, bar.met2
FROM
  bar, foo, baz
WHERE
  foo.dim1 = bar.dim1 AND 
  foo.dim2 = baz.dim1 AND
  baz.dim2 = foo.dim3 + foo.dim4

or

SELECT 
  foo.dim1, foo.dim2, bar.dim5, baz.dim5, foo.met1, bar.met2
FROM
  bar INNER JOIN foo ON foo.dim1 = bar.dim1
  INNER JOIN baz ON foo.dim2 = baz.dim1 AND baz.dim2 = foo.dim3 + foo.dim4

.

Design

Among many well-known distributed join algorithms, broadcast join is known to be efficient when joining a small table with a large table. In broadcast join, the small table is replicated to all processing nodes, so those nodes can perform the join without further data exchange.

Broadcasting data

In Druid, data can be broadcasted at data ingestion time using load rules. To do so, the following changes need to be made.

Join query processing

A join query is processed as follows.

Hybrid hash join

Hash join is a simple and efficient solution for equi-joins. When joining broadcasted data sources, the size of the join result can be large even though the inputs are small data sources, and intermediate results may not fit in memory. Thus, a hybrid hash join, which can spill partitions to disk, is desired.
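
To make the build/probe structure concrete, here is a minimal, illustrative Java sketch of a plain in-memory inner equi hash join. It is not Druid code; the row representation, class, and method names are hypothetical. A hybrid hash join would additionally partition both inputs and spill partitions to disk when the build side does not fit in memory.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch
{
  // A "row" is represented as a map from column name to value (hypothetical representation).
  public static List<Map<String, Object>> innerEquiJoin(
      List<Map<String, Object>> buildRows,  // small, broadcasted side
      String buildKey,
      List<Map<String, Object>> probeRows,  // large, locally stored side
      String probeKey
  )
  {
    // Build phase: hash the small side on its join key.
    Map<Object, List<Map<String, Object>>> hashTable = new HashMap<>();
    for (Map<String, Object> row : buildRows) {
      hashTable.computeIfAbsent(row.get(buildKey), k -> new ArrayList<>()).add(row);
    }

    // Probe phase: stream the large side and emit one output row per match.
    List<Map<String, Object>> result = new ArrayList<>();
    for (Map<String, Object> probeRow : probeRows) {
      List<Map<String, Object>> matches = hashTable.get(probeRow.get(probeKey));
      if (matches == null) {
        continue; // inner join: drop unmatched probe rows
      }
      for (Map<String, Object> buildRow : matches) {
        // Naive column merge; probe-side columns overwrite build-side columns on name collision.
        Map<String, Object> joined = new HashMap<>(buildRow);
        joined.putAll(probeRow);
        result.add(joined);
      }
    }
    return result;
  }
}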

Runtime join ordering

Join ordering is significant for efficient join processing. When historicals and realtimes join broadcasted segments, they first choose the cheapest join of two segments and perform it. Then they again choose the cheapest join among the remaining segments and the previous join result, and repeat this until all segments are joined.
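
As an illustration, here is a small Java sketch of one way such a greedy runtime ordering could work. It is not Druid code; the cost and join functions are hypothetical stand-ins, e.g. a cost based on estimated row counts.

import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.ToLongBiFunction;

public class GreedyJoinOrderingSketch
{
  // Repeatedly finds the cheapest pair among the remaining inputs (which includes any
  // intermediate join results), joins it, and puts the result back, until one result remains.
  // Assumes at least one input.
  public static <T> T joinAll(
      List<T> inputs,
      ToLongBiFunction<T, T> estimatedJoinCost, // e.g. product of estimated row counts
      BinaryOperator<T> join                    // performs the actual pairwise join
  )
  {
    List<T> remaining = new ArrayList<>(inputs);
    while (remaining.size() > 1) {
      int bestI = 0;
      int bestJ = 1;
      long bestCost = Long.MAX_VALUE;
      // Pick the cheapest pair among all remaining inputs.
      for (int i = 0; i < remaining.size(); i++) {
        for (int j = i + 1; j < remaining.size(); j++) {
          long cost = estimatedJoinCost.applyAsLong(remaining.get(i), remaining.get(j));
          if (cost < bestCost) {
            bestCost = cost;
            bestI = i;
            bestJ = j;
          }
        }
      }
      // Join the cheapest pair and replace the two inputs with the join result.
      T joined = join.apply(remaining.get(bestI), remaining.get(bestJ));
      remaining.remove(bestJ); // remove the larger index first
      remaining.remove(bestI);
      remaining.add(joined);
    }
    return remaining.get(0);
  }
}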

GroupBy after Join

One very popular query pattern is a groupBy after a join. This can be expressed in Druid by specifying a join as the query data source of a groupBy. Once such a query is submitted, data nodes (historicals and realtimes) first process the join and then immediately perform the groupBy against the join result. Finally, brokers collect and merge the bySegment groupBy results.

Join vs QTL

Overall, join is more flexible and better integrated with Druid's existing components than QTL.

| Features | Join | QTL |
| --- | --- | --- |
| Available join types | Inner join, outer join, theta join | Left join, equi join |
| Available join algorithms | Hash join, sort-merge join, ... | Hash join |
| Complex join predicate support | Supported | Not supported |
| Needs a specialized mechanism | No | Yes |
| Uses normal Druid data sources | Yes | No |

Here, t1.col1 + t2.col1 = t1.col2 + t2.col2 is an example of a complex join predicate.

Future plan

drcrallen commented 7 years ago

Cool! I haven't been able to give the plan a full review but I wanted to call out that having join functionality separate from QTL is the correct way to go. QTL was originally conceived as a high-performance, data-enrichment feature; not a join substitute. So hopefully it stays around and this can add some awesome functionality.

gianm commented 7 years ago

Nice proposal @jihoonson!

About broadcasting data,

If all data sources are broadcasted except one, join can be performed in multiple nodes holding the segments of the non-broadcasted data source.

What should we do when the broadcast dataSource hasn't been fully broadcast yet? For example, a new historical node comes online and hasn't loaded all of the broadcast dataSources yet. Or, a new broadcast dataSource is added, and it hasn't been loaded on all historical nodes yet. I can think of a few ways to deal with it.

  1. Any nodes that don't have the broadcast table yet, treat it as if it was empty. This will end up generating a bunch of nulls that shouldn't be nulls.
  2. Fail the query, the user should retry later.
  3. Wait for the table to finish broadcasting and then do the query.

There's a related case, where a broadcast dataSource is updated (new segment versions) and the update hasn't fully propagated yet. In this case, I guess we have two options.

  1. Some nodes use the old version and some use the new one
  2. All nodes use the old version until the new one is fully propagated (requires some extra coordination)

Once this option is set for a data source, all segments of the data source are replicated to all historicals and realtimes.

I guess for this, we'll need a way for the coordinator to assign segments to realtime tasks. Maybe they could all run ZkCoordinators like historicals do?

Optionally it could also be nice for the broadcast rule to say "table X should be broadcast to any node that also hosts table Y" -- so if you have a table X that you never need to join to table Z, you don't need to broadcast it to realtime tasks for dataSource Z. Realtime tasks tend to have fewer resources available than historical nodes, so this would help them out a bit by potentially limiting the number of tables they need to load.

himanshug commented 7 years ago

We have been pretty successful creating extensions with custom query types that do specific things similar to join, but not full general joins...

The basic idea has always been to create a custom query type. On receiving the query, that implementation would use DirectDruidClient to send one or more queries to historicals to gather data, combine those results (sometimes in memory if a streaming kind of merge is not possible), and then return the results to the user. So, that extension is only known to the broker.

The above strategy has been used to implement user retention, rolling averages, and similar types of queries. The implementations are generally specific to our use case, but the idea is general.

gianm commented 7 years ago

The basic idea has always been to create a custom query type. On receiving the query, that implementation would use DirectDruidClient to send one or more queries to historicals to gather data, combine those results (sometimes in memory if a streaming kind of merge is not possible), and then return the results to the user. So, that extension is only known to the broker.

I think it would be cool to optimize a join plan to something like this in some cases (like QTL does when "injective" is true) but it makes sense to build out the regular path first that involves historical nodes.

gianm commented 7 years ago

@jihoonson, under "Future plan":

Join result (or hash table) caching

Does this mean caching the hash table of join key -> table row, for equi-joins? If so, maybe join performance could approach QTL performance, since that's similar to the data structure that QTLs use.

jihoonson commented 7 years ago

@himanshug thanks for your comment. I believe join will mostly share the same processing path with other query types, as you said. I expect that the only change in historicals will be that they need to find segments of two or more data sources for the join (in ServerManager).

jihoonson commented 7 years ago

@gianm thanks for your comment and nice suggestion.

For the case where a query is submitted but broadcasting data (or updating broadcasted data) is not finished yet, I think solution 2 for both cases would be good. A data source is not considered loaded until the data broadcast is finished, so the query cannot be executed. Similarly, a data source is not considered updated until the update of all broadcasted data is finished, so the query should be executed against data of the old version. I think we can expect consistent behavior with this solution, so the implementation can share the same processing mechanism for both cases. For users, a new API which provides the status of broadcasting (and updating) a data source may be needed.

I guess for this, we'll need a way for the coordinator to assign segments to realtime tasks. Maybe they could all run ZkCoordinators like historicals do?

Yes, it can be implemented like historicals do.

Optionally it could also be nice for the broadcast rule to say "table X should be broadcast to any node that also hosts table Y"

I think it's really valuable. I'll consider it when I add a full replication option to the load rule. Also, do I need to update the proposal?

jihoonson commented 7 years ago

@gianm

Does this mean caching the hash table of join key -> table row, for equi-joins? If so, maybe join performance could approach QTL performance, since that's similar to the data structure that QTLs use.

It means caching either join key -> hash table containing only the build side, or join key -> final join result. The former case seems similar to QTL; I'm not sure which one is more beneficial.

jihoonson commented 7 years ago

@drcrallen thanks! I also think that QTL has its own goal, different from join, and has more room to achieve that goal.

weijietong commented 7 years ago

@jihoonson glad to see your join query idea, and coincidentally, I am also working on implementing a join method which I call LocalJoin. Its key feature is that data for a join query does not need to be broadcasted, but instead is partitioned on the join keys of the different data sources. I have done a lot of the work already, and I will start another thread to talk about it. Please see #4040. I think we have some things in common to agree upon, e.g. the join query JSON spec and a few others.

jihoonson commented 7 years ago

Hi @weijietong, I read your proposal, and it sounds great! Your proposal is about joining tables partitioned by the same key, which is actually one of the future works listed above. I'm really glad that you are already working on it. Yeah, I also think there are some issues we need to agree on. Here is my hangout address: ghoonson.

jihoonson commented 7 years ago

@weijietong specifying intervals and granularities per DataSource will be really useful. Nice idea!

weijietong commented 7 years ago

@jihoonson my hangout address is tongweijie178@gmail.com . For more details we can talk to each other on that.

weijietong commented 7 years ago

@jihoonson I think you don't need to implement the HashJoin for broadcasted data. You can utilize the BitmapMatrix technique of my LocalJoin implementation. HashJoin will take more memory space by holding the joined field values and materializing all the matched join results. For more details about BitmapMatrix, see my proposal; maybe I will submit that part of the code first.

jihoonson commented 7 years ago

@weijietong I looked over the paper you mentioned in your proposal, and I think implementing HashJoin is still valuable because a join predicate can have arithmetic operators, as described above, which is difficult with bitmap indexes. In addition, I'm not sure, but HashJoin may be faster than CP, the method proposed in the paper, for equi-joins. Finally, I think having several join strategies will be beneficial. There is no one-size-fits-all; we can dynamically choose the best one for the given query and execution environment.

jihoonson commented 7 years ago

I discussed with @gianm offline, and found that load rules are not a good fit for data broadcast, because load rules are currently used for replication, which is usually about retaining data. Broadcasting data is an extreme case of replication, and I think it will be used only for join. So, if load rules are used for broadcasting data, they end up serving two different goals: retaining data and preparing data for join.

To avoid this, @gianm proposed a new rule called DistributionRule. This new rule controls data distribution with specifications like "these two tables should be stored together". In this case, the smaller one is broadcasted to all nodes holding segments of the larger one. Also, this rule can be extended to cover partitions, like "these two tables should be stored together, partitioned on a key". That form of the rule can be applied only to data sources partitioned on the same key. In other words, the distribution of a data source can be changed by applying a DistributionRule, but its configuration specified at ingestion time (like partitionSpec) cannot be changed.

I think this will be valuable because it is less confusing than using load rules and is extensible to partitions. @weijietong, I also think you could use this feature later. What do you think?

yurmix commented 5 years ago

@jihoonson This sounds like a very useful feature. I think this can be relevant to one of my lookup use cases (multi-value lookups). In retrospect, do you still consider it a sound technical solution? Would it be good work to continue upon?

jihoonson commented 5 years ago

Hi @yurmix, I think this proposal is still worthwhile, and someday I would like to continue my work. But I don't have a slot available for this right now. Please feel free to take it if you want.

gianm commented 5 years ago

@yurmix for multi-value lookups, would you be thinking about extending the lookup framework to support multiple values for the same key? Or avoiding the lookup framework entirely?

By the way, I've been thinking recently about reviving proposals for joins in Druid and had been considering a few goals for a solution, below.

yurmix commented 5 years ago

@yurmix for multi-value lookups, would you be thinking about extending the lookup framework to support multiple values for the same key? Or avoiding the lookup framework entirely?

I was thinking of replacing lookups with joins for cases where multi-valued lookups are necessary. This means that the dimension key is single-valued and the dimension value is multi-valued. But after some thought, I don't think joins of multi-valued dimensions can be supported. A multi-valued attribute is not part of the relational model; it is a NoSQL concept. And in Druid, it is implemented as part of the data storage layer (segment), which encodes/decodes it at processing and query time.

Other thoughts:

stale[bot] commented 5 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 5 years ago

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.