
Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Add namespaces to Druid segments within a data source #9463


JulianJaffePinterest commented 4 years ago

Motivation

Currently, Druid versions segments by their data source, covered interval, and an arbitrary version string (usually the time at which they were written). For users who want to ingest and query data from multiple sources within a single data source, or to selectively update subsets of their data, this scheme forces intermediate processing or over-processing. To support these use cases, we can introduce the concept of a namespace to shard specs and VersionedIntervalTimelines, versioning namespaced shard specs by their data source, covered interval, namespace, and version string. This would allow data sources to be transparently populated from multiple input sources, with overshadowing and atomic updates continuing to function within the context of a namespace, without affecting data in other namespaces and without affecting the behavior of non-namespaced segments.

Proposed changes

The proposed changes primarily fall into two buckets:

First, modify the PartitionChunk and ShardSpec classes to add the methods default Object getChunkIdentifier and default Object getIdentifier, which simply return the chunk number and partition number, respectively. Further, modify the SegmentId and SegmentDescriptor classes to add an identifier. From here, all calls to ShardSpec's getPartitionNum should be replaced with calls to getIdentifier, and all SegmentDescriptor/SegmentId creations updated accordingly. Namespaced shard specs can then be created that approximate existing shard specs (e.g. a NamedNumberedShardSpec can duplicate the logic of a NumberedShardSpec, with the abuts and compareTo methods updated to also check for matching namespaces). Additionally, real-time ingestion specs (which don't allow specifying a shard spec directly) can be updated to optionally take a namespace. These changes are transparent to existing code, except for a small performance hit from comparing Objects instead of ints.
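
As a rough illustration only (not an actual implementation), the ShardSpec side of this could look something like the sketch below. The interface is trimmed to the methods relevant here; the real ShardSpec and PartitionChunk interfaces have many more, and NamedNumberedShardSpec is hypothetical.

```java
// Illustrative sketch only: ShardSpec is trimmed to the relevant methods, and
// NamedNumberedShardSpec is a hypothetical namespaced variant.
interface ShardSpec
{
  int getPartitionNum();

  // Proposed addition: defaults to the partition number so that existing,
  // non-namespaced shard specs keep their current identity semantics.
  default Object getIdentifier()
  {
    return getPartitionNum();
  }
}

class NamedNumberedShardSpec implements ShardSpec
{
  private final String namespace;
  private final int partitionNum;

  NamedNumberedShardSpec(String namespace, int partitionNum)
  {
    this.namespace = namespace;
    this.partitionNum = partitionNum;
  }

  @Override
  public int getPartitionNum()
  {
    return partitionNum;
  }

  @Override
  public Object getIdentifier()
  {
    // The identifier is scoped to the namespace, so two namespaces can each hold
    // partition 0 of the same interval and version without colliding.
    return namespace + "_" + partitionNum;
  }

  // abuts and compareTo would additionally require matching namespaces, otherwise
  // mirroring NumberedShardSpec's logic (omitted here).
}
```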

The other piece of the puzzle is to introduce a NamespacedVersionedIntervalTimeline that mirrors the logic of VersionedIntervalTimeline with a few key changes. Primarily, a NamespacedVersionedIntervalTimeline holds a map from namespace strings to VersionedIntervalTimelines (one timeline per namespace). Adding, removing, and overshadowing segments and partitions can then be done in the context of the appropriate namespace, while lookups are done across all namespaces. The VersionedIntervalTimelines in CachingClusteredClient, BrokerServerView, etc. can then be replaced with NamespacedVersionedIntervalTimelines without affecting any non-namespaced segments (since they'll all be added to the default namespace and thus the same underlying VersionedIntervalTimeline, as they currently are).
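
A minimal sketch of the idea, assuming the wrapper simply delegates to one VersionedIntervalTimeline per namespace. This is not our production implementation, and the VersionedIntervalTimeline signatures used here are approximate:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.druid.timeline.Overshadowable;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;

// Hypothetical wrapper, not actual Druid code: one VersionedIntervalTimeline per
// namespace, with non-namespaced segments routed to a default namespace.
public class NamespacedVersionedIntervalTimeline<VersionType, ObjectType extends Overshadowable<ObjectType>>
{
  private static final String DEFAULT_NAMESPACE = "__default";

  private final Comparator<? super VersionType> versionComparator;
  private final Map<String, VersionedIntervalTimeline<VersionType, ObjectType>> timelines =
      new ConcurrentHashMap<>();

  public NamespacedVersionedIntervalTimeline(Comparator<? super VersionType> versionComparator)
  {
    this.versionComparator = versionComparator;
  }

  // Adds (and, by extension, overshadowing) are scoped to the segment's namespace.
  public void add(String namespace, Interval interval, VersionType version, PartitionChunk<ObjectType> chunk)
  {
    timelines
        .computeIfAbsent(
            namespace == null ? DEFAULT_NAMESPACE : namespace,
            ns -> new VersionedIntervalTimeline<>(versionComparator)
        )
        .add(interval, version, chunk);
  }

  // Lookups fan out across every namespace, so a query against the data source
  // sees the visible segments of all namespaces together.
  public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval)
  {
    List<TimelineObjectHolder<VersionType, ObjectType>> result = new ArrayList<>();
    for (VersionedIntervalTimeline<VersionType, ObjectType> timeline : timelines.values()) {
      result.addAll(timeline.lookup(interval));
    }
    return result;
  }
}
```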

These changes should be entirely transparent to users who aren't interested in using them, since all existing behavior is unchanged. Users who do want namespaced shard specs can simply specify the appropriate shard spec in their ingestion config (or, for real-time ingestion specs, a namespace), and namespaced shard specs will be created. These shard specs will only extend or overshadow data with the same data source and namespace, but all segments for a given data source will be queried together, regardless of namespace. Here, this proposal takes advantage of the fact that Druid already handles cases where some dimensions or metrics are available in some segments of a data source but not all of them: missing metrics are treated as 0 and missing dimensions as null. If there are shared dimensions between namespaces, post-aggregators can be used to return combined results for any given combination, even if not all metrics or dimensions are present in all namespaces.

Rationale

The primary motivation for this proposal is to support scenarios where users produce data from multiple sources but wish to query this data as if it were a single table. Although there are ongoing efforts in the Druid community to address this via joins (#8728) and union data sources, both approaches have certain drawbacks. The initial phases of join support envision joining enhanced lookup-style dimension tables onto data sources, not joining data sources to each other, and union data sources require all unioned data sources to have identical schemata. Namespacing allows multiple logical data sources to be merged even if they have differing schemata. The obvious downside to namespacing is that this merging must be performed at ingestion time and doesn't support arbitrary joining at query time.

Operational impact

These proposed changes modify many internal APIs, but do not deprecate or remove any external features or behavior.

These changes are transparent to non-namespaced segments and data sources, meaning that clusters can be upgraded and downgraded without issue. Once namespaced segments are created, data sources cannot be downgraded (whichever namespace was lexicographically last would overshadow all other namespaces).

We have been running these changes in production for ~12 months without a noticeable effect on latency or performance. Of course, our implementation and a community implementation will likely differ to some degree to account for the broader range of use cases throughout the community. It is possible that this change would introduce small latency regressions for some use cases due to the move to comparing objects instead of ints for partition numbers/identifiers.

Test plan

There are two main pieces of this proposal that need testing: first, that namespacing works as intended, and second, that there are no observable changes for users who do not adopt namespacing.

For namespacing itself, we would want to test the following scenarios: that new namespaced segments with a higher version overshadow only existing segments with the same namespace, and that namespaced segments with the same data source are queried together across namespaces when the data source is queried. We would also want to test that all current realtime and batch ingestion mechanisms can correctly create namespaced segments.
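
As a rough sketch only, the first scenario might be expressed as a unit test along these lines, assuming the hypothetical NamespacedVersionedIntervalTimeline sketched above; TestSegment, segmentChunkFor, and visibleVersions are hypothetical helpers, not existing Druid test utilities.

```java
// Illustrative test sketch, not runnable as-is: NamespacedVersionedIntervalTimeline,
// TestSegment, segmentChunkFor, and visibleVersions are all hypothetical.
@Test
public void newerVersionOvershadowsOnlyItsOwnNamespace()
{
  NamespacedVersionedIntervalTimeline<String, TestSegment> timeline =
      new NamespacedVersionedIntervalTimeline<>(Comparator.naturalOrder());
  Interval day = Intervals.of("2020-01-01/2020-01-02");

  timeline.add("clicks", day, "v1", segmentChunkFor("clicks", "v1"));
  timeline.add("conversions", day, "v1", segmentChunkFor("conversions", "v1"));
  // A newer version lands only in the "clicks" namespace...
  timeline.add("clicks", day, "v2", segmentChunkFor("clicks", "v2"));

  // ...so the old "clicks" data is overshadowed, while "conversions" v1 stays visible.
  Assert.assertEquals(
      ImmutableSet.of("clicks:v2", "conversions:v1"),
      visibleVersions(timeline.lookup(day))
  );
}
```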

For testing that no changes are required for non-namespaced segments, the existing Druid tests should be sufficient (i.e. if all existing tests pass without changes, we can be confident that existing use cases will be unaffected).

himanshug commented 4 years ago

I read your email on the dev list and consequently https://medium.com/pinterest-engineering/powering-pinterest-ads-analytics-with-apache-druid-51aa6ffb97c1. Thanks, that was a good read.

From that reading, my understanding of your use case is that you could have multiple dataSources which have the same set of dimension columns but different metric columns. I wonder how difficult it would have been to improve "union query" handling so that it could accommodate this use case. I see that you noticed that current union query handling couldn't work with dataSources of different schemas. Did you try to improve union query handling to make that work, and did you encounter a technical difficulty that led you to conclude that the namespace feature was simpler to do?

JulianJaffePinterest commented 4 years ago

We have both differing dimensions and differing metrics between the namespaces. I investigated modifying union datasources as well, but there were a number of problems:

  1. At the time (and perhaps still), union data sources were queried in serial, not parallel. This is obviously fixable.
  2. Implementing this feature with unioned data sources would have required implementing query logic to handle disjoint schemata, which namespacing did not.
  3. We've scaled this from a handful of namespaces per data source (where union data sources probably could work) to hundreds of namespaces for a few data sources (I'm not sure how easy managing 100+ data sources in a cluster would be, but this may not be a major concern).

Since more changes would have been required to make this work with union datasources, we went with namespacing (and it was fairly simple to implement the changes described above). I see merit in both approaches.

jihoonson commented 4 years ago

@JulianJaffePinterest thanks for the proposal and a nice blog post. The problems with the union query you mentioned above are real and still exist. But I believe we should address those problems someday to provide all the functionality supported by the SQL union query. Even if those problems were addressed, would this namespace feature still handle some use cases that the union query cannot? I'm wondering how much their use cases overlap.

jihoonson commented 4 years ago

I'm also curious exactly what namespaces are for. AFAIK, Druid currently allows users to ingest data into the same datasource from multiple sources. Those sources can have different schemas, and Druid will handle this at query time by filling missing columns with nulls.

glasser commented 4 years ago

It seems like an interesting application of namespaces is that it allows you to do reingestion and retention policies on a subset of a data source. For example, you could have a data source that contains customer data from customers in several pricing tiers. Querying should work the same for all customers, but you may want to load more history from customers who pay more (either in the "setting historical load rules" sense or the "actually killing old segments" sense), or you may want to run native batch reingestion on cheaper customers to reduce queryGranularity of their older data without needing to also rewrite the data from the higher tier customers.

(Moving a customer between tiers in this model could still be a challenge, though, unless you can create a batch ingestion job that writes to a couple namespaces.)

b-slim commented 4 years ago

I have read the proposal, and my understanding is that you are trying to index data from the same source over the same interval of time but want to break it into more subtasks to make it parallel. Is that the main pain point that pushed you to implement this namespacing? Or is it because you have some data reprocessing and you want to re-process only a subpart of the data, and the only way to identify/filter this data is what you are calling a namespace? In my opinion, for both cases I would agree with the general sentiment that this is better done in the query layer as a standard union, which would be a wonderful contribution that could be used by many other use cases.

himanshug commented 4 years ago

@JulianJaffePinterest thanks for explaining the difficulties. However, considering the use case at hand and the alternatives available, I would first explore fixing union queries, which has broader benefits. If a technical limitation were hit that prevented those problems from being resolved, then namespaces would be the necessary secondary option.

@glasser if you tried one dataSource for all customers and found it too problematic due to scale etc., and you really needed per-customer-tier QoS, then a per-customer-tier dataSource would probably be the right fix in that situation.

glasser commented 4 years ago

@himanshug Yeah, I don't know much about union queries and that might be a better way to go.

JulianJaffePinterest commented 4 years ago

Let me explain our use case, which will hopefully clarify some of the questions here:

We have multiple independent pipelines that produce data we serve as a single data source (to give an oversimplified example, you can imagine that one pipeline calculates clicks and impressions while another pipeline calculates conversions). The outputs of these pipelines are queried together, and they share some dimensions and have some unique dimensions, as well as unique metrics. We also produce intra-day computations of these data sets that are updated with daily true-ups. The output of an intra-day run needs to overshadow any existing output for the time it's running for, which can be handled by overshadowing, but the intra-day output of a conversion pipeline shouldn't affect the output of any other pipeline, so simply synchronizing on version and using a linear shard spec won't work. Union data sources, if they were performant, could likely handle this without too much trouble since we're only discussing a few distinct input sources. In our case, most queries actually cover over a hundred namespaces. At that scale, I'm not sure how performant unioning would be (possibly fine). The main benefit namespacing provides is that clients don't need to know or care about these details, nor do they need to know which data sources to union. I agree with the overall point that this all could be done in the query layer (and perhaps even should be done there).

himanshug commented 4 years ago

Yeah, I agree with your concerns about 100+ dataSource scale in a union. My guess is that it could still be handled by pushing the union dataSource down to the Historicals. Similar to the single-dataSource case, the Broker would find the segments to be scanned for the multiple dataSources in the union and push the union query, along with the segments for each dataSource, to the right Historicals (the current approach is to split queries by dataSource when they are sent to Historicals, done serially; even if done in parallel, that would result in too many queries to downstream Historicals). It would likely require introducing a new QuerySegmentSpec, similar to MultipleSpecificSegmentSpec, that lists segments for multiple dataSources. If done that way, union query handling would be very close in processing terms to that of a single dataSource, and scale wouldn't add much overhead just because the query is a union query. It is, of course, possible that there would be technical glitches in implementing the above, but we don't know that yet.
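
For illustration only, such a spec might look roughly like the sketch below; MultipleDataSourceSegmentSpec and its fields are hypothetical names, not existing Druid classes, and the QuerySegmentSpec methods shown are approximate.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;

// Hypothetical sketch of a QuerySegmentSpec that carries specific segments for
// several dataSources at once, so a union query can be pushed down in one pass.
public class MultipleDataSourceSegmentSpec implements QuerySegmentSpec
{
  // dataSource name -> the specific segments of that dataSource to scan
  private final Map<String, List<SegmentDescriptor>> descriptorsByDataSource;

  public MultipleDataSourceSegmentSpec(Map<String, List<SegmentDescriptor>> descriptorsByDataSource)
  {
    this.descriptorsByDataSource = descriptorsByDataSource;
  }

  @Override
  public List<Interval> getIntervals()
  {
    // The intervals of every listed segment, across all dataSources.
    return descriptorsByDataSource.values().stream()
                                  .flatMap(List::stream)
                                  .map(SegmentDescriptor::getInterval)
                                  .collect(Collectors.toList());
  }

  @Override
  public <T> QueryRunner<T> lookup(Query<T> query, QuerySegmentWalker walker)
  {
    // A real implementation would resolve per-dataSource runners through the
    // walker; that wiring is omitted in this sketch.
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}
```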

The main benefit namespacing provides is that clients don't need to know or care about these details

That problem is a lot more cosmetic. It could either be handled by introducing the concept of a "Virtual DataSource" at the API layer that customers typically have on top of Druid, or it could be implemented as a feature in Druid itself.

jihoonson commented 4 years ago

@JulianJaffePinterest thanks for more details.

The output of an intra-day run needs to overshadow any existing output for the time it's running for, which can be handled by overshadowing, but the intra-day output of a conversion pipeline shouldn't affect the output of any other pipeline, so simply synchronizing on version and using a linear shard spec won't work.

Just FYI, you can actually do this with segment locking and the task audit logging. Since you can track what task created what segments from the task audit logs, you can overwrite only the segments you want with segment locking. (The task audit logging is deprecated because we haven't found a good use case for it. If this is a popular use case, then we may need to consider bringing it back. See https://github.com/apache/druid/issues/5859 for details.) But it requires you to track all segments outside of Druid, and I guess this would be more complex than the proposal.

As long as there is no unique use case that only this proposal can address, I'm more inclined to fixing the union query properly, because we have to do it anyway. As @himanshug mentioned, that could address most of the problems mentioned here, even though there is at least one more potential issue with query performance. The segment balancer uses a heuristic that segments are more likely to be queried together the closer their intervals are. Based on this heuristic, the segment balancer assigns segments with close intervals to different Historicals so that they can be processed in parallel. This assumption doesn't apply to the union query. However, I don't know what the impact of this would be.

That problem is a lot more cosmetic. It could either be handled by introducing the concept of a "Virtual DataSource" at the API layer that customers typically have on top of Druid, or it could be implemented as a feature in Druid itself.

@himanshug this is true. I guess this could be a view in SQL.

skyrocknroll commented 4 years ago

is it because you have some data reprocessing and you want to re-process only a subpart of the data, and the only way to identify/filter this data is what you are calling a namespace

@b-slim Right now we are facing this issue, where we have multiple customers' data in a single datasource and want to reingest a specific customer's data. How can we do it in the query layer?

b-slim commented 4 years ago

@skyrocknroll You cannot do ingestion via the query layer; that is not what I said. What I was trying to explain is that it is better to have a datasource per customer, which will lead to better overall management and query performance. If you need a view on top of those multiple customers, then we can add something in the query path, like a global view, to allow that, but in any case you need to do a separate ingestion for each customer.

skyrocknroll commented 4 years ago

@b-slim Thank you for the response :+1: