Open jihoonson opened 4 years ago
Hi @jihoonson, thanks for your great ideas and plans to move forward from inline datasources joining.
The coordinator should assign all segments of a broadcast datasource to all historicals, realtime tasks, brokers, and indexers.
Can we have a fine-grained segment assignment/loading plan with the help of HashedPartitionsSpec
?
As we know Druid already enables hash-based secondary partitioning for batch indexing tasks.
Suppose that there are several datasources which are configured with same secondary hash-based partitionsSpec
which contains partitionDimensions
user intends to joining. Then we introduce a new LoadRule, like ColocateDistributionRule
which colocates segments with same chunk id, to load segments among query processing services. And to work with realtime data, Druid has to enable HashedPartitionsSpec
in realtime ingestion tasks while currently the historical segments with hashed
ShardSpec
seems not appendable by default. But still could it be a potential improvement that is compatible with this proposal?
Hey @yuanlihan, that is a great idea! We should definitely support such a partition-aware segment loading in the future. I'm not sure about details of the new segment loading algorithm yet, but it could be compatible with this proposal.
Motivation
Druid now supports real joins (#8728). Even though this introduced a lot of opportunities to expand the capability of Druid, it is in practice pretty limited yet since we support only joins of a Druid datasource to Lookups or inline datasources. This proposal is to support the broadcast join using the broadcast rule (https://github.com/apache/druid/pull/4077). The goals of the proposed design are:
Proposed changes
The idea is pre-broadcasting a datasource in every node and using it at query time. To do so, a datasource can be created using the regular ingestion spec and indexing service, and then be broadcasted by setting a
BroadcastDistributionRule
via the coordinator API. The entire datasource (its all segments) will be broadcasted all together to every historical, task (or indexer), and broker. A broadcasted datasource can be switched into a regular datasource by setting aloadRule
via the coordinator API. The coordinator will be responsible for broadcasting datasources similar to that it distributes regular datasources based onloadRule
.The
BroadcastDistributionRule
can be set for any datasource. However, for faster hash join, we can create a pre-built hash table at ingestion time and load it in query nodes. At query time, the join query engine will check whether there is a pre-built hash table or not. If it's there, the engine can use it. Otherwise, it can create one on the fly.The below subsections desrcibe what should be done to implement the broadcast join with the pre-built hash table.
Pre-built hash table
The pre-built hash table is created by the index task. Since the broadcasted datasource can be large, the query node (historicals, tasks, indexers, and brokers) should be able to read them using mmap as for regular segments to avoid heavy GC overhead.
The format of the hash table is not determined yet. I will update this proposal or write another proposal for it. Whatever the new format would be, a new
SegmentizerFactory
will be added to handle it properly.The index task will create pre-built hash tables
To let the task know what kind of segments/indexes it needs to create, we need a new configuration in the ingestion spec. However, I'm not 100% sure what a nice configuration would be for it yet. One of the reasons is the undetermined hash table format. I think the new configuration might be different depending on what we want to do with the hash table format. For example, we need at least the list of columns to build the hash table which should be specified via the user configuration. I'm not sure yet what else we need.
As a result, I would like to suggest to add
SegmentizerFactory
in the tuningConfig as a meantime solution. The segmentizerFactory is already an extendable interface and JSON-serializable. By doing this, we can test out various segment/index format for hash join. After the dust settles down and we find the best format, we can deprecate theSegmentizerFactory
and add a better configuration in the ingestion spec. An example would beIf the
SegmentizerFactory
is missing in the ingestion spec, theMMappedQueryableSegmentizerFactory
will be used by default. The index task will write theSegmentizerFactory
in thefactory.json
file when it writes the segment file. See the below "SegmentizerFactory for loading data and pre-built hash table" section for how to read the segment/hashtable file.For broadcast datasources, it is not allowed to append more data. That means, the whole datasource should be broadcasted all together.
Modifying BroadcastDistributionRule
The
BroadcastDistributionRule
has a field ofcolocatedDatasources
. This field was for users to specify with what datasources the broadcasted datasource will be colocated. This causes a bug in broadcasting, that is, the broadcasted segment should be moved with the colocated segment atomically which it doesn't do it now. To fix this issue, I think it's better to remove this field because a regular datasource is usually distributed to all historicals if it's large enough and so thecolocatedDatasources
doesn't seem very useful anyway. Instead, we can add tiers similar toloadRule
s.I believe this doesn't cause any compatibility issue as it is not practically in use in any production. I'm pretty sure about it because 1) we didn't have a proper use case for it before, i.e. joins, and 2) it is only partially implemented and doesn't fully work.
Assigning and balancing segments
The coordinator should assign all segments of a broadcast datasource to all historicals, realtime tasks, brokers, and indexers. When assigning segments, replication throttling shouldn't apply for broadcasting. Also, the coordinator should skip balancing for broadcasted datasources.
Segment loading in brokers, realtime tasks, and indexers
All brokers, realtime tasks, and indexers will load assigned segments. Both ZK-based and HTTP-based segment loading should be supported.
SegmentizerFactory for loading data and pre-built hash table
Once a segment file is downloaded in a historical, a task, an indexer, or a broker, the file should be loaded properly including the pre-built hash table. The node should create the right type of
SegmentizerFactory
from thefactory.json
file created by the index task. The newSegmentizerFactory
should know details of how to load data and the hash table. Once theSegmentizerFactory
finishes loading, the node announces the loaded segment.Querying broadcasted datasources
All broadcast datasources will be registered on the
index
schema. When you want to join using SQL, you can do like below:For joins to broadcast datasources, the broker will find the most recent broadcast segments which are available in all nodes where it wants to send the query. If there is no such broadcast segment, the query will fail. Once it finds such broadcast segments, it will send the join query to all nodes which have the broadcast segments and collect the result from them. Note that the join of a broadcast datasource and another broadcast datasource will not be supported in this proposal.
For directly querying on the broadcast datasources, the broker should pick up only one node among historicals and brokers including itself and send the query to it.
Rationale
Operational impact
As explained in the "Modifying BroadcastDistributionRule" section, the
colocatedDatasources
will be removed fromBroadcastDistributionRule
. However, I don't think this will cause any compatibility issue.Future work