grafana / metrictank

metrics2.0 based, multi-tenant timeseries store for Graphite and friends.

Optimize for a large number of new metrics getting added while still serving queries fast #1353

Closed (replay closed this issue 4 years ago)

replay commented 5 years ago

We're still experiencing serious issues with instances that have a large index and high metric churn; in the worst case, all queries time out when the index gets slammed with too many adds at once. We should first add a benchmark which adds a large number of metrics to a large index while concurrently querying it. Then we can optimize based on that benchmark so that queries are still served fast, while index adds still happen eventually but with lower priority.

woodsaj commented 5 years ago

The major performance problem when there is a high rate of queries is lock contention. This is best demonstrated with https://play.golang.org/p/XS4wY0VOZIV

When a new series arrives it will try to acquire a write lock. As soon as it asks for that lock, all future read lock requests will be blocked until the write lock is released. But the write lock won't be acquired until all current read locks (running finds) are released.
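A minimal sketch of this behavior (an illustration, not the playground snippet linked above): once a writer is queued on a sync.RWMutex, new readers block even though the write lock has not actually been acquired yet.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex

	// A slow reader (think: a long-running find request).
	go func() {
		mu.RLock()
		time.Sleep(2 * time.Second)
		mu.RUnlock()
	}()
	time.Sleep(100 * time.Millisecond) // make sure the reader holds its lock

	// A writer (think: adding a new series) now queues up for the write lock.
	go func() {
		mu.Lock() // blocks until the slow reader is done
		mu.Unlock()
	}()
	time.Sleep(100 * time.Millisecond) // make sure the writer is queued

	// While the writer is queued, new readers block too.
	start := time.Now()
	mu.RLock()
	fmt.Println("new read lock acquired after", time.Since(start))
	mu.RUnlock()
}

Running this prints a wait of close to two seconds for the new read lock: the new reader effectively pays for both the old reader and the queued writer.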

I think one way we can address this is by having new series get buffered and added in batches asynchronously. This doesn't look too hard to achieve.

replay commented 5 years ago

@Dieterbe and I just had some discussion about this topic; that's also one of the possible solutions we were thinking of (doing it similarly to how the pruning works). Another, but more complicated, solution might be to break that index lock down into smaller, more granular ones. This would be more complicated to implement, but it would probably help much more.

replay commented 5 years ago

I'll first try to write a test program which allows us to reproduce / measure this specific problem. Then, based on that, we can figure out the best ways to improve this.

replay commented 5 years ago

This is probably going to be too complicated for solving the immediate issue, but I think this is how we should break the index lock up.

There are three datastructures which are currently protected by the index lock:

1) The hierarchic tree of nodes (UnpartitionedMemoryIdx.tree)
2) The flat map keyed by IDs (UnpartitionedMemoryIdx.defById)
3) The tag index keyed by org, tags, values (UnpartitionedMemoryIdx.tags)

Currently there is one lock protecting all of them together, and all index modifications need to hold that lock while updating all of those structures.

1) We could create tree-branch locks. I think adding an rw lock to each node in the tree would cause too much memory overhead (one rw lock is 24 bytes), but we could add rw locks at specified levels in the tree to lock only that branch up to the next tree node lock. For example:
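A minimal sketch of what that could look like (hypothetical node/branch/tree types, not metrictank's actual tree code), assuming locks only exist at one designated level, e.g. the first path element:

// Hypothetical sketch: rw locks only exist at the branch level, so an
// add under "collectd.host1..." only write-locks that branch, not the
// whole tree.

type node struct {
    children map[string]*node
}

type branch struct {
    sync.RWMutex       // guards everything below this branch
    root *node
}

type tree struct {
    sync.RWMutex                   // only guards the branches map itself
    branches map[string]*branch    // first path element -> branch
}

// branchFor returns the branch (and therefore the lock) owning the given
// first path element, creating it if needed.
func (t *tree) branchFor(first string) *branch {
    t.RLock()
    b, ok := t.branches[first]
    t.RUnlock()
    if ok {
        return b
    }
    t.Lock()
    defer t.Unlock()
    if b, ok = t.branches[first]; !ok { // re-check under the write lock
        b = &branch{root: &node{children: map[string]*node{}}}
        t.branches[first] = b
    }
    return b
}

An add for a series like collectd.host1.cpu.idle would then only take branchFor("collectd").Lock(), leaving finds under other branches unblocked.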

2) This flat map could simply be partitioned. Currently it's keyed by MKeys; we could simply partition it into nested maps where the first level is keyed by the first byte of the MKey, the second level by the second byte, up to the Xth level. Then each partition, and maybe even each sub-partition, would get its own RWLock, which would reduce the contention when updating this data structure.

3) The tag index is already keyed by org -> tags -> values; we could simply give it one write lock which guards org add/remove operations, one per org which guards tag add/remove operations, and one per org & tag combination which guards value add/remove operations.

All of the described changes are probably out of scope for quickly fixing the described issue. But I think in the long term we need to get rid of this single global index lock, because otherwise we'll always eventually reach a point where contention becomes a problem.

woodsaj commented 5 years ago

I think option 1 is going to complicate the code too much.

Option 2 is probably best achieved by just using a concurrentMap, e.g. https://github.com/orcaman/concurrent-map

The clusters that are seeing performance problems are single-tenant clusters, so option 3 won't have any impact.

Another option is to make writes to the index asynchronous, which allows us to batch and rate limit them. This change could be made in a few small PRs. Step 1) make adding to the index async, via a channel, batching multiple updates together under 1 lock. Step 2) reduce the write lock time needed to add new entries by building a changeSet using only read locks, then acquiring the write lock and applying the change. We can do this safely if we use a single goroutine for making writes, receiving the write changes via channels. This ensures that nothing gets added to or removed from the index between when we start generating the changeSet and when it is applied.
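A rough sketch of what step 1 could look like (hypothetical asyncIndex/entry types, not the actual metrictank code): adds are sent to a channel and a single goroutine drains them in batches, taking the write lock once per batch instead of once per add.

// Hypothetical sketch of step 1: an add queue drained in batches.
type entry struct {
    key  string
    data interface{} // stand-in for the real metric definition
}

type asyncIndex struct {
    sync.RWMutex
    byKey map[string]entry
    adds  chan entry
}

func (m *asyncIndex) writer(maxBatch int, maxDelay time.Duration) {
    batch := make([]entry, 0, maxBatch)
    flush := func() {
        if len(batch) == 0 {
            return
        }
        m.Lock() // one write lock per batch
        for _, e := range batch {
            m.byKey[e.key] = e
        }
        m.Unlock()
        batch = batch[:0]
    }
    t := time.NewTicker(maxDelay)
    defer t.Stop()
    for {
        select {
        case e, ok := <-m.adds:
            if !ok {
                flush()
                return
            }
            batch = append(batch, e)
            if len(batch) >= maxBatch {
                flush()
            }
        case <-t.C:
            flush() // bound how long an add can sit in the queue
        }
    }
}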

replay commented 5 years ago

The clusters that are seeing performance problems are single-tenant clusters, so option 3 won't have any impact.

I think it would still help. Let's say org 1 already has a metric which is tagged os=osx and dc=one. Now this org wants to concurrently add 2 new metrics, one with the tag os=ubuntu and the other with the tag dc=two. With tag-level locks each of those insert operations only needs to acquire the write lock for its own tag key (os or dc respectively), so they won't have to wait for each other.

Currently the data structure that we use for this is:

map[uint32]TagIndex // org -> tag index
type TagIndex map[string]TagValue // key -> list of values
type TagValue map[string]IdSet    // value -> set of ids

We could replace that with something like:

type TagIndexesByOrg struct {
    sync.RWMutex
    indexes map[uint32]TagIndex // org -> tag index
}

type TagIndex struct {
    sync.RWMutex
    keys map[string]TagValues // key -> list of values
}

type TagValues struct {
    sync.RWMutex
    values map[string]IdSet  // value -> set of ids
}

replay commented 5 years ago

Another option is to make writes to the index asynchronous, which allows us to batch and rate limit them. This change could be made in a few small PRs. Step 1) make adding to the index async, via a channel, batching multiple updates together under 1 lock. Step 2) reduce the write lock time needed to add new entries by building a changeSet using only read locks, then acquiring the write lock and applying the change. We can do this safely if we use a single goroutine for making writes, receiving the write changes via channels. This ensures that nothing gets added to or removed from the index between when we start generating the changeSet and when it is applied.

I think that's a good plan to improve the performance in the short-term, even if it still doesn't really solve the problem that there is one single global lock for all index structs.

Dieterbe commented 5 years ago

Option 2 is probably best achieved by just using a concurrentMap, e.g. https://github.com/orcaman/concurrent-map

I'm not sure I like the idea of using a map type that needs runtime type assertions. This one also contains a bunch of stuff that we don't need. IMHO, writing a strongly typed partitioned map is something we can easily do ourselves in <100 lines.
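For illustration, a minimal sketch of such a strongly typed partitioned map (hypothetical Key and value types, not the real MKey / archive types), selecting the shard by the first key byte as suggested above:

// Hypothetical sketch: 256 partitions, each guarded by its own RWMutex.
// Assumes the first key byte is reasonably uniformly distributed
// (otherwise hash the key first).

type Key [16]byte

type shard struct {
    sync.RWMutex
    defs map[Key]interface{}
}

type PartitionedMap struct {
    shards [256]shard
}

func NewPartitionedMap() *PartitionedMap {
    m := &PartitionedMap{}
    for i := range m.shards {
        m.shards[i].defs = make(map[Key]interface{})
    }
    return m
}

func (m *PartitionedMap) Get(k Key) (interface{}, bool) {
    s := &m.shards[k[0]]
    s.RLock()
    v, ok := s.defs[k]
    s.RUnlock()
    return v, ok
}

func (m *PartitionedMap) Set(k Key, v interface{}) {
    s := &m.shards[k[0]]
    s.Lock()
    s.defs[k] = v
    s.Unlock()
}

A real implementation would use the actual key and archive types and probably a configurable partition count, but the structure stays well under the <100 lines mentioned above.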

Before we speculate too much on what the issue is and how to solve it, let's work on the simulator so we can easily get contention and CPU usage profiles. (Though I also suspect the main RWMutex.)

robert-milan commented 5 years ago

Most of our solutions involve reducing lock contention by either some type of sharding, batching, or locking mid-way down a path. We do already have a partitioned index, and although our last benchmarks were not great, I think it has the most potential. Why not partition the index first, and then implement the other solutions as needed within each partition? So, let's get some more benchmarks for the partitioned index as well.

Before we speculate too much on what the issue is and how to solve it, let's work on the simulator so we can easily get contention and CPU usage profiles. (Though I also suspect the main RWMutex.)

Yes, let's do that, and yes it is probably the main RWMutex. Even if it is the main RWMutex, we still need to know which functions/methods are taking up all of our time.

Option 2 is probably best achieved by just using a concurrentMap, e.g. https://github.com/orcaman/concurrent-map

We could possibly use that as a base and modify it to our needs, but I don't think it is optimized enough for our use case. Since we are facing CPU / lock contention issues I think we need something more custom tailored to what we are doing.

This flat map could simply be partitioned. Currently it's keyed by MKeys; we could simply partition it into nested maps where the first level is keyed by the first byte of the MKey, the second level by the second byte, up to the Xth level. Then each partition, and maybe even each sub-partition, would get its own RWLock, which would reduce the contention when updating this data structure.

Pure speculation, but that will probably end up being faster than concurrent-map. Or, as Dieter said, we could just write our own.

Another option is to make writes to the index asynchronous to allow us to batch and rate limit them. This change could be made in a few small PRs. step 1) make adding to the index async, via a channel, batching multiple updates together with 1 lock. step 2) reduce write lock time needed to add new entries by building a changeSet using only read locks, then acquire the write lock and apply the change. We can do this safely if we use a single goroutine for making writes, receiving the write changes via channels. This will ensure that nothing gets added/removed from the index between when we start generating the changeSet and when it is applied

I really do like the idea of batching in general, and I don't think this would be very difficult to implement. As replay pointed out, this could be a good short-term solution.

I think a mix of a lot of these ideas would be a good long-term goal. I believe Dieter's benchmarking and documenting approach will serve us best here. This problem has popped up a few times in the past and we have not solved it yet. We should carefully craft tests that reproduce similar results, get a good baseline, and then test whatever implementations we want and compare them to the baseline and to each other.

woodsaj commented 5 years ago

We do already have a partitioned index, and although our last benchmarks were not great, I think it has the most potential.

The main reason that the partitioned index didn't work as well as hoped is that partitioning the index doesn't address the core issue: an attempt to acquire the write lock causes all new find requests to be blocked until the existing finds complete and then the write completes. Regardless of the number of partitions, each find request takes 1 rlock per partition. So a single write in any partition will block all find requests, as they wait on the blocked partition.

One potential quick fix for this: instead of wrapping calls to find() in a single lock, we move the rlocks into find() and acquire/release a lock every time we read data that might receive writes. This will increase the number of rlocks acquired/released, but significantly reduce the duration for which the locks are held.
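A schematic sketch of that change (hypothetical memIdx type reusing the node type from the earlier sketch, not the actual find() code), with nil checks omitted for brevity:

// Schematic only: finer-grained read locking during a find traversal.
type memIdx struct {
    sync.RWMutex
    root *node
}

// Before: one read lock held for the whole traversal.
func (m *memIdx) findCoarse(path []string) *node {
    m.RLock()
    defer m.RUnlock()
    n := m.root
    for _, part := range path {
        n = n.children[part]
    }
    return n
}

// After: a short read lock per step, so a queued writer (and the readers
// queued behind it) only waits for one step instead of a whole find.
func (m *memIdx) findFine(path []string) *node {
    n := m.root
    for _, part := range path {
        m.RLock()
        n = n.children[part]
        m.RUnlock()
    }
    return n
}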

Dieterbe commented 5 years ago

I gave the idea that @woodsaj pitched during the meeting today a bit more thought. I don't see why we would go for a solution that puts 2 queues behind one another, making the whole thing harder to reason about and creating more noise and overhead (extra goroutines, channel operations, etc). We already have a queue (kafka); I think that's enough. It seems much simpler to me to add UpdateBatch and AddOrUpdateBatch to the Index interface (and its implementations).
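A hedged sketch of that interface extension; only the method names come from the suggestion above, the parameter and return types are placeholders and not the actual metrictank signatures:

// Hypothetical shape only.
type Index interface {
    // ...existing methods...

    // UpdateBatch applies a batch of updates for already-known series.
    UpdateBatch(points []Point)
    // AddOrUpdateBatch ingests a batch of definitions, so an
    // implementation can take its write lock once per batch of adds.
    AddOrUpdateBatch(defs []Def)
}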

robert-milan commented 5 years ago

Unless at some point we don't have kafka. I think this logic needs to stay with the affected area, in this case the index. This isn't a kafka problem, it's an index problem, so that's where it should be solved as much as possible.

Also, by making it a non-index problem we are actually hiding the issue, which could cause confusion down the road.

Dieterbe commented 5 years ago

I'm not suggesting to "make it a non-index problem" or to "hide the issue". I'm saying the index can implement optimized batched methods and "advertise them" via its API to callers. In Go it seems like a common enough paradigm to have 2 similar methods or APIs with different tradeoffs, e.g. the msgpack library we use takes this much further: https://github.com/tinylib/msgp#performance

woodsaj commented 5 years ago

We already have a queue (kafka); I think that's enough

It is not.

The data in kafka is all datapoints. The problem we have is only with datapoints for new series. We only know that they are new series when we look in the index and find they are not there.

Dieterbe commented 5 years ago

The problem we have is only with datapoints for new series

And why is that a problem? We can simply hand everything to the index in batches and let the index do the right thing. There's no overhead because we can simply recycle the buffer/slice across each call. Introducing a 2nd queue introduces all kinds of complexity, requires more instrumentation, and makes MT harder to understand.

robert-milan commented 5 years ago

That reduces the effectiveness of batching. As @awoods pointed out, we don't want to batch all datapoints, but specifically only the new metrics that need to be added to the index, since that is where the lock contention / slowdown is happening (too many writers queued, which then block all the readers, thus causing query timeouts).

If we batch everything from kafka and then let the index handle it, we reduce the effectiveness and efficiency of each batch: the useful part of the batch shrinks to totalBatched - metricsToUpdateButNotAdd, and we unnecessarily slow down update operations, which only require a read lock.

This also means that we would need to acquire a write lock instead of a read lock and waste cycles on updates, time that could be better spent adding. In some instances that might be a relatively small hit, while in others it could be huge.

Now, there is something that might be worth benchmarking: if we batch ALL of them, we could remove the atomic operation from update, because we would already hold a write lock.

Dieterbe commented 5 years ago

To be clear, when I said "let the index do the right thing", I specifically didn't imply a particular implementation. So when the index receives a batch update, it can still use the read lock to separate out the updates from the adds, for example. I never said we should use a write lock for updates. The updates could probably be done the same way we do them now. The adds would be batched under a single write lock. You're right that the more metricsToUpdateButNotAdd we get within 1 batch call, the smaller the effective batch gets, but in this case we also don't really have an issue: all the adds get batched together, so it should solve the problem AFAICT. It seems desirable to use smaller batches when the proportion of adds becomes low compared to the overall volume, because otherwise it would take too long to put a batch together. (I assume awoods' proposal would do something similar.)

woodsaj commented 5 years ago

This has been fixed with https://github.com/grafana/metrictank/pull/1365

replay commented 5 years ago

Reopening this because we're still seeing issues. Earlier, when I talked with @woodsaj, he suggested an idea which I liked a lot:

We could move all adds/deletes/updates into one goroutine. This routine would loop over an event channel and consume events to apply to the index. One minor advantage of this is that pruning, adding, delete calls by users, etc. no longer have to fight each other for the write lock, because they all get executed in the same thread. But the big advantage is that we then know that nothing other than this thread ever modifies the index. This means that in this event loop we can prepare the upcoming actions while holding a read lock, then switch to a write lock and execute them, which reduces the write lock time. For example, when adding metrics we would no longer need to check whether the metric is already present, whether the tags need to get added to the tag index, or whether we need to add branches to the tree while holding the write lock; we could prepare all those actions while holding the read lock, switch to the write lock, and then apply the prepared actions. Similarly, when pruning deletes metrics it currently needs to do a lot of this preparation work while holding the write lock. Once we know that no other thread ever modifies the index, this write lock time could also be reduced a lot.

In the case of the partitioned index, we'd have one of those event executor threads per partition.
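A rough sketch of that event loop (hypothetical indexEvent type, reusing the memIdx type from the earlier sketch, not the actual metrictank code): because only this goroutine ever mutates the index, the preparation step can safely run under a read lock.

// Hypothetical sketch: a single goroutine applies all index modifications.
type indexEvent interface {
    // prepare inspects the index under a read lock and returns a closure
    // that applies the precomputed change.
    prepare(m *memIdx) (apply func())
}

func (m *memIdx) eventLoop(events <-chan indexEvent) {
    for ev := range events {
        m.RLock()
        apply := ev.prepare(m) // safe: nothing else modifies the index
        m.RUnlock()

        m.Lock()
        apply() // only the precomputed mutation runs under the write lock
        m.Unlock()
    }
}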

What do you think about that @Dieterbe @robert-milan @fkaleo

Dieterbe commented 5 years ago

Interesting idea. Not to be confused with the common pattern of doing all operations in a single goroutine via channel reads (in which case there would be no need for locking at all). I like this hybrid approach because it still allows concurrency for all non-modifying operations.

Can we come up with a prototype to validate the approach?

replay commented 5 years ago

Not to be confused with the common pattern of doing all operations in a single goroutine via channel reads (in which case there would be no need for locking at all).

Correct, reads would still be concurrent as the read lock can be acquired concurrently anyway.

replay commented 5 years ago

Another, unrelated idea which would help: when the write queue flushes, it currently always flushes the whole queue, which has the potential to hold the write lock for a long time. We should modify it so that it works similarly to the index pruning, which only holds the write lock for a defined amount of time and then releases it, even if the queue has not been fully processed yet.
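A small sketch of such a time-bounded flush, reusing the hypothetical asyncIndex/entry types from the earlier sketch (the lock-hold duration is an assumed tunable, not an existing setting):

// Hypothetical sketch: flush the queued adds in slices, never holding
// the write lock longer than maxLockHold at a time.
func (m *asyncIndex) flushBounded(queue []entry, maxLockHold time.Duration) {
    for len(queue) > 0 {
        m.Lock()
        deadline := time.Now().Add(maxLockHold)
        for len(queue) > 0 && time.Now().Before(deadline) {
            e := queue[0]
            queue = queue[1:]
            m.byKey[e.key] = e
        }
        m.Unlock() // give queued readers a chance to run
    }
}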

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.