apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0
13.29k stars 3.66k forks source link

[Proposal] Add published segment cache in broker #6834

Open surekhasaharan opened 5 years ago

surekhasaharan commented 5 years ago

Problem

Some of the sys.segments queries are slow, they are taking as long as ~10-20 sec, which is not desirable. The cause of this slowness is call from broker to coordinator API which happens every time a query is issued to sys.segments table, it’s the getMetaDataSegments (invokes coordinator api /druid/coordinator/v1/metadata/segments) method which gets called from the SegmentsTable#scan() in SystemSchema.java. Coordinator can potentially returns millions of segments and most of the time is spent in parsing the json response and creating DataSegment objects.

Motivation

It would be useful to make these queries faster as these are used in an interactive way by the end user today. In future a unified druid console can be built on top of sys tables(#6832) and the new segment locking can also benefit from all used segments present in broker.

Proposed Changes

To fix this performance bottleneck, plan to add :

  1. segment cache in broker (phase 1)
  2. a new api in coordinator (phase 2)

Phase 1

To speed up the sys.segments queries, in phase1 I want to add a published segments cache in broker. Broker already maintains a cache of all available segments via the BrokerServerView, as brokers are caching segments announced by historicals and realtime tasks, but not from metadata store (published segments are cached in coordinator only). It's going to be a pull model in this phase, where broker polls the coordinator to get latest published segments and updates it's cache periodically in a background thread. Potential issue is it could lead to memory pressure on broker if the number of published segments is large. To minimize this memory pressure on Broker, the DataSegment instance should be shared between the “available” segments and “published” segments in broker. DataSegment already uses Interners, so we can keep interned DataSegment objects in the published and available segments, which prevents memory bloat by using the same reference in both places. Then, roughly, the extra memory would only be required for DataSegment objects which are “published but unavailable” segments, which ideally should be close to 0 segments. Another point to consider is if a broker is configured to watch some particular tiers or datasources. If these broker configs are enabled, then extra filtering would be done on watchedDataSources in the published segment cache in broker. There is no need to filter on watchedTiers because we can't tell the difference between published segments that are unavailable, and published segments that are served by tiers not in watchedTiers. This would be documented in the sys table docs.

Implementation details:

A new class MetadataSegmentView would be added which maintains the published segments cache in memory. The broker will keep a single DataSegment object in heap in case a segment overlaps in both published and available segment collection, via "interned" objects. BrokerServerView maintains the available segments via DataSegment in ServerSelector, the AtomicReference for DataSegment object will store an interned DataSegment object. On deserialization of json stream of published segments from coordinator, each DataSegment object is interned with the same interner (used in ServerSelector) and cached in the published segments collection. For the filter on watchedDataSources, if BrokerSegmentWatcherConfig#watchedDataSources is not null, then the segments with dataSource not in watchedDataSources would be filtered out and will not be stored in the published segment broker cache.

Phase 2

In phase 2 for this improvement, a more efficient coordinator API should be added. There can be several ways to add this new coordinator API, see rejected alternatives for other options considered.

This API returns a delta of added/removed segments and takes timestamp as argument. When broker comes up, it gets all the published segments from coordinator. Broker does following: orders the received segments by the timestamp (created_date), saves the published segment in it’s cache and keeps track of the last received segment’s timestamp. Subsequent calls to the coordinator api will only return the segments that have been added or removed since the last timestamp.The broker will poll the coordinator API at a regular interval to keep the published segment cache synced in a background thread. "added_ segments" delta can be computed based on the created_date, additional work would be required to compute the "deleted_segments" delta. Coordinator will need to maintain an in-memory list of deleted segments and will need to be notified when a segment gets killed external to coordinator (unless this behavior is changed as suggested in #6816). Since the deleted segments count can increase, to avoid memory pressure, coordinator can remember an hour(or some other configurable value) of deleted segments. In case, the requested timestamp is older than an hour, all the published segments can be resynced. In case of coordinator restart or leader change, again, it can send all the published segments.

New or Changed Public Interface

A new rest endpoint will be added to coordinator GET /druid/coordinator/v1/metadata/segments/{timestamp}

Add a timestamp field to DataSegment object which represents the created_date from druid_segments table in metadata store.

Rejected Alternatives:

These options were also considered for the coordinator API

  1. Coordinator sends just the ids of the published segments instead of complete DataSegment serialized objects, and then broker does a diff and finds out the segments which are not available, and then makes another call to get details for those segments. This approach was rejected because sometimes the segment_id list can be pretty large and it can cause a lot of network traffic between coordinator and broker processes and we may not achieve the performance improvement we are looking for.
  2. Add a new table “druid_transactionlogs” to the metadata store, which keeps track of the segment addition and removal. The coordinator API can then query this table when it receives a GET request from broker for any timestamp, it can also query this to maintain it’s own cache. For example,
operation segment_id timestamp
add s1 ts_0
disable s2 ts_1
delete s3 ts_1

It can use write ahead logging to take care of failures/restarts in any process. While this approach is good for maintaining consistency between coordinator and broker cache as well as fault tolerance, it may not give the speed improvement if we invoke the db call on each API invocation. Another challenge would be to keep the druid_segments table and druid_transactionlogs table in sync. Unless we need this for broader use cases, it may not be worth the extra design and effort.

jihoonson commented 5 years ago

@surekhasaharan thanks for raising this proposal! It would be really useful for #6319. It looks good to me overall, but I have some questions for details.

For phase 1,

For phase 2,

For both phase,

surekhasaharan commented 5 years ago

@jihoonson Thanks for reading the proposal and your comments. I have added more details in the proposal for phase1 to address your questions.

jihoonson commented 5 years ago

@surekhasaharan thanks. I have some follow-up questions.

If these broker configs are enabled, then extra filtering would be done on watchedDataSources in the published segment cache in broker.

This part looks vague. Would you please make it clear what configurations you're referring? Also please add why we don't have to consider tiers.

One caveat with this is, the new MetadataSegmentView has a dependency that BrokerServerView should be initialized before MetadataSegmentView starts caching published segments.

Since segments are always published and then loaded by historicals, segments should appear first in the metadata store. How do you reuse the same instance if a segment was only in MetadataSegmentView is added to brokerServerView?

surekhasaharan commented 5 years ago

@jihoonson thanks. Added more details on broker config to make it clearer.

Since segments are always published and then loaded by historicals, segments should appear first in the metadata store. How do you reuse the same instance if a segment was only in MetadataSegmentView is added to brokerServerView?

Changed the design for this part, to use the interned objects, this approach does not depend on the order in which DataSegment objects appear in broker heap.

gianm commented 5 years ago

It looks like phase 1 was implemented by #6901, and phase 2 remains unimplemented.

leventov commented 5 years ago

@surekhasaharan did you consider the following designs:

What do you think about these?

gianm commented 5 years ago

Brokers delegate Druid SQL queries against sys.segments to Coordinator. So brokers don't have to deal with published (used) segments in their memory at all.

We had talked a bit about this idea offline last week and I believe generally thought it was a good idea in isolation. In particular, we had thought about moving the entire sys schema implementation to the Coordinator and having the Broker send any SQL queries on sys over there. Users could also query sys tables on the Coordinator directly if they wanted.

However, there were two counterpoints raised:

Brokers access the metadata store to update their view of published (used) segments directly, without Coordinator as a proxy.

This one was considered as well, the main counterpoint raised was that currently, only the Coordinator and Overlord have a direct dependency on the metadata store, and there was a desire to keep it that way. In turn there were two reasons for that:

For memory savings, adding an extra flags to DataSegment object: isAvailable, isPublished, and using a single unified ConcurrentHashMap view for the purposes of both BrokerServerView and MetadataSegmentView.

This was not discussed. I haven't thought through the implications fully but I would worry that it means either DataSegment must no longer be immutable (because isAvailable, etc can change) or that it remains immutable but can no longer be interned effectively (because there's no longer a single DataSegment object per SegmentId). So I would look at those things first when evaluating this idea.

leventov commented 5 years ago

A desire to minimize the number of services that depend on the metadata store, both for architectural complexity and load reasons.

It might be (sometimes, or even often) the opposite of that:

Also, it would allow simplifying the codebase, because there would be a single abstraction used in both Coordinator and Brokers. I initially turned my attention to this proposal when I was forced to write explanations of how MetadataSegmentView is different from SegmentsMetadata.

leventov commented 5 years ago

Another design question, regarding #6901: was it considered to not add druid.sql.planner.metadataSegmentCacheEnable configuration parameter? Perhaps, just start caching after the first access to sys.segments?

leventov commented 5 years ago

This was not discussed. I haven't thought through the implications fully but I would worry that it means either DataSegment must no longer be immutable (because isAvailable, etc can change) or that it remains immutable but can no longer be interned effectively (because there's no longer a single DataSegment object per SegmentId).

