This is kind of related to the existing parallelism work (cc @nvanbenschoten) and the use of streaming interfaces in the sql layer (#7775, cc @tristan-ohlson)
We already parallelize SQL queries that have no results (RETURNING NOTHING). The insight here is that even an update with results can be split into two parts, and the result (the count of rows affected) is available after the first part (while the second part has higher latency). We could accomplish this by modeling an UPDATE as two separate operations from the perspective of the ParallelizeQueue, or perhaps by streaming the results back from a single operation and allowing the session to proceed once the result is sent.
@nvanbenschoten Can you describe your recent experiments in this area? They seem very promising and something we should investigate doing for 2.1.
Pipelining transactional writes between SQL statements and batching transactional writes between SQL statements are two alternatives that allow us to achieve a more general goal - lifting consensus out of the synchronous path for SQL writes. To understand why this is important, let's first make the assumption that a write is always more expensive than a read, usually by orders of magnitude. We'll also assume that a SQL gateway is always collocated in the same datacenter as the leaseholder of the data it is trying to access [1]. This supports the first assumption because it means that reads can be served without inter-dc communication, while writes do require inter-dc coordination.
The key insight (which is discussed above) is that in order to satisfy the contract for a SQL write (UPDATE, INSERT, DELETE, etc.), we only need to synchronously perform SQL-domain constraint validation to determine whether the write will be allowed and what the effect of the write will be if the validation succeeds (i.e. rows affected). We don't actually need to perform the write. In fact, the process of performing the write can be delayed indefinitely. The only two requirements for the write are that:
1. it is performed before the transaction is allowed to commit, and
2. all future operations in the transaction observe its effect.
To convince oneself that this is true, it's useful to distinguish SQL-domain errors from KV-domain errors. A SQL-domain error is one which is expected by SQL when a constraint is violated by an operation. Examples of these are unique constraint violations, referential integrity constraint violations, and check constraint violations. SQL mandates that these are detected and returned to the SQL client when statements are issued. A statement cannot succeed if these constraints are violated. A KV-domain error is one which arises due to a failure "beneath" SQL and is not expected by SQL. Examples of these errors are network failures that prevent writes from succeeding, transaction retry errors, and disk corruption errors. This class of error must prevent a transaction from committing if it prevents the transaction's effects from being applied, but such errors are not bound to a specific SQL statement. In fact, as long as neither of the requirements listed above is broken, they can always be delayed until the end of a transaction and returned as the result of a COMMIT statement.
So, with this insight in mind, we can begin rethinking how a SQL-level write is processed in Cockroach. Instead of a SQL-level write resulting in a synchronous KV-level write, we can instead break it up into a synchronous "dry-run" KV-read operation and an asynchronous KV-write operation. The read operation will perform all constraint validation and determine what the effect of the write will be but importantly will not actually perform the KV-level write. The KV-write will then be performed at some later time, with the only constraint that it happens before the transaction is committed.
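To make the split concrete, here's a minimal sketch in Go under toy types (txnWriteBuffer, insertRow, etc. are hypothetical names, not CockroachDB APIs): the dry-run read detects a SQL-domain error and reports rows affected synchronously, while the KV write itself is buffered until commit.

```go
// Sketch only: splitting a SQL-level write into a synchronous "dry-run" read
// and a deferred KV write. All names here are illustrative.
package main

import (
	"errors"
	"fmt"
)

// kv is a toy key/value store standing in for the KV layer.
type kv map[string]string

// txnWriteBuffer holds KV writes whose consensus has been deferred.
type txnWriteBuffer struct {
	store   kv
	pending map[string]string // deferred writes, not yet replicated
}

// insertRow performs the synchronous dry-run: it validates the uniqueness
// constraint with a read, reports "rows affected" immediately, and defers
// the actual write until commit.
func (b *txnWriteBuffer) insertRow(key, val string) (rowsAffected int, err error) {
	if _, ok := b.store[key]; ok {
		// SQL-domain error: detected synchronously, before any consensus write.
		return 0, errors.New("duplicate key value violates unique constraint")
	}
	b.pending[key] = val // KV write deferred; no consensus latency here.
	return 1, nil
}

// commit flushes all deferred writes. KV-domain errors (e.g. a failed
// replication) would surface here, as the result of COMMIT.
func (b *txnWriteBuffer) commit() error {
	for k, v := range b.pending {
		b.store[k] = v // stands in for the asynchronous consensus write
	}
	b.pending = map[string]string{}
	return nil
}

func main() {
	b := &txnWriteBuffer{store: kv{}, pending: map[string]string{}}
	n, err := b.insertRow("/users/1", "alice")
	fmt.Println(n, err) // 1 <nil> — result known before any replication
	fmt.Println(b.commit())
}
```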
This restructuring is a huge improvement for transactional latency because it means that each SQL-write operation will only need to wait for a KV-read instead of a KV-write, which we assumed above is much faster. The individual latencies required for each KV-write can then be combined into the latency of only a single consensus write, either by performing the writes concurrently or by batching them together [2].
The main complication of this restructuring is that we need to enforce the second requirement for transactional writes - that all future operations observe their effect. This is where the main difference between the pipelining proposal and the deferred batch proposal comes into effect.
In the pipelining proposal, the dry-run KV-read and the KV-write are sent out at the same time by a transaction coordinator. The coordinator waits for the KV-read to return before returning to the SQL client but does not wait for the KV-write. Future reads or writes will see the effect of the first KV-write because the proposal makes the assumption that the KV protocol is streaming and that all KV streams are strongly ordered such that any request necessarily observes the full result of all previous requests sent on that stream. With this property, it follows that any future read or write in the transaction will be ordered behind the asynchronous write and therefore observe its effect. However, it remains unclear how difficult it would be to introduce the properties necessary for this approach into the KV protocol. This also means that reading immediately after a write forces the next read to wait for consensus of the previous KV-write, which undermines the benefit to some degree.
In the deferred write batch proposal, the dry-run KV-read is sent out immediately by the transaction coordinator but the KV-write is not. Instead, the KV-write is deferred until strictly necessary. "Strictly necessary" can mean different things. In the current prototype of this proposal, a write is deferred until a future read or write overlaps the same keyspace or until a commit is issued, at which point the deferred write is added to the front of the new batch and flushed from the deferred write set. This could be improved. For instance, it should be possible for future reads (or "dry-run" reads) to read directly from the deferred write set without flushing it. This will transparently improve foreign key validation and help with #15157 because it means that for a transaction that writes into two tables that have an FK constraint between them, the FK validation for the second write could be fielded directly from the deferred write batch. Another benefit of this change is that it naturally creates larger write batches, which reduces the amount of network traffic and disk writes. It also allows us to hit the 1PC fast path more often.
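A rough sketch of the flush rule described above, under assumed toy types (span, request, deferredSet are illustrative only, not the prototype's structures): a buffered write stays deferred until a later request overlaps its key span or the transaction commits.

```go
// Sketch only: deferred-write-batch flushing triggered by key-span overlap.
package main

import "fmt"

type span struct{ start, end string } // half-open interval [start, end)

func (a span) overlaps(b span) bool {
	return a.start < b.end && b.start < a.end
}

type request struct {
	sp       span
	isCommit bool
}

type deferredSet struct {
	writes []span
}

// maybeFlush returns the deferred writes that must be prepended to the next
// batch: everything on commit, or only the overlapping spans otherwise.
func (d *deferredSet) maybeFlush(r request) (flush []span) {
	var keep []span
	for _, w := range d.writes {
		if r.isCommit || w.overlaps(r.sp) {
			flush = append(flush, w)
		} else {
			keep = append(keep, w)
		}
	}
	d.writes = keep
	return flush
}

func main() {
	d := &deferredSet{writes: []span{{"a", "b"}, {"m", "n"}}}
	fmt.Println(d.maybeFlush(request{sp: span{"a", "c"}})) // [{a b}] — overlap forces a flush
	fmt.Println(d.maybeFlush(request{isCommit: true}))     // [{m n}] — commit flushes the rest
}
```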
cc. @tschottdorf @andy-kimball @spencerkimball
[1] these assumptions do not always hold, but because they're so important for high performance in CockroachDB, it's appropriate to optimize for them.
[2] at the moment, our transactional model does not allow transaction commits to be in-flight concurrently with other writes for the same transaction on other ranges. This means that transactions that span ranges must always perform at least two serialized consensus writes. This restriction could be lifted, but that is an orthogonal concern.
Excellent write-up! I think this can be a big performance win for 2.1.
So, with this insight in mind, we can begin rethinking how a SQL-level write is processed in Cockroach. Instead of a SQL-level write resulting in a synchronous KV-level write, we can instead break it up into a synchronous "dry-run" KV-read operation and an asynchronous KV-write operation.
Most SQL write operations are conditional puts which are implemented internally as a read-and-compare followed by a write. Separating the read and write phases of the transaction makes a lot of sense for multi-statement transactions, but that extra round-trip will be a performance hit for simple transactions that insert using a single statement. I haven't looked at your prototype, so perhaps you're already taking this into consideration.
I'd add the mild complications around the parallel DistSQL machinery: we need to figure out the key spans touched by a DistSQL plan and then flush the overlapping key ranges for in-flight writes (or send the writes along with DistSQL, but then the act of waiting for them becomes an awkward poll, so it's probably not worth even considering).
Also (and this is probably implicit anyway) we don't have to defer the writes all the way to the end; we can also send batches out as we see fit. For example, a transaction that performs lots of writes would have chunks flushed out by the client.Txn every so often.
We can also satisfy reads from the in-flight write set in some situations, though I'm not sure it's useful enough to introduce the complexity for.
@petermattis:
Separating the read and write phases of the transaction makes a lot of sense for multi-statement transactions, but that extra round-trip will be a performance hit for simple transactions that insert using a single statement.
We should play through this with some examples, but if the commit/release is passed in a batch, it would make sense to disable dry running for it. Hopefully that covers most of these cases (and if it doesn't, it seems that SQL should produce better batches).
Most SQL write operations are conditional puts which are implemented internally as a read-and-compare followed by a write. Separating the read and write phases of the transaction makes a lot of sense for multi-statement transactions, but that extra round-trip will be a performance hit for simple transactions that insert using a single statement.
KV writes have multiple internal phases: they are first evaluated on the leaseholder, then submitted to raft and applied on all replicas. We currently send the response to the gateway after the command has been applied, but the response is actually fully determined after the evaluation phase (modulo re-evaluations and re-proposals). We could add a KV option to return the response as soon as evaluation completes, along with a token that can be used to check later (just before the commit, analogous with RefreshSpans) whether the request eventually applied. (Instead of this token, a streaming response protocol would allow us to do this as two responses to the same request).
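A toy sketch of this evaluate-then-verify idea, with hypothetical names (proposalToken, evaluateWrite) rather than real CockroachDB APIs: the write's result is returned once evaluation completes on the leaseholder, and the coordinator confirms just before commit that the proposal actually applied.

```go
// Sketch only: respond after evaluation, verify application before commit.
package main

import "fmt"

// proposalToken identifies an in-flight Raft proposal whose outcome the
// coordinator must confirm before committing.
type proposalToken struct {
	key string
	seq int
}

type writeResult struct {
	rowsAffected int
	token        proposalToken
	applied      chan error // delivers the proposal's final outcome
}

// evaluateWrite stands in for the leaseholder evaluating a write: the result
// is known here, while replication continues in the background.
func evaluateWrite(key string, seq int) writeResult {
	applied := make(chan error, 1)
	go func() { applied <- nil }() // pretend replication succeeds later
	return writeResult{rowsAffected: 1, token: proposalToken{key, seq}, applied: applied}
}

func main() {
	res := evaluateWrite("/accounts/10", 1)
	fmt.Println("rows affected:", res.rowsAffected) // returned before consensus completes

	// Just before COMMIT, analogous to RefreshSpans, confirm the proposal applied.
	if err := <-res.applied; err != nil {
		fmt.Println("commit must fail:", err)
		return
	}
	fmt.Println("safe to commit", res.token)
}
```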
We could add a KV option to return the response as soon as evaluation completes, along with a token that can be used to check later (just before the commit, analogous with RefreshSpans) whether the request eventually applied.
This has the nice advantage that the txn reads its own writes for free, but the disadvantage that a failed write isn't noticed until the end of the txn which might result in confusing error messages.
Separating the read and write phases of the transaction makes a lot of sense for multi-statement transactions, but that extra round-trip will be a performance hit for simple transactions that insert using a single statement. I haven't looked at your prototype, so perhaps you're already taking this into consideration.
The prototype doesn't separate the read and the write phase of a kv-write if a later operation in the same batch depends on the write. So for simple transactions that send a commit in the same batch as a write, the write will not be split and we will still achieve a 1PC txn. For multi-statement txns, the read phase and the write phase will be split up, which can actually help us achieve a 1PC txn in more cases.
That said, there still are some workloads that would perform relatively poorly with this change. For instance, a series of alternating writes and reads that overlap would result in each write being immediately flushed, making the read-phase of a kv-write essentially useless added latency. The optimization to allow for reading directly from the deferred write batch would be a big help here as it would mean that the deferred writes don't all need to be flushed immediately after the next read comes in.
I'd add the mild complications around the parallel DistSQL machinery: we need to figure out the key spans touched by a DistSQL plan and then flush the overlapping key ranges for in-flight writes
The prototype takes the approach of just flushing the entire deferred write batch before allowing DistSQL reads. At a minimum, we should be smarter about only flushing exactly what the DistSQL flow plans on reading. I'm not sure if we know this all beforehand on the SQL gateway, but I expect we have a pretty good idea.
We can also satisfy reads from the in-flight write set in some situations, though I'm not sure it's useful enough to introduce the complexity for.
Yes, this would be a very important optimization for two reasons:
1. a later statement that reads keys the transaction has already written wouldn't force us to flush the deferred write batch, and
2. those reads could be served directly from the buffered writes, avoiding a round trip to the leaseholder.
Both of these reasons are very important when it comes to tables with FK relations to one another. Writing to a parent table and a child table that references the parent table in the same txn is very common. It would be a shame if FK validation when writing to the second table forced us to flush the deferred write batch. On the other hand, if we could read from our own deferred write batch then we could actually improve FK validation in this situation by avoiding communication with the leaseholder altogether!
We should play through this with some examples, but if the commit/release is passed in a batch, it would make sense to disable dry running for it.
Exactly, this is how the prototype works.
KV writes have multiple internal phases: they are first evaluated on the leaseholder, then submitted to raft and applied on all replicas. We currently send the response to the gateway after the command has been applied, but the response is actually fully determined after the evaluation phase (modulo re-evaluations and re-proposals). We could add a KV option to return the response as soon as evaluation completes, along with a token that can be used to check later (just before the commit, analogous with RefreshSpans) whether the request eventually applied. (Instead of this token, a streaming response protocol would allow us to do this as two responses to the same request).
This is a good alternative proposal and one I've also been thinking about a lot. I think the biggest benefit of this is that it wouldn't cause us to delay the writing of intents until late in the transaction. As is, the deferred write proposal has the potential to dramatically alter contended workload behavior because transactions won't begin writing intents until very late. In practice, I'm not actually sure how this would play out. On one hand, waiting until late in the transaction to lay down intents could essentially undermine the RefreshSpans optimization and could cause more txn restarts because results will have been returned to clients in almost all cases before conflicting intents are discovered. On the other hand, with the PushTxnQueue blocking reads on conflicting writes, performing all reads immediately and quickly before then doing all writes could actually reduce the number of txn retries. It's tricky to reason about how these different characteristics will play out.
Another benefit to this alternative is that it pipelines the execution of the write consensus with the rest of the txn instead of deferring that cost until the end. This means that it should never hurt performance compared to what we see today. The flipside is that it pays for this by giving up any potential batching effect that the first proposal could achieve, which had the potential of reducing disk writes, reducing Raft overhead, and increasing 1PC txn opportunities.
I would disagree with the idea that "the txn reads its own writes for free" is a benefit of this alternative. In practice, I think we'd actually see that reads observing earlier writes in the same txn would end up stuck in the CommandQueue and would be forced to eat any latency saved on the previous write. This is also true of the first proposal, which naively would need to flush any deferred writes that are later read. This is why reading directly from the deferred write batch when possible is so important.
I've been thinking more about @bdarnell's alternative because there is a lot I like about it. My one hesitation is the idea of a token and all the extra state on a replica that it would require us to hold. A recent discussion with @tschottdorf about idempotency and intent resolution got me thinking in this area and I now don't think we need any of this extra state at all. Any future request that needs to check for the success of a proposal only needs to check that an intent exists with an expected sequence number at an expected key for it to know if the request succeeded or not.
With this in mind, we could create a CheckIntent rpc. This read-only request would have the simple job of going to a key and checking that an intent exists on the key for a specified TxnID and Sequence number. We could then introduce another stage in the txn coordinator pipeline, which for now we can assume exists as a new Sender between client.Txn and TxnCoordSender (although if history tells us anything, a new Sender might not be a great idea). This Sender would catch batches composed of single intent writes (Put, InitPut, CPut, Delete, etc.) and set a RespondAfterEval flag on their header. This flag would tell the receiving replica to respond as soon as the batch has been proposed and not to wait for its proposal to complete. When it received a response from this batch, it would place the requests in an OutstandingProposals tree, similar to the one in the current prototype.
However, unlike the current proposal, a future request in the same txn that overlaps the OutstandingProposals tree wouldn't flush the write. Instead, it would require a CheckIntent request be prepended to the overlapping batch. Once this batch with the CheckIntent request succeeds, we can then remove the corresponding entries from the OutstandingProposals tree. Again, if the batch also wrote some keys in RespondAfterEval mode, we would add them to the OutstandingProposals tree.
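Roughly, the interceptor behavior might look like the following sketch, where outstandingProposals and the request structs are illustrative stand-ins rather than real CockroachDB types: later requests that overlap a tracked write get a CheckIntent prepended instead of forcing a flush.

```go
// Sketch only: prepending CheckIntent requests for overlapping keys.
package main

import "fmt"

type request struct {
	method string // "Put", "CheckIntent", "Get", ...
	key    string
	seq    int
}

// outstandingProposals tracks writes that were acknowledged after evaluation
// but whose replication has not yet been verified.
type outstandingProposals map[string]int // key -> sequence number

// augmentBatch prepends a CheckIntent request for every key in the batch that
// overlaps an outstanding proposal, and drops those keys from the set (in the
// real proposal this would happen only once the augmented batch succeeds).
func (o outstandingProposals) augmentBatch(batch []request) []request {
	var checks []request
	for _, r := range batch {
		if seq, ok := o[r.key]; ok {
			checks = append(checks, request{method: "CheckIntent", key: r.key, seq: seq})
			delete(o, r.key)
		}
	}
	return append(checks, batch...)
}

func main() {
	o := outstandingProposals{"/table/1/5": 3}
	batch := []request{{method: "Get", key: "/table/1/5"}}
	fmt.Println(o.augmentBatch(batch))
	// [{CheckIntent /table/1/5 3} {Get /table/1/5 0}]
}
```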
CheckIntent requests are usually extremely cheap because anyone who needs to send them is already sending an overlapping request to the same range. However, one wrinkle is that DistSender currently splits up read-only and read-write requests. For this to achieve peak performance, we'd want these CheckIntent requests to be sent with either reads or writes so that they never require an extra rpc.
The only case where this logic doesn't hold up is CommitTxnRequest requests, which will need to flush all CheckIntent reqs, even to ranges that the commit wasn't itself heading to. We could consider resolving these outstanding proposals asynchronously through some kind of best-effort background process. Ideally, this would reduce the number of CheckIntent rpcs we need to send before (or while, @tschottdorf 😉) sending the CommitTxnRequest. This isn't required by the original design though.
Just like with the original prototype here, we would want to be able to read from the OutstandingProposals tree. The primary consumer of this would be FK validation, so as long as we supported reading-our-own-writes for point lookups without being forced to send CheckIntent requests, I think we'd be ok.
The major roadblock I see here is re-evaluations. It's my understanding that these take the request out of the CommandQueue, which would be an issue because it would mean that CheckIntent rpcs could be reordered with the requests they're trying to observe. This brings up the question of what to do when a CheckIntent rpc does not find an intent where it expects to. Naively, we could just abort the txn, but we can do better. If all request types that use the CheckIntent infrastructure are idempotent then we could just issue them again ourselves when we see a CheckIntent that fails.
That's a good idea!
How does this work with DistSQL? Waiting to flush as before, or sending the tree along so it can inject the right CheckIntents as well?
When it received a response from this batch, it would place the requests in an OutstandingProposals tree, similar to the one in the current prototype.
You know this, but just to clarify the writeup: once it's in the tree, you can handle the next client command. Btw, are there cases in which you wouldn't even have to wait for the evaluation to conclude? I.e. a kind of RETURNING NOTHING optimization in this case.
Perhaps instead of injecting CheckIntent RPCs, we could add a CheckSeq field into RequestHeader for the overlapping requests. That's less futzing around with the batch as a whole, but maybe it's annoying in other ways (for example, you'd perform the check multiple times when multiple requests overlapped, so you'd need special code to handle that).
(although if history tells us anything, a new Sender might not be a great idea).
👍 👍 👍 let's put in the elbow grease to make this all transparent and architecturally sound. I like how well confined this is so far, btw!
However, one wrinkle is that DistSender currently splits up read-only and read-write requests. For this to achieve peak performance, we'd want these CheckIntent requests to be sent with either reads or writes so that they never require an extra rpc.
There's actually an interesting observation here. In https://github.com/cockroachdb/cockroach/issues/23942, you point out that we sometimes propose no-op writes to Raft. That's just saying that we have commands that are often writes but sometimes act as reads, and yet we send them through Raft! Thinking this through, we should understand a read as a no-op write. If we get the overhead for that down enough, we have basically succeeded in removing the read/write distinction at the replica level (and think of all the flags we could delete 😉). Besides, conceptually this fits in well with predicate push down and transformations (aggregations etc) which really will do more than just "read" -- they'll transform the data.
If all request types that use the CheckIntent infrastructure are idempotent then we could just issue them again ourselves when we see a CheckIntent that fails.
I've been thinking more about @bdarnell's alternative because there is a lot I like about it. My one hesitation is the idea of a token and all the extra state on a replica that it would require us to hold.
With a streaming KV API, we could use the stream itself in place of an explicit token. There wouldn't be any more state here than in the current implementation that holds the KV request open across the entire commit+apply time. We might still need the token (or the suggested CheckIntent) as a fallback for if the connection closes, or maybe we'd just treat that as an RPC error and restart the transaction.
Thinking this through, we should understand a read as a no-op write. If we get the overhead for that down enough, we have basically succeeded in removing the read/write distinction at the replica level
We'd still have some concept of read-only vs read/write at the replica level (it's important for the command queue), but I think you're right that we could get rid of the per-method flags and use the "write" path for everything, and bail out if it turns out to be a no-op.
How does this work with DistSQL? Waiting to flush as before, or sending the tree along so it can inject the right CheckIntents as well?
Sending the tree along seems possible, although to start it's probably easiest to just flush before launching distributed flows. In this new proposal, "flush" means that we resolve all outstanding writes by sending out a batch consisting of a CheckIntent request for each write that hasn't been verified.
Btw, are there cases in which you wouldn't even have to wait for the evaluation to conclude? I.e. a kind of RETURNING NOTHING optimization in this case.
If we had strongly ordered streams between gateways and ranges then probably, but I think the idea is to get away from RETURNING NOTHING and any parallel in-flight SQL statements. In fact, my long-term plan here is to stub out RETURNING NOTHING completely once the rest of this change makes it unnecessary.
Perhaps instead of injecting CheckIntent RPCs, we could add a CheckSeq field into RequestHeader for the overlapping requests.
The reason why I'm convinced that a discrete RPC is the best approach is that there are cases when we'll want to perform this CheckIntent validation without doing anything else. For instance, before (or while, see below) sending out an EndTransactionRequest, we'll need to validate all outstanding writes. In this case, we have no other reason to visit these keys other than for the CheckIntent validation, so it seems messy to need to fit this on top of a synthetic GetRequest or something.
It's also convenient that the new RPC can be shared between this proposal and #24194.
Thinking this through, we should understand a read as a no-op write. If we get the overhead for that down enough, we have basically succeeded in removing the read/write distinction at the replica level
Conceptually I think you're right, but there are practical differences like how the commands are treated in the CommandQueue and the kind of engine.Batch we want to use for evaluating them.
With a streaming KV API, we could use the stream itself in place of an explicit token. There wouldn't be any more state here than in the current implementation that holds the KV request open across the entire commit+apply time. We might still need the token (or the suggested CheckIntent) as a fallback for if the connection closes, or maybe we'd just treat that as an RPC error and restart the transaction.
I agree with everything you're saying here, but I don't want this idea to broaden the scope of this proposal to the point where it becomes predicated on switching to a streaming KV API.
That said, I do think it's interesting to consider how the two proposals interact and what we can build now that can later be improved by a streaming KV API if we ever move in that direction. Most of the structures proposed in my most recent comment would be necessary regardless of the KV API. The only difference would be "proposal validation". As you pointed out, a streaming KV API could allow us to validate proposal completion without any extra RPCs. Meanwhile, without this streaming KV API we'll need to introduce a CheckIntent request. So it seems to me that the current proposal doesn't need a streaming KV API, but that one would allow for improved performance if it came along. If that's the case then I don't think we should block this on a streaming KV API, but we should keep the possibility of one in mind when building out this idea.
I'd like to make it explicit how these two proposals fit together with one another. The interesting part where they interact is when a transaction has outstanding writes at the point of a commit. In that case, we need to make sure that the writes succeed before committing the transaction. However, just like with writes in the same batch as an EndTransactionRequest, we don't want to have to run this step before sending off the EndTransactionRequest. Using the same trick as we use in #24194, this should be possible! To achieve this, we'll need to treat the outstanding writes the same as we do with parallel writes by adding them to the "promised write set". We'll then need to add a CheckIntent request for all outstanding writes to the final batch and wait for all to succeed before moving the transaction record from STAGED to COMMITTED. Easy!
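A small sketch of that commit-time decision, using made-up types (checkIntentResult, finalizeCommit), not the parallel-commits implementation itself: the record only moves from STAGED to COMMITTED once every promised write has been verified.

```go
// Sketch only: verify all promised writes before acknowledging the commit.
package main

import "fmt"

type txnStatus int

const (
	staged txnStatus = iota
	committed
	aborted
)

type checkIntentResult struct {
	key   string
	found bool // did an intent with the expected sequence exist?
}

// finalizeCommit decides the transaction's fate: every promised write must be
// verified before the STAGED record can be marked COMMITTED.
func finalizeCommit(results []checkIntentResult) txnStatus {
	for _, r := range results {
		if !r.found {
			// A missing intent means an outstanding write never applied; the
			// commit cannot be acknowledged (a real system might retry the
			// idempotent write before giving up).
			return aborted
		}
	}
	return committed
}

func main() {
	fmt.Println(finalizeCommit([]checkIntentResult{{"a", true}, {"b", true}}))  // 1 (committed)
	fmt.Println(finalizeCommit([]checkIntentResult{{"a", true}, {"b", false}})) // 2 (aborted)
}
```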
In fact, my long-term plan here is to stub out RETURNING NOTHING completely once the rest of this change makes it unnecessary.
This doesn't completely eliminate the usefulness of RETURNING NOTHING. An INSERT today requires a round trip to the leaseholder(s) (plural if there are secondary indexes) followed by a round of consensus. INSERT RETURNING NOTHING defers/parallelizes both of those steps. With pipelining post-evaluation, the consensus is parallelized but you still have to talk to the leaseholder(s) to discover whether there is a conflict.
Partitioning and follow-the-workload can help increase the odds that you have a local leaseholder, but sometimes you'll still have to go remote and RETURNING NOTHING can still help in those cases. (For example, consider a partitioned users table with a global index on the email column; the global index will likely have a remote leaseholder.) Maybe it's not worth maintaining at that point given its low adoption and reduced impact, but it's not completely unnecessary.
Conceptually I think you're right, but there are practical differences like how the commands are treated in the CommandQueue and the kind of engine.Batch we want to use for evaluating them.
The command queue doesn't know about the read or write flags on the RPC methods. It only knows about the declared key accesses (write commands are read-only on many of the keys they touch). OTOH, the engine.Batch issue is a trickier one to resolve, especially if we start using rocksdb snapshots instead of raw engine access for reads. (This would allow us to remove the read from the command queue and update the tscache as soon as we've grabbed the snapshot instead of waiting for evaluation to complete.)
So it seems to me that the current proposal doesn't need a streaming KV API, but that one would allow for improved performance if it came along. If that's the case then I don't think we should block this on a streaming KV API, but we should keep the possibility of one in mind when building out this idea.
Agreed.
I'd like to make it explicit how these two proposals fit together with one another. The interesting part where they interact is when a transaction has outstanding writes at the point of a commit. In that case, we need to make sure that the writes succeed before committing the transaction. However, just like with writes in the same batch as an EndTransactionRequest, we don't want to have to run this step before sending off the EndTransactionRequest. Using the same trick as we use in #24194, this should be possible! To achieve this, we'll need to treat the outstanding writes the same as we do with parallel writes by adding them to the "promised write set". We'll then need to add a CheckIntent request for all outstanding writes to the final batch and wait for all to succeed before moving the transaction record from STAGED to COMMITTED. Easy!
I think we can avoid exposing the STAGED status above DistSender. Essentially all you have to tell DistSender for the final batch is that there are additional promised writes (already in flight) and then you let it check and optimize accordingly. This happens kind of naturally if you prepend CheckIntent requests to the batch and use the existing Sender interface to DistSender (though it does need some special casing). DistSender doesn't need any special casing; it'll execute the CheckIntents in parallel with the rest and add the keys to the promised write set. Only if the CheckIntents actually come back successfully does it commit.
Consider an application which executes the following statements one-by-one (yes, this could be done more efficiently, but an ORM is likely to produce SQL like this):
This transaction is moving $1 from account 11 to account 10. Our current SQL implementation imposes unnecessary latency on the UPDATE operations. Internally, each UPDATE is a select for the matching rows followed by a consistent (replicated) write of the new balance. After the write completes we return the number of rows updated. We suffer the latency of the consistent write even though lower layers of the system will serialize access to a particular key. Translated into KV operations, each UPDATE becomes a Scan for the matching rows followed by a Put of the new balance.

Notice that we know the number of rows that will be updated after the Scan operation completes. We can return to the client at that point and asynchronously send the Put. When the next statement arrives, we don't have to wait for any pending mutations as the KV layer will serialize the operations [*] for a particular key. We would have to wait for the outstanding mutations to complete before performing the commit. The above would become the same sequence of KV operations, with the Puts sent asynchronously and only the commit waiting for them to complete.

There is a similar win to be had for DELETE operations, which involve a Scan followed by a DelRange. INSERT is somewhat more complicated as it is translated into a ConditionalPut operation, which is internally a read (on the leader) followed by a write. We could potentially return from the ConditionalPut operation as soon as the write is proposed, but we'd have to leave the operation in the CommandQueue until the write is applied. There are likely lots of dragons here, though perhaps the approach of allowing a transactional write operation to return as soon as it is proposed could handle all of the cases here. The TxnCoordSender would then have to have a facility for waiting for the transactional write to be applied before allowing the transaction to be committed.

[*] While a replica will serialize operations for a particular key, multiple operations sent via separate DistSenders will not. We'd need to make sure that the operations sent for a transaction are somehow pipelined within the DistSender/TxnCoordSender. We wouldn't want a Scan operation to get reordered in front of a Put operation.

Cc @tschottdorf, @bdarnell
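For illustration, a minimal sketch of the UPDATE flow described above, using toy types that don't correspond to real CockroachDB APIs: the Scan determines the rows-affected count synchronously, the Put is issued asynchronously, and only the commit waits for outstanding mutations.

```go
// Sketch only: return the row count after the Scan, send the Put
// asynchronously, and wait for in-flight writes at commit time.
package main

import (
	"fmt"
	"sync"
)

type txn struct {
	store    map[string]int
	mu       sync.Mutex
	inFlight sync.WaitGroup
}

// updateBalance scans for the row, returns "1 row affected" immediately, and
// issues the replicated write asynchronously.
func (t *txn) updateBalance(key string, delta int) int {
	t.mu.Lock()
	bal, ok := t.store[key] // the synchronous Scan
	t.mu.Unlock()
	if !ok {
		return 0
	}
	t.inFlight.Add(1)
	go func() { // the asynchronous Put (stand-in for a consensus write)
		defer t.inFlight.Done()
		t.mu.Lock()
		t.store[key] = bal + delta
		t.mu.Unlock()
	}()
	return 1
}

// commit must wait for every outstanding mutation before the transaction can
// be acknowledged.
func (t *txn) commit() { t.inFlight.Wait() }

func main() {
	t := &txn{store: map[string]int{"/accounts/10": 100, "/accounts/11": 100}}
	fmt.Println(t.updateBalance("/accounts/10", +1)) // 1 — known right after the Scan
	fmt.Println(t.updateBalance("/accounts/11", -1)) // 1
	t.commit()
	fmt.Println(t.store)
}
```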