Open kelvints opened 6 years ago
Hiya. You are on the right track as the region server connection pool is the big bottleneck here. And in your case, you'll have 4 worker threads without anything to do in AsyncHBase since you only have 4 servers.
We face the same issue (though we also have hundreds of servers behind a pool of 16 connections). I had tried doing what you were thinking, adding a hook into the HBaseRPC class that can pass the RPC off to a thread pool for execution so the selector can move to processing another packet.
However after testing, it didn't really help that much. There's a significant amount of time spent parsing the channel buffer and then processing the results into KeyValue's to pass upstream.
A really good solution would be connection pools in AsyncHBase wherein if we see a lot of requests for a given region server, we spawn another X number of connections and round-robin the RPCs. Then when throughput dies we just kill the idle connections. But this is a fair amount of work to get right. Any interest in trying it? ;)
A few follow-up questions to see if we can do an easy patch instead of the hard patch:
Do we know why the callbacks for multiple queries end up getting executed by the same AsyncHBase thread? The thread that runs this seems to rotate over the lifetime of the tsd. I can do a bit more tracing to figure out if this is due to the regionserver response timing. I should note again that that we are using salting so all queries end up making requests from all regionservers. Maybe we can somehow balance/randomize the thread that initiates each callback chain?
Re: "parsing the channel buffer and processing the results into KeyValues", I see that multiple AsyncHBase threads run the decode/processRow/compact callback chain concurrently. This is good, I think, and there doesn't seem to be an easy improvement to make here. However, only a single thread at a time runs mergeAndReturnResults; this is what kicks off GroupByAndAggregateCB. How is it that one callback chain is executed in parallel but another chain (or another part of the same chain?) seems to be executed serially for all queries by a single thread?
The OpenTSDB documentation, e.g., http://opentsdb.net/docs/build/html/user_guide/utilities/varnish.html, and the community in general seem to advocate for running OpenTSDB behind a load balancer. However, in my tests it seems like the only way to avoid getting your query blocked by a slow query running on the same tsd as yours (that could have been issued before or after your query) is to limit the maximum number of requests per tsd to 1 (e.g., by using "maxconn 1" in haproxy). I found that using a health check doesn't effectively remove a tsd from the pool while it is stuck on an expensive query. So your cluster's total concurrency is at most the number of tsds you are running. Is there any advice on how to setup your cluster to provide consistent query performance?
Do we know why the callbacks for multiple queries end up getting executed by the same AsyncHBase thread?
That's the nature of the Netty selector mechanism. When you pass off a call to the network from Thread A, A is done and can move on to another task. Then Thread B in Netty's worker pool receives a poll from the kernel saying a packet is ready so B will execute the Netty pipeline, passing the packet up to the RegionClient in AsyncHBase. The RegionClient decodes the data then executes the callback, all in Thread B's context.
The thread that runs this seems to rotate over the lifetime of the tsd. ... I should note again that that we are using salting so all queries end up making requests from all regionservers.
The reason for the rotation is that, with salting, the last chunk of data to come in from a region server on some Thread X, will wind up executing the scanner callback. Netty selectors are, if I remember correctly, assigned round-robin to the worker pool threads on connection. The selectors are then sticky to that worker (though I need to double check this). So if your salted data is across 20 region servers, and lets say server 1 is on worker thread A and server 5 is on worker thread D, then you'll sometimes see aggregations executed under context A whereas others are under D. Just depends on which region server answered quicker at that point in time.
Maybe we can somehow balance/randomize the thread that initiates each callback chain? If selectors are sticky at the Netty level, we can't do that. Additionally, the selector won't go back to process another message from any of it's sockets until the current execution in that thread is done. That's why it would be better to instantiate new region server connections to spread the load across different threads in the Netty pool. (again I need to verify netty's behavior, it's been a while)
However, only a single thread at a time runs mergeAndReturnResults; this is what kicks off GroupByAndAggregateCB. How is it that one callback chain is executed in parallel but another chain (or another part of the same chain?) seems to be executed serially for all queries by a single thread?
That's something I'd like to improve in 3.0. Right now in 2.x we have to have the entire collection of time series IDs together in one thread and one bucket so we can perform the join. In 3 I want to be able to create graphs that allow for parallelization during aggregation.
However, in my tests it seems like the only way to avoid getting your query blocked by a slow query running on the same tsd as yours (that could have been issued before or after your query) is to limit the maximum number of requests per tsd to 1 (e.g., by using "maxconn 1" in haproxy). I found that using a health check doesn't effectively remove a tsd from the pool while it is stuck on an expensive query. So your cluster's total concurrency is at most the number of tsds you are running. Is there any advice on how to setup your cluster to provide consistent query performance?
How many region servers do you have behind your TSDs? That's the real bottleneck right now in that if you have 1 TSD and 1 region server, you're right, the slow queries will backup all queries as the single selector thread is processing every query result. The default worker pool is cpu core * 2
threads in size. So if you have that many region servers with salting, then you're in a much better position to load balance between threads. We run 10 region servers per TSD and the load is pretty good most of the time though we block the big queries with the size limiter thats in 2.4RC2
How many region servers do you have behind your TSDs? That's the real bottleneck right now in that if you have 1 TSD and 1 region server, you're right, the slow queries will backup all queries as the single selector thread is processing every query result. The default worker pool is cpu core * 2 threads in size. So if you have that many region servers with salting, then you're in a much better position to load balance between threads. We run 10 region servers per TSD and the load is pretty good most of the time though we block the big queries with the size limiter thats in 2.4RC2
We run one region server per tsd. In one site we have 13 24 core (hardware threads) hosts each running a tsd, regionserver, and datanode. If I'm hearing you correctly, two queries block each other when both require scanning the same (or overlapping) sets of regionservers. In that case, I think we have a further complication in our setup: we chose to use 52 salt buckets in this site, and even more buckets in another site, in order to distribute writes over more regions, so an individual memstore would take longer to fill up (and thereby increase the max write throughput). However, this makes it so that with fewer than 52 regionservers, all queries are highly likely to need to talk to all regionservers, assuming regions are balanced. So it sounds like we should set #<salt buckets>
much smaller than the number of regionservers and also run more regionservers.
Specifically, the GroupByAndAggregateCB callback runs on an AsyncHBase I/O Worker thread. The upshot of this is that when a query with a high aggregation time is submitted to a tsd all other in-progress and newly submitted queries are effectively blocked while the problematic query is in its aggregation phase. I can provide the test setup I am using to reproduce this behavior if necessary. We are using OpenTSDB 2.3.0 with salting enabled. In the test setup I'm using
with 4 regionservers. The documentation for com.stumbleupon.async.Deferred reads:
so it is not totally surprising that this is happening, but it seems like a significant bottleneck in the design. I'm looking at patching OpenTSDB to run the callback(s) in a separate thread pool but I'd like to know I'm not completely off the mark here or overlooking something.