tuommaki opened 7 months ago
The normal Tx process is: Validate -> Verify dep -> Propagate -> Save -> Execute -> Mark as executed.
Two nodes are involved: the booting node that starts, and the bootstrap node that sends the missing Tx.
1) If not already defined, get the last executed Tx. Use it for bootstrap; this must be done before receiving new Tx. This Tx is saved (DB or local conf) because if the bootstrap fails it should restart from the same point.
2) Connect p2p. Wait until enough peers are connected (define the minimum number of peers).
3) Receive new Tx from p2p and process them as normal. If a Tx has missing dependencies, the verify-dep step puts it on hold; a new Tx that has all its dependencies can be executed. New Tx are received before the bootstrap starts, to avoid missing Tx that arrive during the bootstrap process.
4) Start the bootstrap process in parallel, from the window start:

<Last executed Tx create timestamp> - (2 * <max Tx execution time>)

We open a window starting at the last executed Tx minus the maximum time a Tx can take to execute. If the node was running correctly, it should have executed all Tx before this time, so before this point the node is up-to-date. I multiply the max execution time by 2 to add some room for Tx still executing on other nodes.

A creation timestamp must be added to the Tx struct. There are 2 possibilities:
* Initialize it when the RPC receives it. Pro: it uses the RPC server time, which we control. Con: it's not part of the Tx hash, so it can be changed by a node.
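As a rough illustration, a minimal sketch of the window computation; `MAX_TX_EXECUTION_TIME` is an assumed node setting, not an existing constant:

```rust
use std::time::{Duration, SystemTime};

/// Assumed upper bound for a single Tx execution; the real value would be a node setting.
const MAX_TX_EXECUTION_TIME: Duration = Duration::from_secs(15 * 60);

/// Start of the bootstrap window:
/// <Last executed Tx create timestamp> - (2 * <max Tx execution time>).
fn bootstrap_window_start(last_executed_tx_created_at: SystemTime) -> SystemTime {
    last_executed_tx_created_at - 2 * MAX_TX_EXECUTION_TIME
}

fn main() {
    // Example: pretend the last executed Tx was created one hour ago.
    let last = SystemTime::now() - Duration::from_secs(3600);
    let start = bootstrap_window_start(last);
    println!("bootstrap window starts at {start:?}");
}
```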
This process is independent of scheduling and normal Tx processing. To ease the integration, I think we should do a little refactoring first: move all of Txvalidation into Mempool, and let the mempool decide when to send a Tx to execution. For the mempool this changes execution from a pull model to a push model. With this change, we can integrate the bootstrap process. Afterwards we should remove Tx management from the execution layer. Some Tx don't need execution, like Verify or Deploy Tx that only need to be marked as executed. This will allow us to separate Tx processing (deciding what to do with each Tx) from task execution (running on the VM). In the end, the execution module only executes tasks and returns the results to the mempool's Tx processing. The change to task execution will help to integrate new types of Tx, or new ways to execute Tx, such as not always executing a Tx.
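As a hedged illustration of that split (type names are hypothetical, not existing Gevulot types):

```rust
/// What the mempool's Tx processing decides to do with a validated transaction
/// (hypothetical sketch of the proposed separation).
enum TxAction {
    /// Needs a VM run; the execution module would only ever see tasks.
    Execute(Task),
    /// E.g. a Verify or Deploy Tx: no VM run, just mark it as executed.
    MarkExecuted,
}

/// Placeholder for whatever the execution layer actually runs.
struct Task {
    program: [u8; 32],
    inputs: Vec<u8>,
}
```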
Your proposal has a good idea in that it begins from the latest transactions, as that allows a node to quickly catch up to online operation. However, I believe it has several problems:
I don't think CDC will help here because it's made for deduplication of local data, which we don't need. ... CDC doesn't add anything to the replication algo because its goal is to avoid duplication of file content. Simple chunk verification is enough.
Wrong. I don't think you understood how it works. Data deduplication is one of the most obvious applications of it, but content-defined chunking itself solves the problem of having distributed agreement on chunk boundaries over a large amount of data when bits change anywhere in it.
Your proposal has the very problem that CDC actually solves: when a single transaction arrives in the middle of all transactions (i.e. some long-running node finally syncs with another node that has that single missing piece), all chunks are invalidated, because the simple chunk boundary of 10 transactions shifts everything, and this then changes every single chunk hash.
This brings us to the second problem: chunking from newest to oldest will always shift the chunk boundaries if those are counted by number of transactions.
I don't like this idea at all. It introduces obvious problems when time is not strictly and precisely synced on every single node of the network. There will be situations where a Run transaction appears to have been created after the first Proof, which is wrong.
The timestamp must never be used for anything other than rough sampling of transactions that happened around the same time. This is somewhat similar to a location: a timestamp in our case is like a district name in an address. It's not a high-precision coordinate to a specific spot, but a vague notion of "about there", which helps to limit the amount of data that needs to be processed.
The timestamp should always be kept as a node-local concept, because then it's consistent between distinct samples. When all timestamps have been created locally, one can analyze transaction execution time and progress by looking at timestamps in the database.
In a distributed environment, wall-clock timestamps must never be used for anything that needs absolute precision.
While writing the above, I realized that even my first proposal has a severe defect because it uses the timestamps. It can get into infinite syncing across nodes over time: if two nodes have the same transaction with different timestamps whose delta is larger than a single sampling window (e.g. 5 days apart), then every time these nodes sync, depending on whom they sync with, the changes go back and forth.
However, at this point I'd accept this defect in order to keep the solution simple enough:
To agree on an algorithm, I think we should first agree on the goal and assumptions.
We have 2 nodes:
* the booting node, which has missed some Tx;
* the bootstrap node, which has all Txs.

The goal is:
If the node starts with an empty database, we should consider using a snapshot, or accept that the node won't have the Tx from before a certain time and use this algo.
The process is meant to handle the case where a node crashes and the operator restarts it later. We suppose that:
Each node receives the Tx in a different order, but they all get the same Tx after some time, which depends mostly on the time to propagate through the network and the time to download the Tx files. When a node creates a Tx, it propagates it to the network right after downloading its files. Another node receives the Tx, downloads the corresponding assets and saves it. In this process, the longest step is the download.
Example of the Tx timeline on the 2 nodes when the booting node starts (TBegin, T1, T2Crash, T3Start and T4Endbt are points along this timeline):

```
Bootstrap node ..|t4e|t5e|t6e|t7e|t8e|t9e|t10e|t11e|t12e|t13s|t14e|t15s|t16d|t17d|t18s|t19e|t20d
Booting node   ..|t5e|t4e|t6e|t7s|t9e|t10s|t8s|t11d|t14d| .. |t18s|t20d|t19d
```

e: executed, s: saved, d: downloading/propagating
From this timeline, the algo will determine that Tx 11, 12, 13, 14, 15, 16 and 17 must be copied from the bootstrap node to the booting one.
From this timeline we define these assumptions:
* To avoid testing all Tx in the bootstrap node's DB, we have to find a way to detect T1. Before T1 there is no missing Tx on the booting node.
* T2Crash is the time of the last Tx saved in the DB.
* T3Start is the time when the booting node starts.
* Between T1 and T2Crash the booting node can miss Tx.
* After T2Crash all Tx are missing on the booting node.
## Algorithms:
All Tx arriving from p2p or RPC during the bootstrap are processed and added to the DB as in the normal process.
### Algo 1: the simplest
Define a time TBegin that is before T1. For example, we estimate that T1 can't be earlier than 30 min before T2Crash (the last Tx saved).
Starting from TBegin/Tx6, the bootstrap node sends all its Tx hashes (saved and downloading) that come after Tx6. So it returns the hashes of Tx6 to Tx18 (it got Tx18 before receiving the booting node's request).
The booting node compares all received Tx hashes with the Tx in its DB. Missing Tx are requested and saved as executed.
Pro: simple
Cons:
* can miss Tx if TBegin is after T1
* can transfer many more Tx hashes if TBegin is far behind T1
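A minimal sketch of Algo 1's comparison step on the booting node, assuming a hypothetical `TxHash` alias and that both hash lists fit in memory:

```rust
use std::collections::HashSet;

// Hypothetical alias; the real type would be the node's transaction hash type.
type TxHash = [u8; 32];

/// Given the hashes the bootstrap node reported for [TBegin, now] and the hashes
/// the booting node already has in its DB, return the hashes that must be fetched.
fn missing_tx(reported: &[TxHash], local: &[TxHash]) -> Vec<TxHash> {
    let local: HashSet<&TxHash> = local.iter().collect();
    reported
        .iter()
        .filter(|h| !local.contains(h))
        .copied()
        .collect()
}
```

The missing Tx returned here would then be requested from the bootstrap node and saved as executed, as described above.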
### Algo 2: try to optimize the T1 discovery
The idea is that the booting node requests the bootstrap node to build chunks of Tx, calculate the hash of each chunk and send all the chunk hashes back.
The booting node builds the same set of chunk hashes and determines which chunks differ. If the chunks are ordered by time, the first chunks should show no modification (before T1), then some modifications become visible (after T1), and finally some chunks are entirely missing on the booting node's side.
To do that, Tx must be time-ordered the same way on both nodes. I propose to introduce a `creation_time` field, set when the Tx is first created. It contains the timestamp (UTC+0) of the creation. If nodes order the Tx using this field, they will have the same order because it has the same value on all nodes, and it is time-ordered. If all nodes have nearly the same date (a few seconds of difference) the chunks will be time-ordered.
If the time between nodes differs too much, some Tx can fall outside the chunk window [TBegin, now] and be missed. For this reason, the chunk window should be large: 30 min or 1 hour.
When the booting node detects a chunk with a different hash, it requests all the Tx hashes of that chunk, compares them with the ones in its DB, and requests the Tx that are missing.
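A rough sketch of this chunk comparison, assuming a `TxHash` alias and using std's `DefaultHasher` purely for illustration (a real implementation would use a cryptographic hash):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

type TxHash = [u8; 32];

/// Hash of one chunk of transaction hashes (illustrative, non-cryptographic).
fn chunk_hash(chunk: &[TxHash]) -> u64 {
    let mut h = DefaultHasher::new();
    for tx in chunk {
        tx.hash(&mut h);
    }
    h.finish()
}

/// Both nodes order their Tx by `creation_time` (ties broken by hash), cut the
/// ordered list into fixed-size chunks and exchange only the chunk hashes.
fn chunk_hashes(ordered_tx_hashes: &[TxHash], chunk_size: usize) -> Vec<u64> {
    ordered_tx_hashes.chunks(chunk_size).map(chunk_hash).collect()
}

/// On the booting node: indexes of chunks whose hash differs from (or is absent
/// on) the local side; only these chunks need their Tx hashes requested.
fn differing_chunks(local: &[u64], remote: &[u64]) -> Vec<usize> {
    (0..remote.len())
        .filter(|&i| local.get(i) != Some(&remote[i]))
        .collect()
}
```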
#### How to determine the chunk size
In my first version, I propose to use the same size for all chunks. It can be optimized by increasing the size the further the chunk is from T1 (before/after), because it's more likely that nothing is modified far before T1, or that everything is missing far after it. As we don't have 1000 Tx per second, I'm not sure this optimizes the transferred data volume much.
### About the CDC
CDC operates on arbitrary binary data and tries to cut it into chunks using a hashing technique, so that if we take 2 files with the same data and a few modifications in one of them, the chunks of unmodified data will be the same, and the modified data will be contained inside a chunk by changing the chunk size.
Here, we don't have arbitrary binary data but Tx hashes of fixed length. There is no way to force the CDC algo to cut chunks only at hash boundaries, so a chunk may begin or end with a truncated Tx hash. If we find a modified chunk, how do we request the truncated Tx hash?
In our problem we only have modifications at a specific point T1, and after it all chunks are modified or missing on the booting node.
I'm really not sure that CDC will help in finding T1. Using chunks of varying size depending on the distance to T2Crash may be more efficient to minimize the number of chunk hashes transferred and analyzed.
I am not sure that we need varying chunk sizes. Instead of 10 Tx, we can define the chunk size as a value proportional to the number of Tx between TBegin and T2Crash on the booting node.
Then, if the data transfer is significant, we can build the algo in an interactive way where the booting node requests chunks one by one, starting at T2Crash and increasing the size depending on the number of missing Tx found.
### General Algorithm.
Never mind, this can be changed later if we agree on the base algorithm:
* On the booting node, get all Tx hashes between T2Crash and some time before it. Define TBegin, the point in the past where the search starts. It defines the interval [TBegin, T2Crash].
* Order this set. Using a creation date will help to minimize the number of chunks containing missing Tx.
* Build chunks from this set.
* On the bootstrap node, do the same with the Tx interval [TBegin, now] (DB + downloading).
* On the booting node, compare the chunk hashes.
* If a chunk differs or is missing, request the Tx hashes of that chunk from the bootstrap node.
* Query the booting node (DB + downloading) to determine the missing Tx.
* Request the missing Tx from the bootstrap node.
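To make the exchanges above concrete, here is a hedged sketch of the request/response messages this flow implies (all names are hypothetical, not an existing Gevulot API):

```rust
/// Hypothetical sync messages exchanged over the existing P2P unicast RPC.
enum SyncRequest {
    /// Booting node -> bootstrap node: chunk hashes for [TBegin, now].
    GetChunkHashes { t_begin: u64, chunk_size: usize },
    /// Ask for the Tx hashes contained in one differing chunk.
    GetChunkTxHashes { chunk_index: usize },
    /// Ask for the full (serialized) transactions that turned out to be missing.
    GetTransactions { tx_hashes: Vec<[u8; 32]> },
}

enum SyncResponse {
    ChunkHashes(Vec<u64>),
    ChunkTxHashes(Vec<[u8; 32]>),
    /// Serialized Tx, including timestamp and executed flag.
    Transactions(Vec<Vec<u8>>),
}
```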
### Other possibility:
Order the Tx hash set by hash and use CDC to see if it's efficient for determining the chunks (we have to solve the Tx hash boundary problem).
The risk is that all missing Tx sent after T2Crash are spread across all chunks and every chunk is modified, so in the end we transfer all Tx hashes.
### Change to allow the synchronization:
Move the Tx waiting part after saving and propagation, so that the waiting process doesn't interfere with the sync process.
## Questions:
At startup, should Tx that were not executed be executed, or just marked as executed by the bootstrap process? For example, Tx T8 and T10 are not executed locally but found executed on the bootstrap node. Do we execute them as we do today?
T15, T16 and T17 are not executed on the bootstrap node but will be sent to the booting node during the bootstrap process. Does the booting node execute them?
I think your first assumption undermines the rest of the reasoning:
before the crash, the node was working correctly and didn't miss Tx.
This is wrong. A booting node can, and often will, miss transactions in-between.
Easy way to produce this:
Say there are nodes A, B and C.
Node A is broadcasting a transaction to B and C. It manages to send it to B, but it crashes/restarts/loses network connectivity before it gets to send it to C.
Now nodes A and B have transaction T, but node C doesn't.
Nodes continue working as before.
Node C was functioning correctly all the time. In fact, it has the longest uptime of all of them, until it's restarted for an upgrade. At this point, when it syncs itself at startup, it realizes it misses individual transactions from here & there.
This is one scenario where the static chunk size fails, because changes in the middle will result in changes in all consecutive chunks, when only one chunk would need re-syncing.
If the node starts with an empty database, we should consider using a snapshot, or accept that the node won't have the Tx from before a certain time and use this algo.
I agree with this, but I think this is kind of an optimization (an important one, but still). We probably want to build a separate tool/worker/task that, on a longer interval, generates bigger exports from the DB that can be directly imported into an empty DB.
There is no way to force the CDC algo to cut chunks only at hash boundaries, so a chunk may begin or end with a truncated Tx hash. If we find a modified chunk, how do we request the truncated Tx hash?
If our stream of data that CDC computes over is aligned by transaction hashes, can't we then easily calculate the Nth transaction by simply dividing the offset by the hash length and adding 1?
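For illustration only, the arithmetic in question, assuming 32-byte Tx hashes in the concatenated stream:

```rust
/// Length of one Tx hash in the concatenated stream (assumption: 32 bytes).
const HASH_LEN: usize = 32;

/// 1-based index of the transaction that contains the byte at `offset`.
fn tx_index(offset: usize) -> usize {
    offset / HASH_LEN + 1
}

/// Snap a CDC cut point down to the nearest whole-transaction boundary,
/// so chunks never contain truncated hashes.
fn align_to_tx_boundary(offset: usize) -> usize {
    offset - (offset % HASH_LEN)
}
```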
I'm really not sure that CDC will help in finding T1.
Actually, it does. As I initially wrote:
These groups can then be hashed and marked as synced.
To elaborate on that: when the nodes agree on chunk boundaries, they can hash the chunk contents (only the concatenated tx hashes, not all data), and these hash lists can be very efficiently transferred & compared between nodes. This way, nodes can sync whole chunks, and it easily covers missing chunks at the beginning and in the middle as well.
The risk is that all missing Tx sent after T2Crash are spread across all chunks and every chunk is modified, so in the end we transfer all Tx hashes.
True, but is this a considerable problem? I'm thinking that we should transfer chunks as lists of transaction hashes, and the transactions are then fetched separately. If there are individual transactions missing here and there, it can cause a number of chunks to be compared, but if the contained transactions are mostly present, the amount of processed data will still be kept relatively low, and in the end, wouldn't this scenario still be relatively rare?
I put this assumption there to express the fact that at some point there must be a consensus on the Tx state across the network. If not, all nodes can have missing Tx during normal operation (Tx propagation can fail at any moment), so it's impossible to synchronize all the Tx from one node. Even if the booting node synchronizes with all nodes, it can miss Tx if another node is down.
So, as your scenario is possible, I think we have 2 choices:
I remove the master/slave possibility.
Otherwise, we have to accept that a node can have missing Tx.
As the rest of the algo depends on this assumption, we have to agree on it first.
About the hash cut, what I mean is: let's say the CDC makes a chunk of 3 Tx with hashes AA, BB, CC. The input data of CDC is AABBCC and the chunks can be AAB and BCC. So if on the booting node the first chunk is different, it's hard for the bootstrap node to get the hash of Tx BB from the chunk. We need another algo to recompute all the Tx that belong to the chunk, for example here chunk1: AA BB and chunk2: BB CC. We have to validate that there's no risk of missing Tx in this reconstruction algo.
From the way crypto hashes are created, I think this scenario can occur even with a few Tx, because the generated hashes are spread maximally over the hash space. So if we use an alphanumeric sort, for example, taking a few Tx you'll get very different first letters of the hash, and those few Tx will be spread among the others.
Otherwise, we have to accept that a node can have missing Tx.
Yes. I've tried to convey this the whole time: the syncing of all nodes together, in relation to each other, is eventual. Therefore it is expected that some nodes might have missing transactions at any point in time, but when they restart over time, they will catch up with those older missing transactions, while there might be newer ones missing again, depending on the functioning and timing of everything. Still, over time, at an abstract level, this whole system approaches full sync in the grand scheme.
So if on the booting node the first chunk is different, it's hard for the bootstrap node to get the hash of the Tx BB from the chunk.
This is the reason why there must be agreement on inclusion and exclusion (begin/end).
I'll take this over for now and work on the first iteration of the implementation. The discussion here hasn't led anywhere, and we have spent two weeks of the original three-week ideal.
As I've tried to express here, I have an idea of how this can be implemented, so I'll proceed with it and we'll see how it comes out.
Ok, try to implement a first version.
## Introduction
Currently, each Gevulot node runs in isolation, and if there is downtime for any given node, it will miss out on the transactions distributed meanwhile. Due to the deterministic and cross-verifying (and kind of HA) nature of the overall devnet cluster, this is OK for the devnet so far.
Recently, however, we learned of a use case where a single proving workload would be too resource-intensive to execute on all [available] nodes as the design is now, and we need to incorporate a VRF to schedule Run transactions only to individual nodes. This then requires node syncing functionality so that transactions won't get lost from the devnet overall, and so that there is a possibility to catch up from where the node left off when there is maintenance downtime for the node.
## Objective
Implement node syncing so that when the node starts from scratch, or when the node has been shut down for some period of time, it will sync itself from another peer.
After syncing, the new node must have all transactions in the database.
Transactions that have been received via the syncing mechanism must not be re-executed. These transactions must also have their original timestamp retained. Missing deployments / programs must be downloaded.
Syncing should take place before the scheduler is started.
In all cases, the nodes in the cloud should always have all data at any given point in time.
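A hedged sketch of the record a syncing peer might transfer to satisfy these requirements (field names are assumptions, not the existing transaction type):

```rust
/// Transaction as transferred by the sync mechanism: it keeps the original
/// timestamp and the execution status, so the receiver can persist it without
/// re-executing it.
struct SyncedTx {
    hash: [u8; 32],
    payload: Vec<u8>, // serialized transaction body
    created_at: u64,  // original timestamp (unix seconds), retained as-is
    executed: bool,   // if true, must not be re-executed after import
}
```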
## Possible ideas for implementation
Since the devnet does not have any kind of consensus protocol or other distributed ordering mechanism employed, there is no way of putting all transactions into an absolute order on distinct nodes.
However, each transaction has a node-specific timestamp and a hash that is globally consistent. With these properties, there should be a way to implement the syncing in a relatively efficient and reliable way.
Transactions can be aggregated into groups by some time period, and within a group they can be sorted by the transaction hash. When overlapping the time periods between nodes, the nodes should be able to find a common begin and end for the aggregation groups. To make this grouping deterministic in a distributed setting, we could borrow the "chunking" mechanism from the FastCDC algorithm and compute deterministic cut points from a stream of concatenated transaction hashes.
These groups can then be hashed and marked as synced.
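A minimal sketch of the cut-point idea applied at whole-transaction granularity (the fingerprint rule and the average group size are illustrative assumptions, not the actual FastCDC gear hash):

```rust
type TxHash = [u8; 32];

/// Declare a group boundary after a transaction whose hash fingerprint matches
/// a simple rule. Because the decision depends only on the hash content, two
/// nodes that both contain this transaction will cut at the same place,
/// regardless of what surrounds it.
fn is_cut_point(h: &TxHash) -> bool {
    // Use the last 8 bytes as a fingerprint; roughly one cut per 64 Tx on average.
    let mut fp_bytes = [0u8; 8];
    fp_bytes.copy_from_slice(&h[24..32]);
    u64::from_le_bytes(fp_bytes) % 64 == 0
}

/// Split a time-ordered list of Tx hashes into content-defined groups.
fn groups(ordered: &[TxHash]) -> Vec<&[TxHash]> {
    let mut out = Vec::new();
    let mut start = 0;
    for (i, h) in ordered.iter().enumerate() {
        if is_cut_point(h) {
            out.push(&ordered[start..=i]);
            start = i + 1;
        }
    }
    if start < ordered.len() {
        out.push(&ordered[start..]);
    }
    out
}
```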
When a node starts, it can submit a list of synced groups (or maybe the last 50 groups?) and that way find the optimal starting point for the syncing.
When the starting point has been found, the node that needs the missing data can then request all transactions (serialized, containing timestamp and execution status) for the next group and persist them, iterating this way up until the present state. The P2P network should be running all the time, receiving transactions into the scheduling queue (but the scheduler must not run yet). If the syncing method receives a transaction with `executed: true` while the same transaction is still sitting in the scheduling queue, it should be removed from the queue.

One should also consider the situation where two nodes have forked for some reason and have diverged. In this case, the nodes should find the last common checkpoint (the group of transactions) and from there onward proceed by exchanging lists of transaction hashes for the following groups and sync each other's missing transactions, effectively "zipping" themselves up to the present state.
In the present architecture, it is most natural to incorporate the functions related to this into the P2P implementation, where individual nodes can perform unicast RPC messaging.