DataSegment objects don't need to be interned if the unified view is used.

FYI, I've already explored this idea in the Coordinator's context here: https://github.com/metamx/druid/commit/772bd29adb829cb5c09eb233ed3bf93209c2a456#diff-d349ac1596d0704583da60485358e307R117 and will almost certainly use it some way in the course of implementing #7159.

gianm commented 5 years ago

It looks like you tried making them mutable and commented that it was a 'horrible abstraction leak' -- I think I agree. Are you thinking now that it's worth it for the memory savings even due to the leaky abstraction? Mutable DataSegments seem to me like something that we should try to avoid if possible.

leventov commented 5 years ago

Well, now the message seems for me to be more hysterical than it should be. Having isLoaded, isAvailable, isPublished flags in a class called DataSegment seems OK to me (there may be a private subclass like "DataSegmentOnBroker"). You should do the right thing in very few places, not all over the place. Add good comments to these places and everything will be OK.

If you want to process sys.segments queries on Brokers in the first place, I think this optimization is worth doing.

leventov commented 5 years ago

In addition to previous questions: was it considered to use binary communication protocol instead of JSON?

surekhasaharan commented 5 years ago

Binary protocol was not considered at the time, but it does make sense to replace json with smile format for all internal process communication in Druid and specifically for transferring published segments data here. Will definitely consider doing this in a follow-up PR. Thanks.

