This issue is just the high-level overview issue. Individual sub-tasks have/will get their own tickets. Please edit the top-level description to integrate agreed-upon changes (and to just make plain corrections/refinements), so the overall design is visible in one place.
A number of issues like:
- #1131 - UPDATE on multiple shards natively works
- #1130 - DELETE on multiple shards natively works
- #455 - Add support for multi-row INSERT statements
- #1328 - Expand transaction coverage for queries that touch reference and distributed tables
are essentially blocked because we currently have neither detection nor handling of distributed deadlocks. So far that's mostly handled by prohibiting commands / combinations of commands that could trigger deadlocks (causing usability issues), or by making them acquire more heavyweight locks than strictly necessary (causing concurrency / practicality issues). There are still several ways such deadlocks can be triggered, however: e.g. two multi-statement transactions that update the same two rows, placed on different workers, in opposite order will block each other without either worker ever seeing a local deadlock.
We're planning to implement two approaches to handle deadlocks:
Detect distributed deadlocks, and abort them. This requires communication between nodes, and will primarily be important for deadlocks triggered by unfortunate sequencing of actions by the user (e.g. separate UPDATE statements within a transaction that are not well ordered). This'll trigger after a configurable timeout, and will usually detect deadlocks that have already lasted several seconds.
Implement wound/wait for commands that opt into it. Wound/wait, very much simplified, cancels a "younger" transaction if an "older" one requires a resource it holds. That allows for very fast (heuristic) deadlock detection, guarantees global progress, and is fair. It is particularly useful for multi-row commands that can reasonably be expected to occasionally experience deadlocks. Depending on the command and the state in which it is sent (e.g. first in a transaction), the canceled command can transparently be retried. By retaining the original transaction id across retries, there's a guarantee of eventual progress in cases where retries are possible.
Subtasks of this are:
Make everything in Citus cancelable; otherwise we won't be able to resolve deadlocks by aborting transactions (#1100, #1121).
Code that sends multiple commands over various connections in parallel needs to be properly non-blocking (rather than, as is currently often done, asynchronously sending commands to all connections and then synchronously waiting for the results one by one). Otherwise additional and unnecessary deadlocks can be introduced. There's also a good chunk of performance benefit in doing so.
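To make the intended pattern concrete, here is a rough standalone libpq sketch (not actual Citus executor code; connection strings, the query, and the structure are placeholders). In backend code the `select()` would be a latch wait combined with `CHECK_FOR_INTERRUPTS()`, which is also what keeps the command cancelable, tying in with the previous point:

```c
/*
 * Sketch only: dispatch a command to several workers, then consume results
 * from whichever connection is ready, instead of waiting for each
 * connection's result one-by-one.
 */
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <libpq-fe.h>

#define WORKER_COUNT 2

int
main(void)
{
	const char *conninfo[WORKER_COUNT] = {
		"host=worker-1 dbname=postgres",	/* placeholder */
		"host=worker-2 dbname=postgres"		/* placeholder */
	};
	PGconn	   *conn[WORKER_COUNT];
	int			pending = WORKER_COUNT;

	/* phase 1: dispatch the command to all workers without waiting */
	for (int i = 0; i < WORKER_COUNT; i++)
	{
		conn[i] = PQconnectdb(conninfo[i]);
		if (PQstatus(conn[i]) != CONNECTION_OK ||
			PQsendQuery(conn[i], "UPDATE tbl SET value = value + 1") != 1)
		{
			fprintf(stderr, "%s", PQerrorMessage(conn[i]));
			exit(1);
		}
	}

	/* phase 2: wait for *any* connection to have data, not a fixed one */
	while (pending > 0)
	{
		fd_set		readable;
		int			maxfd = -1;

		FD_ZERO(&readable);
		for (int i = 0; i < WORKER_COUNT; i++)
		{
			if (conn[i] == NULL)
				continue;
			FD_SET(PQsocket(conn[i]), &readable);
			if (PQsocket(conn[i]) > maxfd)
				maxfd = PQsocket(conn[i]);
		}

		select(maxfd + 1, &readable, NULL, NULL, NULL);

		for (int i = 0; i < WORKER_COUNT; i++)
		{
			PGresult   *result;

			if (conn[i] == NULL || !FD_ISSET(PQsocket(conn[i]), &readable))
				continue;

			PQconsumeInput(conn[i]);
			if (PQisBusy(conn[i]))
				continue;		/* result not fully received yet */

			while ((result = PQgetResult(conn[i])) != NULL)
				PQclear(result);	/* a real implementation checks errors */

			PQfinish(conn[i]);
			conn[i] = NULL;
			pending--;
		}
	}
	return 0;
}
```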
Introduce a dynamically registered background worker for each database that actually uses Citus. For that we'll have to call `RegisterDynamicBackgroundWorker()` from code that's executed in all Citus databases, e.g. close to `CitusHasBeenLoaded()`. A shared memory region will contain information about whether a background worker has already been started, and a lock protecting that information. That background worker will be used to implement wound/wait (on the workers, frequently) and distributed deadlock detection (on coordinators, not quite as frequently). It has to be dynamically registered to avoid having to configure a list of databases with Citus installed in the configuration file.
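A rough sketch of what that registration could look like; all names here (`CitusWorkerShmemState`, `StartDeadlockWorkerIfNeeded`, `DeadlockWorkerMain`) are invented for the example, and the shared memory bookkeeping is reduced to a single flag:

```c
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/bgworker.h"
#include "storage/lwlock.h"

typedef struct CitusWorkerShmemState
{
	LWLock	   *lock;			/* protects workerStarted */
	bool		workerStarted;	/* worker already registered for this db? */
} CitusWorkerShmemState;

/* would live in a shared memory structure keyed by database oid */
static CitusWorkerShmemState *workerState = NULL;

static void
StartDeadlockWorkerIfNeeded(void)
{
	BackgroundWorker worker;
	BackgroundWorkerHandle *handle = NULL;

	LWLockAcquire(workerState->lock, LW_EXCLUSIVE);
	if (workerState->workerStarted)
	{
		LWLockRelease(workerState->lock);
		return;
	}

	memset(&worker, 0, sizeof(worker));
	snprintf(worker.bgw_name, BGW_MAXLEN,
			 "citus deadlock worker for database %u", MyDatabaseId);
	worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
		BGWORKER_BACKEND_DATABASE_CONNECTION;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = 5;	/* restart after 5s on crash */
	snprintf(worker.bgw_library_name, BGW_MAXLEN, "citus");
	snprintf(worker.bgw_function_name, BGW_MAXLEN, "DeadlockWorkerMain");
	worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
	worker.bgw_notify_pid = MyProcPid;	/* so we learn whether it started */

	if (RegisterDynamicBackgroundWorker(&worker, &handle))
		workerState->workerStarted = true;

	LWLockRelease(workerState->lock);
}
```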
Introduce the concept of a distributed transaction. Distributed transactions will be identified by a tuple like (timestamp, node-identifier, local-transaction-counter). These identifiers will be used when building wait-for graphs for deadlock detection, and to establish which transaction is 'older' in the wound/wait scheme. We'll have to send these identifiers with every `BEGIN` a coordinator sends to workers. Some care is required to handle commands without explicit transactions (`CREATE INDEX CONCURRENTLY`, plain `INSERT` or `SELECT`) correctly on installations using pgbouncer.
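For illustration, the identifier could be represented roughly like this; the struct, field names, and the helper function mentioned in the comment are hypothetical, not the final design:

```c
#include "postgres.h"
#include "datatype/timestamp.h"

typedef struct DistributedTransactionId
{
	TimestampTz	timestamp;			/* when the initiating transaction started */
	uint32		initiatorNodeId;	/* node that initiated the transaction */
	uint64		transactionNumber;	/* per-node, monotonically increasing */
} DistributedTransactionId;

/*
 * On the wire this could look something like the following (the helper
 * function is hypothetical):
 *
 *   BEGIN;
 *   SELECT assign_distributed_transaction_id(1, 42, '2017-07-20 10:00:00+00');
 *   UPDATE ...;
 */
```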
Perform distributed deadlock checking from coordinators, on a regular interval. The background worker introduced previously will regularly ping every worker, checking whether any processes have been blocked on a lock for longer than `small_factor * deadlock_timeout`. If so, it'll call a function on each worker that returns the local lock-graph in a usable form (including the distributed transaction id from above). On the coordinator these lock-graphs will be combined into a global lock-graph. At first a simple depth-first search will be used to detect cycles. If a cycle is found, the youngest transactions will be aborted (by sending `SELECT pg_cancel_backend($pid)`) until the deadlock is resolved.
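To make the cycle search concrete, here is a minimal depth-first search over such a combined wait-for graph; the graph representation and function names are invented for this sketch:

```c
/*
 * Sketch: cycle detection in the combined wait-for graph with a plain
 * depth-first search.  Nodes are distributed transactions numbered
 * 0..nodeCount-1; waitsFor[i] lists the transactions that transaction i
 * is blocked on.
 */
#include <stdbool.h>

typedef struct WaitGraph
{
	int			nodeCount;
	int		   *waitsForCount;	/* length of each adjacency list */
	int		  **waitsFor;		/* adjacency lists */
} WaitGraph;

static bool
VisitNode(WaitGraph *graph, int node, bool *onPath, bool *visited)
{
	if (onPath[node])
		return true;			/* node is on the current path: cycle */
	if (visited[node])
		return false;			/* already fully explored, no cycle here */

	visited[node] = true;
	onPath[node] = true;

	for (int i = 0; i < graph->waitsForCount[node]; i++)
	{
		if (VisitNode(graph, graph->waitsFor[node][i], onPath, visited))
			return true;
	}

	onPath[node] = false;
	return false;
}

/*
 * Returns true if the graph contains a cycle, i.e. a distributed deadlock.
 * onPath and visited must be caller-provided, zero-initialized arrays of
 * length nodeCount.
 */
bool
WaitGraphHasCycle(WaitGraph *graph, bool *onPath, bool *visited)
{
	for (int node = 0; node < graph->nodeCount; node++)
	{
		if (!visited[node] && VisitNode(graph, node, onPath, visited))
			return true;
	}
	return false;
}
```

A real implementation would additionally have to report which transactions participate in the cycle, so that the youngest one can be picked for cancellation.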
Implement wound/wait to quickly resolve deadlocks. This will likely be an opt-in feature for individual types of commands, e.g. a multi-shard UPDATE. Such commands would send something like `SELECT enable_wound_wait(); UPDATE ...; SELECT disable_wound_wait();` to workers.
Implement transparent retries for wound/wait, where possible, using savepoints around restartable sections. Likely this'll initially only be possible for the first command in a transaction, as the handling of locks acquired for previous commands in the same transaction will end up being complicated.
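As a rough illustration of the savepoint-based retry from the coordinator's side (function name, savepoint name, retry limit, and error handling are simplifications, not the planned implementation):

```c
/*
 * Sketch: retry a restartable command after it has been wound, by wrapping
 * it in a savepoint.  Assumes a transaction block is already open on the
 * connection; only SQLSTATE 57014 (query_canceled) triggers a retry.
 */
#include <string.h>
#include <stdbool.h>
#include <libpq-fe.h>

static bool
ExecuteWithRetries(PGconn *conn, const char *command, int maxAttempts)
{
	for (int attempt = 0; attempt < maxAttempts; attempt++)
	{
		PGresult   *result;
		ExecStatusType status;
		const char *sqlstate;

		PQclear(PQexec(conn, "SAVEPOINT restartable_command"));

		result = PQexec(conn, command);
		status = PQresultStatus(result);
		if (status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK)
		{
			PQclear(result);
			PQclear(PQexec(conn, "RELEASE SAVEPOINT restartable_command"));
			return true;
		}

		/* only retry if we were canceled, e.g. wound by an older transaction */
		sqlstate = PQresultErrorField(result, PG_DIAG_SQLSTATE);
		if (sqlstate == NULL || strcmp(sqlstate, "57014") != 0)
		{
			PQclear(result);
			return false;
		}

		PQclear(result);
		PQclear(PQexec(conn, "ROLLBACK TO SAVEPOINT restartable_command"));
	}
	return false;
}
```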
@marcocitus @ozgune @sumedhpathak This is a draft design from what I got out of the deadlock discussion Marco and I had. This is my POV and I'm tired, so this is bound to not be entirely accurate.