Closed sandreim closed 1 month ago
Out of curiosity, if a candidate fails to become finalized when would it get pruned? Session boundaries?
If a candidate fails to be finalized, the inclusion relay chain block will never get finalized. This means the relay chain finality will stall. In that situation we trigger distribution aggression and resend all assignments/votes for candidates included under the oldest unapproved relay block with the hope to work around any potential network issues that prevent finality.
Generally the workers must import assignments and approvals for candidates included in unfinalized relay chain blocks. This means we only prune after finality. Even after finality of a block, the current approval-distribution implementation keeps track of recently finalized blocks in RecentlyOutdated, such that it won't punish peers for valid messages sent after a receiving node observes finality. We must also do the same.
This approach seems really reasonable.
The approval-distribution / approval-voting split was likely wrong from the beginning and performance has suffered as a result. Splitting work by candidate makes sense - though will it be properly balanced even after assignments are batched?
long-term it'd be interesting to autoscale the number of workers according to something like a PID controller. Over time, the amount of CPU spent on validating PVFs would hopefully be the largest source of load on the node.
The approval-distribution / approval-voting split was likely wrong from the beginning and performance has suffered as a result. Splitting work by candidate makes sense - though will it be properly balanced even after assignments are batched?
Yes, I believe batching and bundling are orthogonal. We need to change the criteria for assigning work to ensure workers are, on average, processing a similar number of assignments/approvals. When batching we could look at the current load of individual workers and aim to schedule the biggest batch on the least occupied worker.
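A minimal sketch of the "biggest batch on the least occupied worker" idea, assuming load is measured as a simple count of queued messages. The function name and the load metric are hypothetical and not taken from the node code.

```rust
/// Greedy placement: take batches from biggest to smallest and give each one
/// to the worker that currently has the least queued work.
///
/// `worker_load[i]` is the number of messages already queued on worker `i`
/// (an assumed metric). Returns `(batch_index, worker_index)` pairs in the
/// order the batches were placed, and updates `worker_load` in place.
/// Panics if `worker_load` is empty.
pub fn schedule_batches(batch_sizes: &[usize], worker_load: &mut [usize]) -> Vec<(usize, usize)> {
    // Visit batches in descending size order (stable for equal sizes).
    let mut order: Vec<usize> = (0..batch_sizes.len()).collect();
    order.sort_by(|&a, &b| batch_sizes[b].cmp(&batch_sizes[a]));

    let mut placement = Vec::with_capacity(order.len());
    for batch in order {
        // Pick the least loaded worker; ties go to the lowest index.
        let worker = worker_load
            .iter()
            .enumerate()
            .min_by_key(|&(_, load)| *load)
            .map(|(index, _)| index)
            .expect("the pool needs at least one worker");
        worker_load[worker] += batch_sizes[batch];
        placement.push((batch, worker));
    }
    placement
}
```

With batches of 3, 10 and 5 messages and two workers that start with 4 and 0 queued messages, the batch of 10 goes to the idle worker and the other two go to the first worker, so the workers end up with 12 and 10 queued messages.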
long-term it'd be interesting to autoscale the number of workers according to something like a PID controller. Over time, the amount of CPU spent on validating PVFs would hopefully be the largest source of load on the node.
Yes, I expect autoscaling will be easy to implement. I am not really sure we'd want any tight coupling with OS features; instead we could implement an internal node load monitor based on things we can measure, like network throughput and PVF executions per block. Once we include pov_size and execution time in candidate receipts, it should provide some good internal metrics that subsystems can use to apply backpressure or scale up if needed.
As we want to scale up to a higher number of cores, for example 200, we're likely to see similar issues with other single threaded subsystems like statement-distribution or bitfield-distribution.
It makes a lot of sense that an initial implementation of this worker based approach could bring also some major refactoring and maybe some orchestra support for workers making it easier to implement where needed later.
This issue has been mentioned on Polkadot Forum. There might be relevant details there:
https://forum.polkadot.network/t/update-validator-set-size-increase-on-kusama/8218/1
Spent some time thinking about this and I have a proposal that should get us to the end goal, where processing of assignments and approvals can be done in parallel and will scale up with the available hardware capacity.
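Currently, every time we receive a new message, the approval-distribution and approval-voting processing is basically serialised and consists of the following important steps:
1. approval-distribution - sanity check that we should have received this message from peer (assignment is not duplicate, assignment is for known block, approval is received only if we received assignment, etc.).
2. approval-voting - sanity checks the message again and loads state from the database.
3. approval-voting - verify VRF assignment and approval signature.
4. approval-voting - bookkeeping and database update.
5. approval-distribution - bookkeeping and storing of message in in-memory structs (GetApprovalSignatures answers use this).
6. approval-distribution - distribution of new messages according to topology and random sampling.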
Of these operations, the most expensive ones according to pyroscope are the crypto operations at step 3) and the operations at step 1). The good part is that both 1) and 3) can actually be done in parallel.
We keep both subsystems and we augment approval-distribution with a parallel worker pool (ValidationPool) where work is distributed based on the validator that created the message (not the one gossiping it). The algorithm for distributing work would be simple: worker_index = (msg.validator + RANDOM_SALT) % WORKER_COUNT. This guarantees that messages coming from the same validator land on the same worker, so the worker, having just a context built out of each relay chain block, can perform the following validation work, which is currently split between the two subsystems:
- Sanity check the message is not duplicate and that approvals are received after the assignment.
- Check VRF assignment is correct.
- Check we received all assignments for the approval vote.
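The work distribution rule can be sketched as follows. This is an illustration only: `ValidatorIndex` is a local stand-in type, and passing the salt and worker count as arguments is an assumption about how `RANDOM_SALT` and `WORKER_COUNT` would be supplied.

```rust
/// Local stand-in for the index of the validator that created the message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ValidatorIndex(pub u32);

/// worker_index = (msg.validator + RANDOM_SALT) % WORKER_COUNT
///
/// The result depends only on the originating validator, the salt and the
/// pool size, so every message from one validator reaches the same worker.
/// `wrapping_add` is used so a large salt cannot overflow.
pub fn worker_index(validator: ValidatorIndex, salt: u64, worker_count: usize) -> usize {
    assert!(worker_count > 0, "the pool needs at least one worker");
    (u64::from(validator.0).wrapping_add(salt) % worker_count as u64) as usize
}
```

For example, with a salt of 5 and 4 workers, validator 7 maps to worker 0 and validator 8 maps to worker 1, and repeated calls for the same validator always give the same worker as long as the salt and pool size do not change.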
Once the message has been properly validated by a ValidationPoolWorker it is forwarded in parallel to both approval-voting and approval-distribution, so that they both can perform the remaining operations in parallel. With the assumption that the message is now valid, approval-distribution will do 5 & 6 and approval-voting will do 2 & 4.
The proposal in a picture
View updates are forwarded to the ValidationPoolWorkers, so they can use it to decide if a message is correctly received.
The main approval-distribution loop will simply go through the messages that have already been validated, as it does now, without any changes. All messages that have not been validated yet and are still in the worker pool's queue at the moment we receive the view update will be forwarded to the peer when the approval-distribution main loop does step 6).
Worker pools send the reputation update to approval-distribution periodically and the really bad offences instantaneously.
The main approval-distribution loop returns all the votes that have already been validated. This is functionally equivalent to the current setup, since GetApprovalSignatures always returns only the messages that have been fully processed. Since the new pipeline will process messages faster, things should actually be better in stress conditions.
To prove this can be done with good results I created a small PoC with the above and ran subsystem benchmarks with 1000 validators and 100 occupied cores, with all tranches triggered for 10 relay blocks with both assignments and approvals. This is the worst case scenario, when sharding breaks and all validators have to validate 100 parachains per block.
Chain approving does not fall behind
Poc: Chain selection approved after 4000 ms hash=0x0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a
Master: Chain selection approved after 71500 ms hash=0x0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a0a
@sandreim @AndreiEres @eskimor @ordian Let me know what you think.
I'm still thinking about it, but here are some inline comments/questions:
- approval-distribution - sanity check that we should have received this message from peer(assignment is not duplicate, assignment is for known block, approval is received only if we received assignment, etc.).
- approval-voting - sanity checks again the message and loads state from the database
It would be great if you could detail what you mean. Is it just a candidate existing in the db ?
- approval-voting - verify VRF assignment and approval signature.
- approval-voting - bookkeeping and database update
- approval-distribution - bookkeeping and storing of message in in-memory structs(GetApprovalSignatures answers uses this).
- approval-distribution - distribution of new messages according to topology and random sampling.
From these operations the most expensive ones according to pyroscope are the crypto operations at step 3) and the operations at step 1), the good part is that both 1) and 3) can actually be done in parallel.
I am not sure how you can do 3 as you would need to have to read CandidateEntry that you write in approval voting.
Proposal
We keep both subsystems and we augment approval-distribution with a parallel worker pool(ValidationPool) where work is distributed based on the validator that created the message(not the one gossiping it), the algorithm of distributing work would be simple worker_index = (msg.validator + RANDOM_SALT) % WORKER_COUNT. This guarantees that messages coming from the same validator land on the same worker, so the worker having just a context built out of each relay chain can perform the following validation work, which is currently split between the two subsystems.
Wouldn't this approach work better if we just had one single subsystem and this pool part of it. I mean, it's less messages to pass around and easier to debug, and enables very easy batching to further optimize signature checks.
- Sanity check the message is not duplicate and that approvals are received after the assignment.
- Check VRF assignment is correct.
- Check we received all assignments for the approval vote.
Doesn't this introduce a race condition vs the approval-voting subsystem that writes DB ?
Once the message has been properly validated by a ValidationPoolWorker it is forwarded in parallel to both approval-voting and approval-distribution, so that they both can perform the remaining operations in parallel with the assumption that the message is now valid approval-distribution will do 5 & 6 and approval-voting will do 2 & 4.
I guess we could avoid doing 2 if we merge the 2 subsystems.
Advantages of the proposal.
- It does not require a full rewrite; it just rips out the parts that consume most of the CPU in both subsystems and puts them in a worker pool that can do the work in parallel. To prove that, I built a small PoC with all the things described above.
- It scales up processing with the available hardware.
This is indeed an interesting proposal and the above advantages sound reasonable
- No database changes needed or migrations.
Why do you think no database changes are required ? IMO figuring out how to use the DB such that you can isolate the I/O per candidate in the worker is key to be able to fully parallelise the entirety of the pipeline from 1 -> 6 .
approval-voting - verify VRF assignment and approval signature.
I am not sure how you can do 3 as you would need to have to read CandidateEntry that you write in approval voting.
The crypto checks need just data that is available at the beginning of the block, and you could provide that easily to the workers. You don't need the full CandidateEntry, just some data from it like the candidate_hash, core allocation and VRF story.
- approval-distribution - sanity check that we should have received this message from peer(assignment is not duplicate, assignment is for known block, approval is received only if we received assignment, etc.).
- approval-voting - sanity checks again the message and loads state from the database
It would be great if you could detail what you mean. Is it just a candidate existing in the db ?
Yes, exactly.
Wouldn't this approach work better if we just had one single subsystem and this pool part of it. I mean, it's less messages to pass around and easier to debug, and enables very easy batching to further optimize signature checks.
Yes, that would work as well, I just think it is not necessary to extract the performance. As far as I can tell our subsystem queues are pretty fast at moving things around, and we would still need two workers for the S2 and S3 streams (see below).
- Sanity check the message is not duplicate and that approvals are received after the assignment.
- Check VRF assignment is correct.
- Check we received all assignments for the approval vote.
Doesn't this introduce a race condition vs the approval-voting subsystem that writes DB ?
It is not worse than the current setup: no approval vote gets passed to approval-voting if approval-distribution does not have a record that it already received a valid assignment for it. Since both the vote and the assignment land on the same worker, the worker has everything in its context to validate it.
Once the message has been properly validated by a ValidationPoolWorker it is forwarded in parallel to both approval-voting and approval-distribution, so that they both can perform the remaining operations in parallel with the assumption that the message is now valid approval-distribution will do 5 & 6 and approval-voting will do 2 & 4.
I guess we could avoid doing 2 if we merge the 2 subsystems.
The operations done by approval-voting (database recording) and by approval-distribution (determining the peers that should receive the message) can be performed in parallel, so with 2 subsystems you already get the necessary parallelism. If we merge, we would need two separate workers, which is exactly what the subsystems give us for free.
Why do you think no database changes are required ? IMO figuring out how to use the DB such that you can isolate the I/O per candidate in the worker is key to be able to fully parallelise the entirety of the pipeline from 1 -> 6 .
With this proposal we don't need to isolate the IO because we get the necessary parallelisation and pipelining by having 3 working streams that can advance in parallel:
S1: ValidationPool workers validating in parallel that messages satisfy all crypto requirements.
S2: Approval-voting in a single main loop just takes the messages one by one, records them into the database, and does the rest of the work it currently does to decide whether it needs to trigger a new assignment or whether a block is approved.
S3: Approval-distribution determining which peers need to receive the message and sending it to them.
Also, with this split it is actually S3 that consumes slightly more cpu than S2, because it turns out iterating for every message through 1000 validators adds up to a lot of usage.
Another picture showing how the processing would progress above.
Thought a bit about it and I see no reason why we can't do it even better, by allowing each worker to also run the logic for deciding which peers should receive a given message and queue it to the network bridge tx, something like this.
Impressive @alexggh !
So basically distribution is happening in the workers then (last picture)? (Also book keeping on who has which message and such?)
Do we understand where the speedup is coming from? We do no longer wait for e.g. crypto checks to report back before starting processing the next message anymore - correct? I guess this is the major reason for the speedup? Do we see improvements with only one worker?
We had an off-line discussion about this new proposal and the conclusion was to go ahead with full PoC and more testing followed up by production implementation.
Key takeaways:
So basically distribution is happening in the workers then (last picture)? (Also book keeping on who has which message and such?)
Yes, that's what it should happen, I don't have PoC for the last picture yet, but it is coming.
Do we understand where the speedup is coming from? We do no longer wait for e.g. crypto checks to report back before starting processing the next message anymore - correct? I guess this is the major reason for the speedup?
The speedup comes from two directions:
Do we see improvements with only one worker?
Yes, we see improvements with only one worker, although it is not enough to avoid falling slightly behind when all assignments and all approvals get triggered (130_000 unique messages per block). 2 workers seem to be fine on my machine.
approval-distribution - sanity check that we should have received this message from peer(assignment is not duplicate, assignment is for known block, approval is received only if we received assignment, etc.).
In this, approval-distribution workers need more read access to the approval-voting database now, right?
there will be additional memory required per worker as they track the relay blocks and candidate hashes themselves, but it is very little information
As those tasks are just reading, you could prepare what all workers need once after the relay chain updates, wrap it inside an Arc<StuffForWorker>, and send this to the workers, who then drop their old one. That's probably a minor thing, depending upon how much they have to sift through.
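The Arc<StuffForWorker> suggestion could look roughly like the sketch below. Everything here is an assumption made for illustration: the fields of `StuffForWorker`, the `Worker` type and the use of std channels rather than the node's own channel types.

```rust
use std::collections::HashSet;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::Arc;

/// Hypothetical read-only data every worker needs; the fields are assumed.
#[derive(Debug, Default)]
pub struct StuffForWorker {
    pub relay_blocks: HashSet<[u8; 32]>,
    pub candidates: HashSet<[u8; 32]>,
}

/// Worker-side state: the snapshot currently in use plus the update channel.
pub struct Worker {
    updates: Receiver<Arc<StuffForWorker>>,
    current: Arc<StuffForWorker>,
}

impl Worker {
    pub fn new(updates: Receiver<Arc<StuffForWorker>>) -> Self {
        Worker { updates, current: Arc::new(StuffForWorker::default()) }
    }

    /// Switch to the newest snapshot that has arrived. Replacing `current`
    /// drops this worker's reference to the old snapshot.
    pub fn refresh(&mut self) {
        while let Ok(snapshot) = self.updates.try_recv() {
            self.current = snapshot;
        }
    }

    pub fn knows_candidate(&self, hash: &[u8; 32]) -> bool {
        self.current.candidates.contains(hash)
    }
}

/// Build the snapshot once and hand every worker a cheap `Arc` clone of it.
pub fn broadcast(snapshot: StuffForWorker, workers: &[Sender<Arc<StuffForWorker>>]) {
    let shared = Arc::new(snapshot);
    for tx in workers {
        // A send error only means that worker has shut down; ignore it.
        let _ = tx.send(Arc::clone(&shared));
    }
}
```

The snapshot is allocated once per relay chain update regardless of the number of workers, and the old snapshot is freed as soon as the last worker has switched to the new one.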
approval-distribution - sanity check that we should have received this message from peer(assignment is not duplicate, assignment is for known block, approval is received only if we received assignment, etc.).
In this, approval-distribution workers need more read access to the approval-voting database now, right?
No, approval-distribution does not have access to the approval-voting database. In both the current implementation and the parallelised one, approval-distribution just passes sanitized messages to approval-voting via channels. The difference with this optimisation is that approval-distribution won't have to wait for approval-voting anymore, because it has already confirmed the messages are valid.
This should give you a pretty good idea of all that is needed to make it work https://github.com/paritytech/polkadot-sdk/pull/484
You maybe mistyped that issue number, but..
approval is received only if we received assignment
We'd need the assignments information for that to hold, no?
We do not gossip on approval votes messages for non-existent assignments, right?
Any idea what our current behavior assumes about assignments being received before approval votes? It's hopefully fine if they're dropped, but maybe problematic, so what we're doing now gives some evidence.
You maybe mistyped that issue number, but..
Sorry about that, this is the main issue: https://github.com/paritytech/polkadot-sdk/pull/4849
approval is received only if we received assignment
We'd need the assignments information for that to hold, no?
Yes, and the approval-distribution already has that and enforces it.
We do not gossip on approval votes messages for non-existent assignments, right? Any idea what our current behavior assumes about assignments being received before approval votes? It's hopefully fine if they're dropped, but maybe problematic, so what we're doing now gives some evidence.
The current behaviour assumes that approvals follow the same path as assignments, because nodes send the approvals to all nodes that they send the assignments to. So if an approval is received before the assignment for it, it is considered misbehaviour and we slightly reduce the reputation of the node https://github.com/paritytech/polkadot-sdk/blob/master/polkadot/node/network/approval-distribution/src/lib.rs#L1540C58-L1540C81.
nodes send the approvals to all nodes that they send the assignments to
This seems unlikely given our four random sends in the gossip protocol. It's possible we'd break this in future too. Anyways, I'm not sure this matters since the approval-vote should come well after the assignment.
if an approval is received before receiving the assignment for it it is considered misbehaviour and we slightly reduce the reputation of the node
Alright. Also, approval-voting does not count the approval vote of course, and the vote message gets dropped presumably.
approval is received only if we received assignment
We'd need the assignments information for that to hold, no?
Yes, and the approval-distribution already has that and enforces it.
Alright fine, so you track this in two places. That's maybe for the best: We likely need approval-distribution to keep running for a little while after approval-voting finishes because we do not know if all peers have marked the candidate approved yet. Ergo the extra information approval-voting knows would be a distraction in approval-distribution.
nodes send the approvals to all nodes that they send the assignments to
This seems unlikely given our four random sends in the gossip protocol. It's possible we'd break this in future too. Anyways, I'm not sure this matters since the approval-vote should come well after the assignment
We do keep track of the four random peers we send the assignment to, and when we receive the vote we send it to them as well.
Alright. Also, approval-voting does not count the approval vote of course, and the vote message gets dropped presumably.
Yes.
Alright fine, so you track this in two places. That's maybe for the best: We likely need approval-distribution to keep running for a little while after approval-voting finishes because we do not know if all peers have marked the candidate approved yet. Ergo the extra information approval-voting knows would be a distraction in approval-distribution.
Yes, that's how it was done from the beginning. The optimisation proposed here, reflected in the series https://github.com/paritytech/polkadot-sdk/pull/4849, just makes better use of that, so we can better use the available hw capacity.
Implementation done with: https://github.com/paritytech/polkadot-sdk/pull/4849
The current implementation splits the node approval work in two distinct subsystems. The first implements the networking functionality: spam protection and gossip. The second implements the logic of the approval voting protocol: assignment and approval vote checking, parachain block approval bookkeeping and parablock finality consensus. Both of these subsystems are designed as a single thread that manipulates a global state object.
The message processing pipeline is very simple:
- A message is received by approval-distribution and goes through an initial spam filter that ensures that each assignment/approval passes only once
- The message is forwarded to approval-voting that checks and imports the assignment certificate or approval vote signatures
- approval-distribution blocks and waits for the result of such checks.
The problem
The ToF (time of flight) of each queued approval-distribution message is equal to the sum of the processing time of all the already queued messages at sending time. In our case, this means that the maximum throughput of approval-distribution is a function of the time it takes to process a single message.
In practice, the tranche0 VRF modulo assignments are configured to provide on average the required number of votes (needed_approvals) for approving a candidate. As a result, on each relay chain block which includes candidates the network generates an initial burst of assignments that scales with n_cores and n_validators. Approvals might be less bursty as the time to recover and check candidates depends on more variables. But, this is fine, as long as the average message processing rate is higher than the rate of assignment/approvals being sent by the network to a node.
Profiling - https://github.com/paritytech/polkadot-sdk/issues/732#issuecomment-1691884488 shows where we spend most CPU time, and areas of optimisation that we already explored.
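The ToF relationship described under "The problem" can be illustrated with a small model of a single-threaded queue. This is a simplified sketch with hypothetical function names: it assumes messages are processed strictly in order and ignores channel overhead.

```rust
/// Time of flight of each queued message: the sum of the processing times of
/// all messages that were queued ahead of it (a running prefix sum).
pub fn time_of_flight(processing_times_ms: &[u64]) -> Vec<u64> {
    let mut elapsed = 0u64;
    processing_times_ms
        .iter()
        .map(|t| {
            let tof = elapsed;
            elapsed += *t;
            tof
        })
        .collect()
}

/// Upper bound on messages per second for a single-threaded loop that spends
/// `per_message_ms` on every message.
pub fn max_throughput_per_sec(per_message_ms: f64) -> f64 {
    1000.0 / per_message_ms
}
```

For three queued messages that take 2, 3 and 5 ms to process, the times of flight are 0, 2 and 5 ms. A loop that spends 4 ms per message cannot exceed 250 messages per second, no matter how many cores the machine has.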
Failed Versi load testing
All attempts, including with a new PR for batching assignment checks, have failed at 300 parachain validators due to approval-distribution/voting not keeping up with the amount of messages.
For 300 validators and 50 parachains to run nicely with 6 second block times we have the requirement of producing and checking at least needed_approvals=30 per candidate, leading to a total minimum number of 1500+1500 unique messages (assignments + approvals) to approve all parachain blocks included on a relay chain block. On average this means approval voting needs to be able to check and import 250 assignments + 250 approvals per second. Approval distribution on the other hand needs to deal with around 2.5x more messages, due to the gossip topology duplication of messages (assuming normal-operation conditions).
We expect that v2 assignments and approval coalescing will reduce the average cost of processing approvals and assignments by a factor of 3 at least and allow us to go beyond the current limit of 200 validators and 50 approval cores. In not so perfect network conditions, for example when operators upgrade their nodes, the network can experience at times slower availability recovery and no-shows which will trigger more tranches of approval checkers. Approval vote coalescing will provide little benefit at that point, leading to unstable block times and high finality lag.
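The throughput requirement quoted above for 300 validators and 50 parachains can be reproduced with a few lines of arithmetic. The struct and function names are hypothetical, and the 2.5 gossip factor is simply the estimate given in the text, passed in as a parameter.

```rust
/// Minimum message load implied by a relay chain block (illustrative names).
#[derive(Debug, PartialEq)]
pub struct ApprovalLoad {
    /// Unique assignments needed to approve every candidate in one block.
    pub unique_assignments: u32,
    /// Unique approvals needed to approve every candidate in one block.
    pub unique_approvals: u32,
    /// Assignments plus approvals approval-voting must import per second.
    pub voting_msgs_per_sec: f64,
    /// Messages approval-distribution sees per second, duplicates included.
    pub distribution_msgs_per_sec: f64,
}

pub fn approval_load(
    candidates_per_block: u32,
    needed_approvals: u32,
    block_time_secs: f64,
    gossip_factor: f64,
) -> ApprovalLoad {
    // One assignment and one approval per required vote, per candidate.
    let per_kind = candidates_per_block * needed_approvals;
    let voting = f64::from(2 * per_kind) / block_time_secs;
    ApprovalLoad {
        unique_assignments: per_kind,
        unique_approvals: per_kind,
        voting_msgs_per_sec: voting,
        distribution_msgs_per_sec: voting * gossip_factor,
    }
}
```

Calling `approval_load(50, 30, 6.0, 2.5)` gives 1500 assignments and 1500 approvals per block, 500 messages per second for approval voting (250 assignments + 250 approvals), and 1250 messages per second for approval distribution.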
Failure modes
Nodes of an overloaded network ping pong between (1) and (2) due to a feedback loop which is triggered at very high ToF (10s). Approvals are processed slowly and nodes detect no shows and trigger additional tranches of assignments increasing the load of the system until (1) happens which halves block production rate on each missed slot, leading to less work for approval voting, which breaks the feedback loop.
Future proof solution
A long term solution needs to solve the problem by delivering on the following:
- candidate-validation for example), can choose to not back a candidate due to this backpressure, reducing the load of the network.
- approval-distribution subsystem maximum ToF is < 6s at maximum capacity.
Approval voting and distribution in a single subsystem
The new subsystem is broken down into one component per one responsibility:
These components will be run in separate async tasks:
Main subsystem loop
The main loop should be roughly equivalent to the loops we have now in both subsystems: handling of ApprovalDistributionMessage and ApprovalVotingMessage. It needs to forward assignments (including own assignments) and approvals to the worker pool, and it handles active leaves updates, finality and pruning of the db.
It also needs a read-only view of the DB, to answer GetApprovalSignaturesForCandidate or ApprovedAncestor messages.
The worker pool
The pool exposes an interface for assigning specific candidates, assignments and approvals to be processed by the approval voting workers.
We pick the simplest way to assign work; further improvements can be made later (multiple producers and consumers, work stealing). The pool will maintain a candidate affinity mapping for each candidate and assign candidates to workers in a round robin fashion.
The API could look like this:
fn new_task(task: ApprovalTask)
fn prune(finalised, &[CandidateHash])
Approval tasks on a given candidate are sticky, meaning that once a worker has processed the first assignment for a candidate, it will process all the other messages. The pool guarantees that a candidate is always assigned once, to one single worker (1:1 mapping). This ensures the processing of assignments and approvals is done sequentially in the context of a given candidate wrt the order of receiving from the network stack.
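A minimal sketch of such a pool, assuming std bounded channels stand in for the channels between the main loop and the workers. `CandidateHash` and `ApprovalTask` are simplified local stand-ins, the return types are additions for illustration, and `prune` is read here as taking the list of finalised candidates.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Simplified stand-in for the real candidate hash type.
pub type CandidateHash = [u8; 32];

/// Simplified stand-in for the work items handed to the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApprovalTask {
    Assignment { candidate: CandidateHash, validator: u32 },
    Approval { candidate: CandidateHash, validator: u32 },
}

impl ApprovalTask {
    fn candidate(&self) -> CandidateHash {
        match self {
            ApprovalTask::Assignment { candidate, .. }
            | ApprovalTask::Approval { candidate, .. } => *candidate,
        }
    }
}

pub struct WorkerPool {
    workers: Vec<SyncSender<ApprovalTask>>,
    /// Sticky 1:1 mapping from candidate to worker index.
    affinity: HashMap<CandidateHash, usize>,
    next_worker: usize,
}

impl WorkerPool {
    /// Creates the pool and returns the receiving ends for the workers.
    pub fn new(worker_count: usize, queue_len: usize) -> (Self, Vec<Receiver<ApprovalTask>>) {
        assert!(worker_count > 0, "the pool needs at least one worker");
        let (workers, receivers): (Vec<SyncSender<ApprovalTask>>, Vec<Receiver<ApprovalTask>>) =
            (0..worker_count).map(|_| sync_channel(queue_len)).unzip();
        (WorkerPool { workers, affinity: HashMap::new(), next_worker: 0 }, receivers)
    }

    /// Routes a task to the worker that owns its candidate. A candidate seen
    /// for the first time is assigned round robin. Returns the worker index,
    /// or an error if that worker's bounded queue is full or closed.
    pub fn new_task(&mut self, task: ApprovalTask) -> Result<usize, TrySendError<ApprovalTask>> {
        let candidate = task.candidate();
        let worker = match self.affinity.get(&candidate).copied() {
            Some(worker) => worker,
            None => {
                let worker = self.next_worker;
                self.next_worker = (worker + 1) % self.workers.len();
                self.affinity.insert(candidate, worker);
                worker
            }
        };
        self.workers[worker].try_send(task)?;
        Ok(worker)
    }

    /// Forgets the affinity of finalised candidates.
    pub fn prune(&mut self, finalised: &[CandidateHash]) {
        for candidate in finalised {
            self.affinity.remove(candidate);
        }
    }
}
```

Because each candidate maps to exactly one channel, and a channel preserves send order, all assignments and approvals for a candidate are seen by its worker in the order the main loop received them. Returning an error when a queue is full is one possible way to surface backpressure to the caller; the sketch does not decide what the main loop does with it.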
Approval workers
Each worker can process up to a bounded number of candidates at any time via receiving new assignments and approvals over a bounded channel from the main loop worker pool instance. The exact number of candidates that are being worked on depends on backpressure on the backing new candidates across the network.
Each approval worker has the following responsibilities:
- CandidateContext from any new candidate task received from the main loop. The context contains a snapshot of the global persisted state for the given candidate: BlockEntry, CandidateEntry, assignments, approvals - per candidate state.
- WriteOps to the database worker
For each new message, the worker will follow exactly the same processing pipeline as we do in the present.
Database worker
Represents the only place where we write changes to the parachains DB. Runs in a blocking thread. We only allow readers in all other tasks of the subsystem. Basically the worker just receives a stream of BackendWriteOp from approval workers that update the approval state of a specific candidate.
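The single-writer pattern described here might be sketched as below. This is an illustration under stated assumptions: the two `BackendWriteOp` variants are a simplified stand-in for the real enum, an in-memory `HashMap` stands in for the parachains DB, and a plain OS thread stands in for the blocking task.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Simplified stand-in for the real candidate hash type.
pub type CandidateHash = [u8; 32];

/// Simplified stand-in for the write operations sent by approval workers.
#[derive(Debug)]
pub enum BackendWriteOp {
    WriteCandidateEntry(CandidateHash, Vec<u8>),
    DeleteCandidateEntry(CandidateHash),
}

/// In-memory stand-in for the parachains DB.
pub type Db = HashMap<CandidateHash, Vec<u8>>;

/// Spawns the only writer. Approval workers keep clones of the returned
/// sender; the thread exits and returns the final state once every sender
/// has been dropped.
pub fn spawn_db_worker() -> (Sender<BackendWriteOp>, JoinHandle<Db>) {
    let (tx, rx) = channel::<BackendWriteOp>();
    let handle = thread::spawn(move || {
        let mut db = Db::new();
        // Operations are applied one at a time, in arrival order.
        for op in rx {
            match op {
                BackendWriteOp::WriteCandidateEntry(hash, entry) => {
                    db.insert(hash, entry);
                }
                BackendWriteOp::DeleteCandidateEntry(hash) => {
                    db.remove(&hash);
                }
            }
        }
        db
    });
    (tx, handle)
}
```

Since all writes pass through one thread, operations from a single approval worker are applied in the order it sent them, and no locking is needed around the write path.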