citusdata / citus

Distributed PostgreSQL as an extension
https://www.citusdata.com
GNU Affero General Public License v3.0

Auto-adjust the pool size in adaptive executor #3116

Closed onderkalaci closed 4 years ago

onderkalaci commented 4 years ago

Already listed in #2665 but probably better to have a separate issue.

One of the most important settings of the adaptive executor is citus.max_adaptive_executor_pool_size, which is the maximum number of connections that the executor is allowed to open per worker node. Setting it too low decreases the parallelism (and hence the performance) on the workers. Setting it too high puts a lot of pressure on the workers, especially when there are concurrent queries running on the coordinator.

To avoid performance problems, we've implemented an algorithm called Slow-Start (#2779), where we start with a small number of connections per node and open new connections, when they are necessary, every 10 msecs (i.e., citus.executor_slow_start_interval).
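A rough sketch of the slow-start idea, assuming a deliberately simplified model: the pool starts with one connection, every open connection processes work for the whole interval, and one more connection is opened per interval while work remains. The function name, its parameters, and the one-connection-per-interval ramp-up are assumptions for illustration, not the executor's actual rule.

```c
#include <assert.h>

/*
 * Hypothetical model of slow start: returns how many connections end up
 * open for one worker node. Task granularity is ignored; work is treated
 * as a total number of milliseconds shared by the open connections.
 */
int
SimulateSlowStart(int taskCount, double taskMs, double intervalMs, int maxPoolSize)
{
    assert(taskCount > 0 && taskMs > 0 && intervalMs > 0 && maxPoolSize > 0);

    double remainingWorkMs = taskCount * taskMs;
    int connectionCount = 1;

    for (;;)
    {
        /* every open connection works for one full interval */
        remainingWorkMs -= connectionCount * intervalMs;
        if (remainingWorkMs <= 0)
        {
            break;
        }

        /* never exceed the pool size or open more connections than tasks */
        if (connectionCount >= maxPoolSize || connectionCount >= taskCount)
        {
            break;
        }

        /* work is still pending after the interval: open one more */
        connectionCount++;
    }

    return connectionCount;
}
```

In this model, 16 shard queries of ~0.5 msec each with a 10 msec interval never open a second connection, while 32 queries of ~100 msec each ramp up to the pool-size limit.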

For example, if the queries on the shards take a short time (~0.5 msec), say an index-only scan, the adaptive executor can use the same connection for executing all the shard queries on that worker node. That's great: it saves opening connections to the workers, which is a pretty expensive operation.

The problem with that approach is that if the workers are busy because of many concurrent queries, they execute queries more slowly. In response, the adaptive executor opens even more connections, putting a lot more pressure on the workers. Instead, it'd be great if the adaptive executor could realize that the workers are busy, so that there is no point in opening more connections, and push back (e.g., by decreasing the pool size).

Some alternatives we've discussed earlier, which make some sense but require further discussion, are the following. All of the above applies only to multi-shard queries. Single-shard queries would always get the cached connection, so that's OK.

1) While the coordinator is sending SELECT assign_distributed_transaction_id(0, 1, '2019-10-22 12:18:48.436979+02') (or in the same UDF?), it also sends one more command, which returns a metric to identify the busyness of the worker node. Depending on that, a heuristic on the coordinator decides the pool size. We could rely on the backend_data infrastructure that we use for deadlock detection. This avoids one more round-trip to the worker nodes.

2) The coordinator counts the number of concurrent queries per node in the shared memory. This doesn't work well on MX and could have a performance impact because of shared memory accesses.

3) The coordinator periodically asks for the busyness of the worker nodes, similar to deadlock detection or 2PC recovery, and caches that information in shared memory (?). The problem with this is choosing the interval for checking this information. If the interval is long, the information won't be accurate. If it is short, it adds extra pressure to the workers, which would be even more problematic with MX.

Any further ideas? My favorite is 1, which adds little overhead, is pretty accurate and scales well with MX.

SaitTalhaNisanci commented 4 years ago

Do you have an idea for the metric for option 1? Is it something like cpu/memory/io usage?

onderkalaci commented 4 years ago

High-level description for the proposed solution

We considered several different approaches for auto-adjusting the pool size. Initially, we considered adjusting the pool sizes based on the CPU/memory utilization of the worker nodes (such as the proposals in the issue description). Soon after, we realized that relying on those metrics has several shortcomings. First, keeping up-to-date information for those is not a trivial task. Second, accessing those metrics might create some platform dependencies (e.g., see the htop source code for different platforms). In the end, we'd require pretty complicated code to get the information we need on different platforms. Finally, and probably most importantly, adjusting the pool size based on these metrics would create a pretty complicated system whose behavior no one could easily understand or predict (including citus-devs and, mostly, citus-users).

Instead, we propose to adjust the pool size based on a single, understandable and predictable metric: citus.max_shared_pool_size. In a single coordinator scenario (non-MX), the expectation is that citus.max_shared_pool_size == max_connections on the worker nodes. In the MX case, the value can be adjusted considering that each node can connect to any other node.

In very simple words, the goal is to prevent opening more than citus.max_shared_pool_size connections to any particular worker node from the coordinator across sessions. With that, we'd prevent connection attempts from failing due to an excessive number of concurrent backends on the coordinator.

However, we think that this may not be sufficient for all cases, so we could also provide fair budgets (e.g., an allowed number of connections) to every session on the coordinator. To do that, we propose to implement a simple throttling mechanism in the executor: basically, prevent any backend from opening more than citus.max_shared_pool_size / backend count connections to a particular node. (This is an overly simplified explanation, see below for details.)

Implementation proposal

The proposal includes 3 sub-items to implement.

Changes on the Connection Management

Shared Connection Counter

The information about the total number of connections established for a particular worker node is at the core of this approach. To track it, we propose to maintain a hash table in the shared memory, where the key is [nodename, nodeport] and the value is simply an unsigned int. Note that we count the connections at the server level (i.e., across databases), mirroring max_connections.

The shared memory should be protected by a lock. We should try both an LWLock and a spinlock to see which one performs better.

The main APIs that should be provided are the following:

/*
 * Tries to increment the shared connection counter for the given host and port.
 * The function first checks whether the number of connections is less than
 * citus.max_shared_pool_size. If so, the function increments the counter
 * by one and returns true. Else, the function returns false.
 */
bool TryIncrementSharedConnectionCounter(const char *hostname, int port);

/* returns the current value of the counter for the given host and port */
uint32 GetSharedConnectionCounter(const char *hostname, int port);
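A minimal single-process sketch of how these two functions could behave. It assumes a small fixed-size array in place of the shared-memory hash table, a plain variable in place of the citus.max_shared_pool_size GUC, and no locking at all (the real thing would need the LWLock or spinlock discussed above). FindEntry, DecrementSharedConnectionCounter and the two size constants are hypothetical names, and uint32_t stands in for PostgreSQL's uint32.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_TRACKED_NODES 16        /* assumed table capacity */
#define MAX_NODE_NAME_LENGTH 64     /* assumed maximum host name length */

typedef struct SharedConnectionEntry
{
    char hostname[MAX_NODE_NAME_LENGTH];
    int port;
    uint32_t connectionCount;
    bool inUse;
} SharedConnectionEntry;

/* stand-ins for the shared-memory hash table and the GUC */
static SharedConnectionEntry SharedConnectionTable[MAX_TRACKED_NODES];
static uint32_t MaxSharedPoolSize = 100;

/* looks up the entry for [hostname, port], optionally creating it */
static SharedConnectionEntry *
FindEntry(const char *hostname, int port, bool createIfMissing)
{
    SharedConnectionEntry *freeEntry = NULL;

    for (int i = 0; i < MAX_TRACKED_NODES; i++)
    {
        SharedConnectionEntry *entry = &SharedConnectionTable[i];

        if (entry->inUse && entry->port == port &&
            strcmp(entry->hostname, hostname) == 0)
        {
            return entry;
        }
        if (!entry->inUse && freeEntry == NULL)
        {
            freeEntry = entry;
        }
    }

    if (!createIfMissing || freeEntry == NULL)
    {
        return NULL;
    }

    strncpy(freeEntry->hostname, hostname, MAX_NODE_NAME_LENGTH - 1);
    freeEntry->hostname[MAX_NODE_NAME_LENGTH - 1] = '\0';
    freeEntry->port = port;
    freeEntry->connectionCount = 0;
    freeEntry->inUse = true;

    return freeEntry;
}

bool
TryIncrementSharedConnectionCounter(const char *hostname, int port)
{
    SharedConnectionEntry *entry = FindEntry(hostname, port, true);

    /* no room in the table, or the node already has the maximum */
    if (entry == NULL || entry->connectionCount >= MaxSharedPoolSize)
    {
        return false;
    }

    entry->connectionCount++;
    return true;
}

uint32_t
GetSharedConnectionCounter(const char *hostname, int port)
{
    SharedConnectionEntry *entry = FindEntry(hostname, port, false);

    return entry != NULL ? entry->connectionCount : 0;
}

/* hypothetical counterpart, to be called when a connection is closed */
void
DecrementSharedConnectionCounter(const char *hostname, int port)
{
    SharedConnectionEntry *entry = FindEntry(hostname, port, false);

    if (entry != NULL && entry->connectionCount > 0)
    {
        entry->connectionCount--;
    }
}
```

Keeping the check and the increment in one function matters once a lock is added: both steps have to happen under the same lock acquisition, otherwise two backends could pass the check together and overshoot the limit.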

Introduce Optional Connection Concept

In a typical multi-shard query, the executor is allowed to open citus.max_adaptive_executor_pool_size number of connections per worker node. But, even a single connection per worker node is sufficient to successfully execute a distributed query. Thus, in a sense, the remaining connections can be considered as Optional connections.

To support that, add an OPTIONAL_CONNECTION flag to MultiConnectionMode. Implement roughly the following:

/* if the connection is optional, and we cannot get a slot on the worker, bail out */
if ((flags & OPTIONAL_CONNECTION) && !TryIncrementSharedConnectionCounter(host,port))
{
    return NULL;
}

We should only use OPTIONAL_CONNECTION in the adaptive executor, when the backend already has citus.max_cached_conns_per_worker connections established.

Wait up to 60 seconds (?) if there is no available connection slot on the worker

When concurrent multi-shard queries have already opened citus.max_shared_pool_size connections to a particular worker, and another backend aims to establish an essential connection, we should be cautious. If we let the connection establishment proceed, we expect it to fail with a too many connections error on the worker. Instead, we can do a wait-and-retry loop for some time.

I noted 60 seconds, but that's an arbitrary number. We could make it an internal GUC.


if (!(flags & OPTIONAL_CONNECTION))
{
    while (!TryIncrementSharedConnectionCounter(host, port))
    {
        if (moreThan60SecondsPassed)
        {
            ereport(ERROR, (errmsg("could not get a connection slot to worker %s:%d",
                                   host, port)));
        }

        /* sleep for some time and retry (WaitLatch() arguments omitted) */
        WaitLatch();
    }
}
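The optional bail-out and the wait-and-retry loop can be combined into one slot-acquisition routine. The sketch below is self-contained and makes several assumptions: a single counter stands in for the per-node shared counter, the wait is a caller-supplied callback instead of WaitLatch(), elapsed time is tracked by summing retry intervals, and a timeout is reported through a return value rather than ereport(ERROR). AcquireConnectionSlot, SlotResult, the two simulated wait functions and the flag's bit value are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define OPTIONAL_CONNECTION (1 << 0)    /* assumed bit value */

typedef enum SlotResult
{
    SLOT_ACQUIRED,      /* counter incremented, go ahead and connect */
    SLOT_SKIPPED,       /* optional connection, no slot: do not connect */
    SLOT_TIMED_OUT      /* essential connection, no slot within the timeout */
} SlotResult;

/* single-node stand-ins for the shared counter and the GUC */
static int SharedConnectionCount = 0;
static int MaxSharedPoolSize = 2;

static bool
TryIncrementSharedConnectionCounter(void)
{
    if (SharedConnectionCount >= MaxSharedPoolSize)
    {
        return false;
    }
    SharedConnectionCount++;
    return true;
}

SlotResult
AcquireConnectionSlot(int flags, int timeoutMs, int retryIntervalMs,
                      void (*waitOnce)(int waitMs))
{
    assert(retryIntervalMs > 0);

    /* optional connections never wait: either a slot is free or we bail out */
    if (flags & OPTIONAL_CONNECTION)
    {
        return TryIncrementSharedConnectionCounter() ? SLOT_ACQUIRED : SLOT_SKIPPED;
    }

    /* essential connections retry until a slot frees up or the timeout hits */
    int waitedMs = 0;
    while (!TryIncrementSharedConnectionCounter())
    {
        if (waitedMs >= timeoutMs)
        {
            return SLOT_TIMED_OUT;
        }
        waitOnce(retryIntervalMs);
        waitedMs += retryIntervalMs;
    }

    return SLOT_ACQUIRED;
}

/* simulated wait: nothing changes while we sleep */
void
SimulatedWaitNoRelease(int waitMs)
{
    (void) waitMs;
}

/* simulated wait: another backend closes a connection while we sleep */
void
SimulatedWaitReleasesSlot(int waitMs)
{
    (void) waitMs;
    if (SharedConnectionCount > 0)
    {
        SharedConnectionCount--;
    }
}
```

With a limit of 2 and both slots taken, an optional request returns SLOT_SKIPPED immediately, while an essential request either succeeds after a simulated release or returns SLOT_TIMED_OUT, which is where the proposal would raise the error.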

Changes on the Adaptive Executor

Changes on the connection management are essential for preventing ERROR: too many clients already errors. However, as the connection management doesn't have any idea of what's happening in the execution, it cannot provide any fairness among the backends. It could easily happen that one backend gets too many connection slots from the workers, and others get far fewer connections.

To prevent that, we can throttle the per backend citus.max_adaptive_executor_pool_size under high concurrency.

Throttle the pool size

Throttling is only required for multi-shard queries. Similarly, throttling kicks in only after citus.max_cached_conns_per_worker connections are established.

In order to do this precisely, we should move the targetPoolSize field from DistributedExecution to WorkerPool. After that, we should prevent any backend from getting an unfair number of connection slots on the worker.

At this point, we should consider several cases. First, the database is hammered by only multi-shard queries (e.g., tpch-like queries). Second, some backends run single-shard queries (tpcc-like) and some backends run tpch-like workloads. Third, the database is hammered by only tpcc-like queries.

The last case is trivial, as each backend can easily get 1 connection to the target worker. The problem becomes harder to optimize as there are more tpch-like queries. As I couldn't find a great answer to cover all cases, I propose two different algorithms (selectable via a GUC):


if (THROTTLING_POLICY == GREEDY_POLICY)
{
    /* just do nothing, let the backend get as many connections as it wants */
    /* the connection management would prevent establishing more connections than necessary */
}
else if (THROTTLING_POLICY == FAIR_POLICY)
{
    int activeBackendCount = GetActiveBackendCount();

    /* divide as double: integer division would truncate before ceil() */
    int perBackendAllowedConnectionToWorker =
        (int) ceil((double) citus.max_shared_pool_size / activeBackendCount);

    /* each backend gets at most perBackendAllowedConnectionToWorker */
    workerPool->targetPoolSize = Min(MaxAdaptiveExecutorPoolSize, perBackendAllowedConnectionToWorker);

    /* adjust if the calculated newConnectionCount is larger than we're allowed */
    newConnectionCount = Min(newConnectionCount, workerPool->targetPoolSize - initiatedConnectionCount);
}
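To make the arithmetic of the fair policy concrete, here is a small standalone sketch. It assumes integer ceiling division in place of ceil(), treats a backend count below 1 as 1 to avoid dividing by zero, and clamps the remaining budget at zero. FairTargetPoolSize and AllowedNewConnectionCount are hypothetical helper names, not functions from the executor.

```c
#include <assert.h>

/*
 * Per-backend target pool size under the fair policy:
 * min(maxAdaptiveExecutorPoolSize, ceil(maxSharedPoolSize / activeBackendCount)).
 */
int
FairTargetPoolSize(int maxSharedPoolSize, int activeBackendCount,
                   int maxAdaptiveExecutorPoolSize)
{
    assert(maxSharedPoolSize > 0 && maxAdaptiveExecutorPoolSize > 0);

    /* assumption: count the calling backend even if the reported count is 0 */
    if (activeBackendCount < 1)
    {
        activeBackendCount = 1;
    }

    /* integer ceiling division, no floating point needed */
    int perBackendAllowed =
        (maxSharedPoolSize + activeBackendCount - 1) / activeBackendCount;

    return perBackendAllowed < maxAdaptiveExecutorPoolSize ?
           perBackendAllowed : maxAdaptiveExecutorPoolSize;
}

/* caps the requested number of new connections by the remaining budget */
int
AllowedNewConnectionCount(int requestedConnectionCount, int targetPoolSize,
                          int initiatedConnectionCount)
{
    int remaining = targetPoolSize - initiatedConnectionCount;

    if (remaining < 0)
    {
        remaining = 0;
    }

    return requestedConnectionCount < remaining ?
           requestedConnectionCount : remaining;
}
```

For example, with citus.max_shared_pool_size = 100 and a pool size limit of 16, 8 active backends get 13 connections each and 2 active backends stay capped at 16. With 200 active backends each one still gets 1, so the rounded-up budgets can add up to more than the shared limit; the shared connection counter remains the hard cap in that case.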

Misc. Changes

Add citus.max_shared_pool_size GUC

Add at_exit callback

We can track connection close events via CloseConnection(), ShutdownConnection() etc. However, we're not properly closing the cached connections when the session on the coordinator is terminated. We also see "could not receive data from client: Connection reset by peer" on the workers when the coordinator disconnects. Instead, we should try to properly close those connections, and decrement the shared connection counter.

Task-tracker queries

Task-tracker queries lead to connection establishments between worker nodes. Our algorithm doesn't take that into account. Should we somehow integrate them into the algorithm?

We can introduce a new API in the connection manager, which reserves some number of connections so that other backends see fewer available connections to that worker node.
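One possible shape for such a reservation API, as a sketch only. It assumes reservations are tracked next to the shared connection counter for a single node and simply reduce what other backends see as available. All names here are hypothetical, and locking is left out.

```c
#include <assert.h>
#include <stdbool.h>

/* single-node stand-ins for the shared counter, reservations and the GUC */
static int SharedConnectionCount = 0;
static int ReservedConnectionCount = 0;
static int MaxSharedPoolSize = 10;

/* number of slots other backends may still take on this node */
int
AvailableSharedConnections(void)
{
    return MaxSharedPoolSize - SharedConnectionCount - ReservedConnectionCount;
}

/* sets aside count slots, e.g. for worker-to-worker connections */
bool
ReserveSharedConnections(int count)
{
    assert(count >= 0);

    if (count > AvailableSharedConnections())
    {
        return false;
    }

    ReservedConnectionCount += count;
    return true;
}

/* gives reserved slots back, never dropping below zero */
void
ReleaseReservedConnections(int count)
{
    assert(count >= 0);

    ReservedConnectionCount -= count;
    if (ReservedConnectionCount < 0)
    {
        ReservedConnectionCount = 0;
    }
}
```

Releasing the reservation on every exit path, including query failures, is the part that needs care.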

We should be cautious about query failures.

Test Scenarios

onderkalaci commented 4 years ago

Some edge cases to consider after talking to @marcocitus :

onderkalaci commented 4 years ago

Some other edge cases:

onderkalaci commented 4 years ago

Implemented by #3692. However, instead of adjusting the pool size, we've introduced OPTIONAL_CONNECTION.

We may still consider adding THROTTLING_POLICY in the future, but that should be a separate issue.