Open tavplubix opened 3 years ago
I am somewhat confused about which operations we support. The basic building block is a data part, which is immutable. The table consists of a set of data parts. There are many tables. The following operations on the table are possible:
Data part is added.
Several data parts are merged into one: source parts are deleted and the resulting part is added.
Data part is removed.
Data part is altered by e.g. adding a new column. This seems to conflict with 'immutability', but we do it by dropping the old part and creating a new one, so it's technically immutable.
A transaction consists of several of these operations on same or different tables. We want to ensure that a concurrent transaction either sees all the effects of a transaction, or sees none of them (atomicity). None of the effects of the transaction should be seen until it commits (isolation).
'insert' never conflicts with anything, but 'merge', 'drop' and 'alter' can conflict.
| | (1) insert | (2) merge | (3) drop | (4) alter |
|---|---|---|---|---|
| (1) insert | no conflict | no conflict | no conflict | no conflict |
| (2) merge | * | source parts | source parts | source parts |
| (3) drop | * | * | trivial | source parts |
| (4) alter | * | * | * | source parts |
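The conflict table above can be read as a small predicate. The following is a minimal sketch, not ClickHouse code: the `Operation` class and `conflicts` function are hypothetical names, and the "trivial" drop/drop cell is treated like any other overlap on source parts, which is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Operation:
    kind: str                               # "insert", "merge", "drop" or "alter"
    source_parts: frozenset = frozenset()   # existing parts the operation consumes


def conflicts(a: Operation, b: Operation) -> bool:
    """Return True if two concurrent operations conflict per the table."""
    # An insert only adds a new part, so it never conflicts with anything.
    if a.kind == "insert" or b.kind == "insert":
        return False
    # merge/drop/alter conflict only when they consume a common source part.
    return bool(a.source_parts & b.source_parts)


merge = Operation("merge", frozenset({"part_1", "part_2"}))
alter = Operation("alter", frozenset({"part_2"}))
drop = Operation("drop", frozenset({"part_3"}))
insert = Operation("insert")

print(conflicts(merge, alter))   # True: both consume part_2
print(conflicts(merge, drop))    # False: disjoint source parts
print(conflicts(insert, alter))  # False: inserts never conflict
```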
Implementing atomicity + isolation for operation set of 'insert' and 'merge' is trivial. Many inserts and one merge can't conflict, and we avoid the conflict between merges because they are scheduled in a special way by the server itself. But we have to support 'drop' and 'alter' as well.
An obvious solution for conflicts is to acquire an exclusive lock on the participating table for 'drop' and 'alter' operations, and shared lock for 'insert' and 'merge'. We have to wait when locking exclusively, or the 'drop' and 'alter' operations won't be able to make progress under load. If a transaction can alter more than one table, this means we need a deadlock detector, because we can't control the order of operations in user code. So we want to simplify it even more. Let's allow only one drop / alter operation per transaction. This way we can think about the locking later, and can already implement and debug the MVCC snapshot part. Exclusive lock for ALTERs will be a regression, because we made some effort before to make it not exclusive, but for a prototype it is OK.
A proper implementation for concurrent 'drop' and 'alter' will lock the participating parts by setting their `maxtid` to the current transaction. An attempt to lock an already locked data part immediately leads to the transaction failure (or else we would have to detect deadlocks). We can't fail immediately if we're locking the entire table, but we can do it for a single data part, because the lock contention is going to be much lower, so we'll probably be able to make progress.
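A minimal sketch of the fail-fast part lock described above, assuming parts are plain in-memory objects. `Part`, `lock_part`, `release_on_rollback` and `SerializationError` are hypothetical names chosen for illustration; a real implementation would need its own atomic compare-and-set on the part metadata.

```python
import threading


class SerializationError(Exception):
    """Raised when a part is already locked by another transaction."""


class Part:
    def __init__(self, name):
        self.name = name
        self.maxtid = None  # None: no transaction is removing this part


_guard = threading.Lock()  # makes the check-and-set below atomic


def lock_part(part, tid):
    """Lock a part for removal by writing tid into maxtid, or fail at once."""
    with _guard:
        if part.maxtid is not None and part.maxtid != tid:
            raise SerializationError(
                f"{part.name} is locked by transaction {part.maxtid}"
            )
        part.maxtid = tid


def release_on_rollback(parts, tid):
    """Reset maxtid to None for every part locked by the rolled-back tid."""
    with _guard:
        for part in parts:
            if part.maxtid == tid:
                part.maxtid = None
```

Because a second locker fails instead of waiting, no transaction ever blocks on a part lock, so no wait cycle can form.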
In a distributed setting, the transactional metadata for parts is in Zookeeper, right? So the locking should go through ZK as well. I think we should not have a persistent copy of these metadata locally, because syncing would be a pain.
I'm wondering how we can test all this... This algorithm is probably full of holes, and writing a full implementation and realizing it's completely broken might be too expensive. Maybe we can write a toy prototype and test it with something like Jepsen, or even write a formal spec...
> Implementing atomicity + isolation for operation set of 'insert' and 'merge' is trivial. Many inserts and one merge can't conflict, and we avoid the conflict between merges because they are scheduled in a special way by the server itself. But we have to support 'drop' and 'alter' as well.

Yes, I wrote almost the same thing here:

> As for writing queries, Snapshot Isolation requires to detect conflicts and rollback a transaction if it tries to modify an object which was modified by concurrent transaction. It's not a problem for concurrent INSERT queries, because they just append new data to table by creating new data parts, so two INSERTs cannot modify single object and cannot cause write-write conflict (but there is a nuance with Replacing, Collapsing and other special *MergeTree engines). To support transactional ALTERs it should be enough to forbid concurrent merging/mutating operations with overlapping source parts sets and do not assign such operations on future parts (because creation of future part can be rolled back).

> A proper implementation for concurrent 'drop' and 'alter' will lock the participating parts by setting their maxtid to the current transaction. An attempt to lock an already locked data part immediately leads to the transaction failure

Seems like it's exactly what I propose:

> We can use maxtid to detect such conflicts. Query writes its tid into maxtid before making any changes. If maxtid is not NULL, i.e. another transaction is currently trying to remove the part, the query throws Serialization error.

> (or else we would have to detect deadlocks)

AFAIK deadlocks are possible on Read Committed isolation level when we try to avoid serialization errors by waiting for conflicting transaction to be committed and taking new snapshot for our statement. Maybe we will implement it someday, but it seems to be too hard for now (we still have issues with deadlock detector in `RWLock`, and I don't want to add another one :D). So I propose only Repeatable Read for the first implementation, and conflicting transactions will immediately fail without any locks.
Am I missing something?
> In a distributed setting, the transactional metadata for parts is in Zookeeper, right? So the locking should go through ZK as well. I think we should not have a persistent copy of these metadata locally, because syncing would be a pain.

Yes

To make snapshots and part visibility work with replication, we have to add `tid` into `ReplicatedMergeTreeLogEntryData`.
Data parts in `ReplicatedMergeTree` are created through the replication log. If we have `tid`s in log entries then we know transactional metadata for parts produced by log entries. But there is a nuance with log rotation. We have to store transactional metadata of old parts somewhere.

> I'm wondering how we can test all this...

Maybe we can write something like stress tests which are trying to reproduce known anomalies.

> AFAIK deadlocks are possible on Read Committed isolation level when we try to avoid serialization errors by waiting for conflicting transaction to be committed and taking new snapshot for our statement. Maybe we will implement it someday, but it seems to be too hard for now (we still have issues with deadlock detector in RWLock, and I don't want to add another one :D). So I propose only Repeatable Read for the first implementation, and conflicting transactions will immediately fail without any locks.
> Am I missing something?

Suppose you have two concurrent transactions:
(1)
begin transaction;
alter table t1 update .... -- locks t1.part1
alter table t2 update ..... -- locks t2.part2
commit transaction;
(2)
begin transaction;
alter table t2 update ... -- locks t2.part2
alter table t1 update ... -- locks t1.part1
commit transaction;
This can deadlock. I think we can't release exclusive locks until the transaction ends, or some modifications will be lost (this is "strict 2PL"). So we have to deal with conflicting locks. We can add a (zero) timeout on locks, or add a deadlock detector. The starvation is probably not a big problem in case of part locks, so timeout will work OK.
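For the deadlock-detector alternative mentioned above, one common technique is to look for a cycle in a wait-for graph. This is a minimal sketch under the assumption that the server can list, for each transaction, which transactions hold the locks it is waiting on; `has_deadlock` and the graph layout are hypothetical, not an existing ClickHouse API.

```python
def has_deadlock(waits_for):
    """Detect a cycle in a wait-for graph with depth-first search.

    waits_for maps a transaction to the transactions holding locks it waits on.
    """
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color = {}

    def visit(txn):
        color[txn] = GREY
        for holder in waits_for.get(txn, ()):
            state = color.get(holder, WHITE)
            if state == GREY:
                return True   # reached a transaction on the current path: cycle
            if state == WHITE and visit(holder):
                return True
        color[txn] = BLACK
        return False

    return any(color.get(t, WHITE) == WHITE and visit(t) for t in list(waits_for))


# Transaction (1) holds t1.part1 and waits for t2.part2, held by (2);
# transaction (2) holds t2.part2 and waits for t1.part1, held by (1).
print(has_deadlock({"tx1": ["tx2"], "tx2": ["tx1"]}))  # True
print(has_deadlock({"tx1": ["tx2"]}))                  # False
```

With a zero timeout the graph never gains an edge, because a transaction fails instead of waiting, which is why that option needs no detector.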
> Maybe we can write something like stress tests which are trying to reproduce known anomalies.

I mean, does it make sense to build a toy model of the protocol first, not the entire implementation on top of ClickHouse?
By the way, postgres has a nice tool for specifying and running transaction histories: https://github.com/postgres/postgres/blob/master/src/test/isolation/README The tests look like this: https://github.com/postgres/postgres/blob/master/src/test/isolation/specs/deadlock-simple.spec
To be honest, I didn't think too much about transactions outside of the INSERT use case (there seem to be a lot of pitfalls); for me the main use case is the atomicity of inserts. Some additional guarantees for reading (except "do not read what is not committed") seem to be less in demand. Therefore, I may have a one-sided look at the task (maybe some questions will look a bit naive due to that).
Questions
Another interesting question that sometimes comes up is the atomicity / consistency of operations like FREEZE or reloading dictionaries. But this is probably out of scope for now.
> After that it writes the corresponding mincsn/maxcsn into data part

Same question: how to store it without making the part mutable?

> If same data part is in Commited or Outdated state i .... change state of created parts ...

You mention that state a few times. Do you mean the state of a part object in RAM, or will it be a marker in the filesystem?

> It's the first reason why do we need separate transaction ids and CSNs with mapping between them. We can use only tids to define parts visibility, but in this case we have to save a list of concurrent transactions when taking a snapshot and check if each of them is already committed or not to determine part visibility.

Yep, I think I understand now. CSN can also be an incremental number of transactions in a transaction log. And you want to avoid lookups for a transaction id in a transaction log by storing CSN together with part, so when reading the part you need to compare its CSN with max CSN.
But that sounds a bit unsafe. Your commit instead of adding one more record to a transaction log will need to touch several files/folders. So it will be less atomic, and you will have 2 sources of truth (transaction log and state written in the part).
- where / how mintid / maxtid will be stored (taking into account parts immutability).
There are two options: store it in a special file inside the data part or write all operations with data parts into some WAL (and make snapshots of parts state on log rotation), I'm not sure which one is better. It may look like the first option violates parts immutability, but actually it does not. Once mintid/mincsn is written and part creation is committed, it cannot be changed. It's a bit harder with maxtid, because maxtid can be committed or rolled back to NULL. But it's not a big issue to overwrite maxtid inside data part, because part will look like it's immutable until part removal is committed and it does not matter if part is immutable or not after removal is committed. Btw, now it's possible to write `checksums.txt` or `columns.txt` into an "immutable" part if there are no such files.
- Isn't just one tid enough? Maybe tid + some change in the status of the part will be sufficient?
- A commit is adding an entry (tid, csn) to the transaction log. I didn't fully understand the purpose of csn. It seems that the simple checkmark "transaction completed successfully" would work almost as well. So just adding the transaction number to the transaction log (which obviously has some kind of natural order).
We have to assign some timestamps or ids in the beginning of transaction (however, it's also possible to assign timestamps when committing, but in this case we have to atomically write such timestamps into all involved data parts holding some exclusive global lock, so I don't like this approach). Tids are assigned in the beginning and just one tid (or some begin timestamp) could be enough. But it's inconvenient to use begin timestamps for transaction ordering and checking part visibility, because we do not know when each transaction was committed if we don't have commit timestamps (csns).
Another advantage of using commit timestamps is that we can allocate one CSN for a batch of multiple transactions to reduce workload on ZooKeeper. Tids can be allocated locally, we only need to guarantee that tids are unique across all replicas.
And seems like having both begin and commit timestamps can be useful for implementing serializability check (but it's out of scope and we don't need serializability for now, so I did not think it over).
- It seems that the transaction is always executed locally (i.e. it always operates on a local set of tables) and synchronization via the zookeeper is needed only to read the consistent state. Now a distributed transaction (capturing different tables on different nodes) does not seem to be supported (it seems that may require some extra communication).
Yes, distributed transactions across multiple nodes are out of scope for now. Seems like it's possible to use global CSN log for the whole cluster and implement something like 2PC (or 3PC), but I did not think it over.
- Where (in ZooKeeper / in the file system) these transactions will live and how to understand whether it is necessary to write to ZooKeeper or to the filesystem (if the transaction includes Replicated and non-Replicated tables / only Replicated / only local).
For ReplicatedMergeTree we have to allocate CSNs through ZooKeeper. We can do it by simply creating sequential nodes like `transaction_log/committed/csn-xxxxxx` with tids list in node data, so it will work as transaction log. I'm not sure if we should implement some separate transaction log for non-replicated tables, because it will be hard to keep local logs in consistent state with log in ZooKeeper. For simplicity we can use common log in ZooKeeper for both replicated and non-replicated tables, at least for the first implementation of transactions. Of course, it's questionable, because it will require ZooKeeper to use transactions with non-replicated tables on single server. But I hope that ClickHouse Keeper will be available soon, so it will be easy to run tiny Keeper inside local clickhouse-server even on single-server installation.
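To illustrate how sequential nodes can double as a CSN allocator and a transaction log, here is a minimal in-memory sketch. `FakeKeeper` is a hypothetical stand-in written for this example, not a real ZooKeeper client; it only mimics the zero-padded 10-digit counter that ZooKeeper appends to sequential node names, and `commit_batch` is likewise an assumed helper.

```python
class FakeKeeper:
    """In-memory stand-in for sequential node creation (not a real client)."""

    def __init__(self):
        self._counter = 0
        self.nodes = {}

    def create_sequential(self, prefix, data):
        # Append a monotonically increasing, zero-padded 10-digit suffix.
        path = f"{prefix}{self._counter:010d}"
        self._counter += 1
        self.nodes[path] = data
        return path


def commit_batch(keeper, tids):
    """Commit a batch of transactions under one CSN and return that CSN."""
    path = keeper.create_sequential("transaction_log/committed/csn-", list(tids))
    # The numeric suffix of the created node is used as the CSN.
    return int(path.rsplit("-", 1)[1])


keeper = FakeKeeper()
first = commit_batch(keeper, ["tid_a", "tid_b"])  # two transactions, one CSN
second = commit_batch(keeper, ["tid_c"])
print(first, second)
```

Storing the tid list in the node data means the set of created nodes is itself the log of `(tids, csn)` records.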
- Transactions seem to have nothing to do with the replication mechanism. So any guarantees they give in a distributed system are limited by the eventual consistency. (In other words, if some parts from some tables of the committed transaction have not yet been replicated, will there be a read of an inconsistent state?).
To make it consistent we can track which replication log entries are executed and choose the corresponding CSN as a snapshot version for reading, so all parts that must be visible with the chosen CSN will be already replicated. Some optimizations are possible, for example, it seems like we can ignore MERGE_PARTS entries when determining such CSN (but it will probably require more complex visibility rules).
But there is another issue with consistency. Suppose the last CSN (and current snapshot version) is 42 and all replicas are synchronized. Client1 inserts some data on replica1 and client2 inserts some data on replica2 at the same time. The first transaction is committed with CSN=43 and the second one with CSN=44. Then client2 runs select on replica2. If new data for CSN=43 is not replicated yet from replica1 to replica2, then current snapshot version on replica2 is still 42, so client2 will not see their insert. Probably we should provide a way to wait for the current snapshot version on the replica to become greater than or equal to the CSN of the current transaction (it can be controlled by a setting).
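The example with CSNs 42, 43 and 44 can be worked through in a few lines. This is a sketch that assumes CSNs are consecutive integers and that a replica knows which CSNs it has fully applied; `replica_snapshot_version` and `can_read_own_write` are hypothetical helper names.

```python
def replica_snapshot_version(applied_csns, base_csn):
    """Largest CSN such that every CSN up to it is applied on this replica."""
    version = base_csn
    while version + 1 in applied_csns:
        version += 1
    return version


def can_read_own_write(snapshot_version, own_csn):
    """A client sees its own commit only once the snapshot has caught up."""
    return snapshot_version >= own_csn


# replica2 committed CSN=44 locally, but CSN=43 from replica1 is not fetched yet.
before = replica_snapshot_version({44}, base_csn=42)
print(before, can_read_own_write(before, 44))  # 42 False

# After the parts for CSN=43 are replicated, the snapshot jumps past both.
after = replica_snapshot_version({43, 44}, base_csn=42)
print(after, can_read_own_write(after, 44))    # 44 True
```

The proposed setting would make the SELECT wait until `can_read_own_write` becomes true instead of reading from the stale snapshot.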
- About transactions with INSERT few important use cases are engine = Distributed and maybe (?) for engine = Null. Designing that part from a practical point of view even somewhat more important than NOT-insert use cases (IMHO).
Distributed case seems to be harder, because all shards are independent and we have to receive some kind of acknowledgment that transaction can be committed on all shards. We need more complex commit protocol here and it's harder to deal with consistency on reading. I didn't think it over yet.
- some kind of 'garbage collection' - is it possible (and when) to clear the transaction log / and what will happen if there are parts after the rollback of the transaction.
The simplest solution is to store the last N entries in the transaction log and store the last seen CSN for each replica. If a replica cannot find its last seen CSN in the transaction log on startup, it checks local data parts, moves parts with unknown CSN (parts that have mintid/maxtid, but do not have mincsn/maxcsn) to detached and schedules fetches of such data parts (most likely the replica is stale, so it will happen anyway). But it depends on where to store version metadata for data parts (the solution above should work if we store versions inside data part).
- If we will support multistatement transactions (i.e. BEGIN TRANSACTION ... ; INSERT ; INSERT ; INSERT; COMMIT TRANSACTION) - how that state of transaction will be preserved between queries? Will it require session? 2 Concurrent transactions in the same session?
"Transaction" object will live in session context (or in query context for single-statement transaction with autocommit), multiple concurrent transactions in one session are not allowed.
- timeout to terminate idle transactions?
Yes, we should add some timeouts (at least for writing transactions, because they can "lock" maxtid).
- maximum transaction size?
I'm not sure, we have quotas and transactions are rolled back on exceptions.
- configurable transaction isolation level ('I want to read everything, including uncommitted', 'I want to read committed only')
Read Uncommitted is just reading without any snapshots just like it works now. Read Committed is easy to implement for readonly transactions (just take new snapshot for each statement), but it's harder to avoid serialization errors in writing transaction on this level, because we need some deadlock detection for this (see the discussion above).
===
> If same data part is in Commited or Outdated state i .... change state of created parts ...
> You mention that state few times - do you mean the state of a part object in the RAM, or it will be a marker in the filesystem?

I mean state in RAM.

> But that sounds a bit unsafe. Your commit instead of adding one more record to a transaction log will need to touch several files/folders. So it will be less atomic, and you will have 2 sources of truth (transaction log and state written in the part).

No, we will write csns into data parts after commit, not during commit. If a data part does not have a csn yet, we will do a csn lookup to get the csn by tid (if the transaction with the corresponding tid is committed, i.e. there is a record in the transaction log). Some optimizations are possible to reduce the number of csn lookups (for example, we can add an in-memory flag "maybe committed" for data parts and set it to true only before committing). If we crashed right after commit, we will read csns from the transaction log on startup.
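A minimal sketch of the lazy lookup described in this reply, assuming the transaction log is available as an in-memory tid-to-csn dictionary. `Part` and `creation_csn` are hypothetical names; the point is only that the log stays the single source of truth and the value stored in the part is a cache.

```python
class Part:
    def __init__(self, mintid):
        self.mintid = mintid
        self.mincsn = None  # cached CSN(mintid); filled in lazily after commit


def creation_csn(part, tid_to_csn):
    """Return CSN(mintid), caching it in the part once the commit is known."""
    if part.mincsn is None:
        csn = tid_to_csn.get(part.mintid)  # lookup in the transaction log
        if csn is None:
            return None                    # not committed (yet): nothing to cache
        part.mincsn = csn                  # cache so later reads skip the lookup
    return part.mincsn


log = {}                                   # tid -> csn of committed transactions
part = Part(mintid=7)
print(creation_csn(part, log))             # None: transaction 7 is not committed
log[7] = 43                                # commit: one record added to the log
print(creation_csn(part, log))             # 43, now also cached in the part
print(creation_csn(part, {}))              # 43: served from the cache
```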
> To be honest, I didn't think too much of transactions outside of INSERT usecase (there seem to be a lot of pitfalls), for me the main use case is the atomicity of inserts. Some additional guarantees for reading (except "do not read what is not commited") seem to be less in demand. Therefore, I can have a one-sided look at the task (maybe some questions will look a bit naive due to that).

I also think the main use case is atomicity of inserts, not transactions.

> Note: If same data part is in Commited or Outdated state it does not mean that part is actually created or removed in terms of transactions.

@tavplubix Will a query without a transaction see those data parts which are in Committed state but whose corresponding transaction is not committed yet?

> Will a query without transaction see those data parts which are in Committed state but corresponding transaction is not committed yet?

A query without a transaction will work in a way similar to the Read Uncommitted isolation level, so it will.
does clickhouse have this code branch for transaction? how can i get the code for transaction?
> does clickhouse have this code branch for transaction? how can i get the code for transaction?

See #24258
Thank you, I got it in #24258.
I see some modifications to the file parseQuery.cpp in the "Files changed" tab, for example the parsing of 'begin transaction', 'commit', 'rollback' and so on. But when I pulled the source of the mvcc_prototype branch, I did not find these modifications; it looks the same as the master branch of ClickHouse. I got the code of the mvcc_prototype branch. I want to compile it and try to use transactions. Can you give me some suggestions about how to use it? Thank you.
Sorry, I found it: the file name is parserQuery.cpp, not parseQuery.cpp.

> I want to compile it and try to use transaction. Can you give me some suggestions about how to use it? thank you

@tavplubix is this feature merged into master?
@chenrun0210 Yes, but in prototype stage
https://github.com/ClickHouse/ClickHouse/pull/24258
In order to enable it, you need to use config file https://github.com/ClickHouse/ClickHouse/pull/24258/files#diff-0842aacdba5c073d40f3b5157b9baebbb6a9fc86fabff99fe3dd8b336febdc27
Some comments from testing the feature in simple cases with simple inserts with materialized views:
I think it'd be great to have a setting so that a single insert is treated as a transaction (if you aren't already in a transaction). This would make what I think is the most common use case for us, which is an insert that's followed by many inserts into MVs, really simple to use (especially via HTTP). Instead of:
BEGIN TRANSACTION;
INSERT
COMMIT
You could do something like:
INSERT INTO table SETTINGS insert_with_transaction=1 FORMAT Native
And this insert would create a transaction automatically, insert the data (with MVs) and either commit or rollback depending on the result.
Another option that I think would be great to have is to not see "on flight" inserts from other transactions or non-transaction queries. As of right now you might see data on the landing table but not on the MV target (since it's in progress), or even see some blocks pushed but not others. This means that you might see data that ends up rolled back. It'd be super nice to have the option to only see everything or nothing.
And finally replication support, which I understand is in the roadmap :wink:
Any thoughts about this? Is there any way I can help to push this forward (either the suggestions or the transaction feature as a whole)?
Thanks for the feedback!
> I think it'd be great to have a setting so that a single insert is treated as a transaction

It was intentionally done this way to make things more explicit. But yes, it's really inconvenient and we will add a setting that automatically starts and commits transaction for each query (not only for inserts). It's quite easy to implement.

> Another option that I think would be great to have is to not see "on flight" inserts from other transactions or non-transaction queries.

Transactions should provide snapshot isolation (a bit stricter than "repeatable read"), so it's a bug if a transactional SELECT sees some data of an "on flight" transactional INSERT. However, if you run some query without starting a transaction, it works in a way similar to "read uncommitted" isolation level. If you want to see committed data only, you should run both SELECTs and INSERTs in transactions (and it's another reason why we need the setting).

> However, if you run some query without starting a transaction, it works in a way similar to "read uncommitted" isolation level.

Understood. I was testing without a transaction because it was much simpler. I'll run a similar test with `SELECT` inside a transaction to confirm it doesn't see any partial inserts.

> (and it's another reason why we need the setting).

Considering this I guess it should be a generic option (`run_standalone_query_in_a_transaction`, might need a better name) that would wrap the query inside a transaction if it's not already in one. Not sure what to do if it's in one, we could either throw or ignore the setting.
If it's ok with you I'll have a look and try to implement it.
Hi @tavplubix, how can we rollback/commit transaction in a cluster environment?
For example: assume transaction `tx1` was created on `node1`, and the client was not able to reach `node1` due to a temporary network issue. Can the client rollback/commit `tx1` from a different node? Or do we just wait until the session timeout to trigger rollback?
Hey @tavplubix Do you know if there is any ongoing work to add support for transactions for ReplicatedMergeTree engines? Is there anything I can do to help bring that feature to life?
Hey @tavplubix, may I ask a question? If transaction T1 inserts 100 parts and writes a log entry to ZooKeeper, but `creation_csn` has been written to txn_version.txt for only 50 of the parts and not yet for the other 50, and transaction T2 now reads these parts, will T2 read 100 parts or 50 parts?

> If transaction T1 inserts 100 parts, and write a log entry to ZooKeeper, but 50 parts are written creation_csn to txn_version.txt, and left 50 part are not. Now transaction T2 read these parts, does T2 can read 100 parts or 50 parts?

There's no way T2 can read 50 parts. It will read either 100 parts (if T2 started after T1 was committed) or 0 parts (if T2 started before).

> Algunenano commented on 13 Jul: Hey @tavplubix Do you know if there is any ongoing work to add support for transactions for ReplicatedMergeTree engines? Is there anything I can do to help bring that feature to life?

Hello. Does the latest version support ReplicatedMergeTree engines? I could not find any up-to-date information about transactional support for ReplicatedMergeTree engines.
Use case

Main use cases:
- `INSERT`s (maybe into multiple `MergeTree` tables, including materialized views, #11909) in a single "transaction". All `INSERT` queries in a "transaction" must either completely succeed or fail. All inserted data parts become visible only when the "transaction" is committed.
- `SELECT`s in a single "transaction", so all of them will read from one consistent snapshot of all `MergeTree` tables.

It also may be useful to support transactional `ALTER ... PARTITION` and `ALTER ... UPDATE/DELETE`, but the first implementation probably will support only `SELECT` and `INSERT` queries. An exception will be thrown on an attempt to execute a non-transactional query inside a transaction.

Describe the solution you'd like

MVCC-based transactions on data parts level. It will easily provide Snapshot Isolation (a bit stronger than Repeatable Read) for reading queries. As for writing queries, Snapshot Isolation requires detecting conflicts and rolling back a transaction if it tries to modify an object which was modified by a concurrent transaction. It's not a problem for concurrent `INSERT` queries, because they just append new data to the table by creating new data parts, so two `INSERT`s cannot modify a single object and cannot cause a write-write conflict (but there is a nuance with `Replacing`, `Collapsing` and other special `*MergeTree` engines). To support transactional `ALTER`s it should be enough to forbid concurrent merging/mutating operations with overlapping source parts sets and not assign such operations on future parts (because creation of a future part can be rolled back).

Implementation details for simple MergeTree

Data parts versioning

Each writing transaction has some unique identifier, let's name it `tid`. We use `tid`s to understand which transaction has modified/is going to modify each part. We will obtain `tid` in the beginning of the transaction and pass it through the query/session `Context` anywhere it's needed. If a data part is involved in some transaction, then it must contain the following metadata:
- `mintid` - `tid` of the transaction that has created the part
- `maxtid` - `NULL` or `tid` of the transaction that has removed the part (it's mostly for `ALTER ... PARTITION`s, but it also may be useful for `ALTER UPDATE/DELETE` and background merges, see below)
- `mincsn` - `NULL` or "cached" `CSN(mintid)`, described below
- `maxcsn` - `NULL` or "cached" `CSN(maxtid)`, described below

When some query in a transaction creates or removes a data part, it writes the corresponding `mintid` or `maxtid`. It's the only difference between a transactional and a non-transactional writing query.

Commit

CSN (Commit Sequence Number) indicates a point in time when the transaction was committed and its changes in part sets became visible for other transactions. We will maintain a `tid` to `csn` mapping to understand when each part was committed. Note: If a data part is in `Committed` or `Outdated` state it does not mean that the part is actually created or removed in terms of transactions. It means only that the corresponding changes are applied to the parts set in some table and ready to become visible after the whole transaction is committed. If some query reads data without creating a transaction it will just see parts in `Committed` state.

When a transaction is committing, it allocates a new CSN from a global monotonic counter (for the non-replicated case it can be just a static variable, for the replicated case we can use sequential nodes in ZooKeeper, see below), writes its `tid` and allocated `csn` into the transaction log (WAL for the non-replicated case, for replicated see below) and updates the in-memory `tid` to `csn` mapping. After that it writes the corresponding `mincsn`/`maxcsn` into the data part (it's an optional step, because we always can get `csn` by `tid` from the mapping or transaction log).

Rollback

A transaction is rolled back automatically on any uncaught exception. On rollback it should change the state of created parts to `Outdated` and reset `maxtid` of removed parts to `NULL`.

Snapshots and data parts visibility

Snapshot is just a CSN. When a transaction starts it gets the current value of the CSN counter and uses it as the current version, let's name it `snapshotVersion`. Data part is visible for transaction if:
- `mintid == current_tid`
- `CSN(mintid) && CSN(mintid) <= snapshotVersion && (!CSN(maxtid) || snapshotVersion < CSN(maxtid)) && (!maxtid || maxtid != current_tid)`
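The visibility rule can be written as a small predicate. This sketch assumes the two listed conditions are alternatives (either one makes the part visible) and models "no CSN" and `NULL` as `None`; `Part`, `is_visible` and `csn_of` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Part:
    mintid: int
    maxtid: Optional[int] = None


def is_visible(part: Part, current_tid: int, snapshot_version: int,
               csn_of: Callable[[int], Optional[int]]) -> bool:
    """csn_of(tid) returns the commit CSN, or None if tid is not committed."""
    # First condition: the part was created by the current transaction.
    if part.mintid == current_tid:
        return True
    min_csn = csn_of(part.mintid)
    max_csn = csn_of(part.maxtid) if part.maxtid is not None else None
    return (
        min_csn is not None and min_csn <= snapshot_version      # created before snapshot
        and (max_csn is None or snapshot_version < max_csn)      # not yet removed at snapshot
        and (part.maxtid is None or part.maxtid != current_tid)  # not removed by ourselves
    )


# T1 (tid=1) inserted 100 parts and committed with CSN=43.
committed = {1: 43}.get
parts = [Part(mintid=1) for _ in range(100)]
print(sum(is_visible(p, 2, 42, committed) for p in parts))  # 0: snapshot taken before commit
print(sum(is_visible(p, 2, 43, committed) for p in parts))  # 100: snapshot taken after commit
```

Since visibility depends only on the snapshot version and the commit record, a reader sees all parts of a transaction or none of them, regardless of how many parts have a cached CSN written.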
Note: It's the first reason why we need separate transaction ids and CSNs with a mapping between them. We can use only `tid`s to define parts visibility, but in this case we have to save a list of concurrent transactions when taking a snapshot and check if each of them is already committed or not to determine part visibility.

Background merges

Multiple parts can be merged if all of them are visible with the current snapshot version. We also can merge parts if all of them have the same `mintid` and do not have `maxtid`, but I'm not sure if we need to merge such parts.

Since a merge does not actually modify data (at least for ordinary MergeTree), we can just choose the maximum of CSNs of the source parts as the CSN of the result part and write the corresponding `maxtid` to each source part, so transactions will not know anything about background merges. Of course, in this case the merge should modify version metadata only when the result part is ready. It means that the merge will fail if `maxtid` appeared in some source part due to a concurrent `ALTER ... PARTITION` or `ALTER ... UPDATE/DELETE`, so such queries should cancel merges of the corresponding parts (if any). It may look like a significant change, but we already have `removePartProducingOpsInRange(...)` which does a similar thing.

On the other hand, it may be useful to allocate `tid`s and `csn`s for merges as for other transactions (and seems like we have to do it for special *MergeTree engines such as `ReplacingMergeTree`).

Background cleanup

As before, a background thread looks for `Outdated` parts with reference counter equal to 1. But it should not remove a part if there is no `maxcsn` or if `mincsn` is greater than the minimum `snapshotVersion` of running transactions.

ALTER PARTITION, ALTER UPDATE/DELETE and OPTIMIZE

Unlike `INSERT`s, such queries may try to concurrently remove the same part and replace it with another part, causing a write-write conflict. We can use `maxtid` to detect such conflicts. Query writes its `tid` into `maxtid` before making any changes. If `maxtid` is not `NULL`, i.e. another transaction is currently trying to remove the part, the query throws `Serialization error`.

Durability and server startup

We have two options here:
... `fsync` below). Transaction log will contain only `(tid, csn)` pairs of committed transactions. On restart we will know `tid`s of every data part and will know if the corresponding transactions were committed or not. It will be easier to rotate the log in this case.

On server startup we will remove all parts which do not have `CSN(mintid)` or have `CSN(maxtid)`.

Note: We do not fsync data parts (at least by default), so any data part may become broken in case of hard restart. Committed transaction can be partially rolled back in this case even if all version metadata were reliably written somewhere.
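The startup rule (remove parts that do not have `CSN(mintid)` or that have `CSN(maxtid)`) can be sketched as a filter over the local parts. The dictionary layout of a part and the name `parts_to_remove_on_startup` are assumptions made for this example; the transaction log is modelled as a plain tid-to-csn dictionary.

```python
def parts_to_remove_on_startup(parts, tid_to_csn):
    """Return names of parts whose creation never committed or whose removal did."""
    doomed = []
    for part in parts:
        creation_committed = tid_to_csn.get(part["mintid"]) is not None
        removal_committed = (
            part["maxtid"] is not None
            and tid_to_csn.get(part["maxtid"]) is not None
        )
        if not creation_committed or removal_committed:
            doomed.append(part["name"])
    return doomed


log = {1: 43, 3: 45}  # committed transactions only: tid -> csn
parts = [
    {"name": "all_1_1_0", "mintid": 1, "maxtid": None},  # created and alive: keep
    {"name": "all_2_2_0", "mintid": 2, "maxtid": None},  # creator never committed
    {"name": "all_3_3_0", "mintid": 1, "maxtid": 3},     # removal committed
    {"name": "all_4_4_0", "mintid": 1, "maxtid": 9},     # removal not committed: keep
]
print(parts_to_remove_on_startup(parts, log))  # ['all_2_2_0', 'all_3_3_0']
```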
Making it work with ReplicatedMergeTree

For `ReplicatedMergeTree` we have to allocate CSNs through sequential nodes in ZooKeeper and write `tid`s into these nodes. But `tid`s still can be obtained locally, we only need to guarantee that `tid` is unique across all replicas. So we can use a `(snapshotVersion, local_tid_counter, host_id)` tuple as `tid` (we don't really need `snapshotVersion` in `tid`, but it can be useful for introspection, because it's impossible to relate local `tid`s of different replicas). A replica can commit multiple transactions into one CSN to reduce workload on ZooKeeper (it's the second reason why we need separate tids and CSNs). It's possible to ensure that other replicas did not make some commits concurrently by checking cversion of the parent node.

To make snapshots and part visibility work with replication, we have to add `tid` into `ReplicatedMergeTreeLogEntryData`. Snapshot version on some replica is the maximal CSN such that all corresponding log entries are executed (so the replica has all visible parts locally).

We can use separate CSN logs in ZooKeeper for different shards. Distributed transactions across multiple shards are out of scope of this RFC.
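A minimal sketch of local `tid` allocation with the `(snapshotVersion, local_tid_counter, host_id)` tuple from the section above. `Tid` and `TidAllocator` are hypothetical names, and the sketch assumes `host_id` is unique per replica, which is what makes the tuples globally unique without any coordination.

```python
import itertools
from typing import NamedTuple


class Tid(NamedTuple):
    snapshot_version: int    # kept only for introspection
    local_tid_counter: int   # unique within one replica
    host_id: str             # assumed unique across replicas


class TidAllocator:
    """Allocates tids locally, without talking to ZooKeeper."""

    def __init__(self, host_id):
        self.host_id = host_id
        self._counter = itertools.count(1)

    def allocate(self, snapshot_version):
        return Tid(snapshot_version, next(self._counter), self.host_id)


replica1 = TidAllocator("replica1")
replica2 = TidAllocator("replica2")
a = replica1.allocate(snapshot_version=42)
b = replica2.allocate(snapshot_version=42)  # same snapshot, same local counter value
c = replica1.allocate(snapshot_version=42)
print(a, b, c)
```

Here `a` and `b` share the snapshot version and the counter value but still differ by `host_id`, while `a` and `c` differ by the local counter.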