leventov commented 5 years ago

The binary protocol issue is here: #7798

An extra thought regarding "Brokers pulling metadata directly from the DB" vs "Brokers pull metadata from Coordinator" is that for best fault-tolerance, Brokers should actually be able to do both, and failover between these ways if the primary configured way of pulling the metadata fails (e. g. the Coordinator is down, the DB is down, or there is some network partition).

himanshug commented 5 years ago

In particular, we had thought about moving the entire sys schema implementation to the Coordinator and having the Broker send any SQL queries on sys over there. Users could also query sys tables on the Coordinator directly if they wanted.

I like that because, as a user I would like to use the feature introduced by sys table but it would be nice if it wasn't at the expense of each broker needing whole bunch of extra memory that I would like to save for real data queries. Regarding the counter arguments ...

It is a somewhat common request from our users to add an option to the Broker to either fail a query, or provide a ....

We have a slightly different feature already available. Query context key, "uncoveredIntervalsLimit" that can be used in the query to return any intervals not covered by segments that we used to process query. this adds a header in response and user can discard the results. I think it was documented at some point in Query Contexts doc. This should work for many users. However it is not exactly what you pointed. For that, a "cache" wouldn't be enough because it could be stale and we wouldn't be able to guarantee whether results are really partial or not. But, I get it that "good enough" might be good enough.

6319 contemplates a design for finer-grained loc ...

sorry, haven't gone through it yet, so don't understand it.

In any case, both counter arguments are basically saying that we need the cache at broker for other reasons. I would propose that we don't make cache at broker a prerequisite for sys table functionality .. other features might need the cache when there is no other way but me as a user would like to use sys without incurring extra memory at broker if possible.

That said, if we do decide otherwise then I am fine with Broker getting information from Coordinator instead of directly going to DB for reasons that @gianm mentioned. General expectation from cluster is that data queries should continue to work in case of node failures as much possible not that all features need to work . I wouldn't worry about coordinator being down leading to Broker not having up-to-date response for sys table queries , when coordinator is down then new segments are not loaded on historicals (and many other big problems) happen so "all coordinators down" is a pretty bad situation anyway which cluster operators would want to resolve as quickly as possible.

github-actions[bot] commented 11 months ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.