0xPolygonMiden / miden-node

Reference implementation of the node for the Polygon Miden rollup
MIT License
52 stars 36 forks source link

Batch producer design and features #191

Open hackaugusto opened 7 months ago

hackaugusto commented 7 months ago

The batch builder is currently two pieces:

  1. The trait BatchBuilder
  2. The DefaultBatchBuilder implementation

The trait has a single method:

async fn build_batch(&self, txs: Vec<SharedProvenTx>) -> Result<(), BuildBatchError>;

Which is called by TransactionQueue. The current implementation pushes transactions batches on regular intervals.

Things to discuss and improve on the current design:

hackaugusto commented 7 months ago

As mentioned above, batch proving will be implemented with a cluster of machines. Currently it is unclear if BatchBuilder is an abstraction over the cluster, or a single machine in said cluster.

IMO it makes sense to abstract over the strategy, local/distributed, instead of a prover instance, a single prover, because the later would need extra code (and maybe even another trait).

If that is the case, then IMO the return type should have Future, so that the distributed case can resolve it to the result and report errors. (Currently it seems this is implemented with a new task )

hackaugusto commented 7 months ago

Back pressure from the batch builder. Currently the tx queue pushes batches to the builder on regular intervals without back pressure.

If we agree on using the Future as mentioned above, backpressure can be implemented by wrapping the future in an option, the type would be:

async fn build_batch(&self, txs: Vec<SharedProvenTx>) -> Option<Future<Result<(), BuildBatchError>>>;
hackaugusto commented 7 months ago

The last thing, is the dependency graph. At the moment the graph is:

graph

   API -->|add_transaction| TransactionQueue;
   TransactionQueue -->|verify_tx| StateView;
   TransactionQueue -->|build_batch| BatchBuilder;
   BatchBuilder -->|build_block| BlockBuilder;
   BlockBuilder -->|apply_block| StateView;

IOW, roughly like a pipeline. (Note: The control flow is a bit more complicated ref ).

This pipeline-like design, has a few consequences:

  1. The error handling code is on the TransactionQueue, but the successful execution takes the tx and pushes it to the block builder in another task. This makes following the control flow harder, since there are multiple concurrent tasks and queues to keep track of.
  2. It means no one of the components in the stack can keep track of what is in-flight, and what is committed, the consequence is that the state view has to hook into multiple places to track in-flight state.

IMO it should instead be a design like the following:

#[async_trait]
pub trait BatchBuilder {
    async fn build_batch(
        &self,
        txs: Vec<SharedProvenTx>,
    ) -> Option<Future<Result<TransactionBatch, BuildBatchError>>>;
}

#[async_trait]
pub trait BlockBuilder {
    async fn build_block(
        &self,
        batches: Vec<TransactionBatch>,
    ) -> Option<Future<Result<Block, BuildBlockError>>>;
}

That allows the TransactionQueue to take the responsibility of the StateView, and hopefully reduce the number of tasks (not sure about this yet, but it is one of my goals, to reduce synchronization and test flakiness). The simplified control flow would then be:

sequenceDiagram
    actor Network
    participant TxQueue
    participant BatchBuilder
    participant BlockBuilder
    participant Store

    Note over Network,Store: New tx
    Network->>TxQueue: add_transaction
    activate TxQueue
    TxQueue->>TxQueue: verify_transaction
    alt is_valid
    TxQueue->>TxQueue: add_tx
    end
    alt has_txs_for_batch
    TxQueue--)BatchBuilder: build_batch
    end
    deactivate TxQueue

    Note over Network,Store: New batch
    BatchBuilder--)TxQueue: batch_done
    activate TxQueue
    TxQueue->>TxQueue: add_batch
    alt has_batches_for_block
    TxQueue--)BlockBuilder: build_block
    end
    deactivate TxQueue

    Note over Network,Store: New block
    BlockBuilder--)TxQueue: block_done
    activate TxQueue
    TxQueue->>Store: apply_block
    TxQueue->>TxQueue: update_inflight
    deactivate TxQueue

    Note over Network,Store: Periodic
    activate TxQueue
    TxQueue->>TxQueue: tick
    loop for each tx group
    TxQueue--)BatchBuilder: build_batch
    end
    alt has_batches
    TxQueue--)BlockBuilder: build_block
    end
    deactivate TxQueue

Note: The workflow above, were build_block is called as soon as there is enough batches for a block, means that block times are not regular. The trade-off here is higher throughput and lower latency, for a irregular block time. I think it is a good trade-off, and we can use the block's timestamp to base a time signature (however, this has implications on some other parts of the protocol, like P2IDR, which should be re-evaluated).

Note2: The workflow above is simplified, as it does not account for back pressure handling, but it gives the overall flow of the data in the system, the back pressure should be pretty easy to implement on top of the above.

bobbinth commented 7 months ago

The trait has a single method:

async fn build_block(&self, batches: Vec<SharedTxBatch>) -> Result<(), BuildBlockError>;

Which is called by TransactionQueue.

This is not accurate (transaction queue pushes transaction batches into the batch builder, and the batch builder pushes batches into the block producer), but since the diagram in https://github.com/0xPolygonMiden/miden-node/issues/191#issuecomment-1911855544 is correct, I'm assuming this is a typo?

IMO it should instead be a design like the following:

async fn build_block(
    &self,
    batches: Vec<SharedTxBatch>,
) -> Option<Future<Result<TransactionBatch, BuildBlockError>>>;

I'm not quite following this part:

Basically, was pass a set of batches into build_block(), unless there is an error, we should get a block back, and the Feature part is handled organically by async function.

That allows the TransactionQueue to take the responsibility of the StateView

This approach could work and we actually considered it initially (basically to have a single component which manages the whole state with several tasks interacting with it). The current approach tries to follow an "actor model" where there are multiple components each responsible for maintaining their own state. I am not against re-evaluating this though, especially if it leads to a simpler design.

One other general note: we should keep in mind that we are building towards a fully decentralized system (even if it takes us some time to get there). This means that we shouldn't assume that there is a single block builder or even that the block builder will be known ahead of time.

This doesn't have a ton of implications for now, but there are a few. The main one relevant here is that block production times should be relatively regular. Currently, we are aiming 3-second block times (though this can change).

hackaugusto commented 7 months ago

This is not accurate (transaction queue pushes transaction batches into the batch builder, and the batch builder pushes batches into the block producer), but since the diagram in https://github.com/0xPolygonMiden/miden-node/issues/191#issuecomment-1911855544 is correct, I'm assuming this is a typo?

Ah, yes, I meant to write the issue about the batch builder, as in the issue title. Not sure why I pasted the links to the block producer over there.

Why are we returning a TransactionBatch from build_block()?

Because of the typo, it should have been the batch builder.

Assuming the above is a typo (i.e., TransactionBatch should be Block), why do we return Feature within an Option?

It is an Option of a Future.

The Option would signal the batch builder has enough work, and the transaction queue should wait before pushing extra work. This implements back-pressure.

The Future is working like a Promise from the JS world, basically an object that will be resolved once there is a value. It represents the fact that batch proving is asynchronous. Returning a Future should remove the need for spawning a task (maybe), and it signals to the caller that it will receive the results back.

This approach could work and we actually considered it initially (basically to have a single component which manages the whole state with several tasks interacting with it). The current approach tries to follow an "actor model" where there are multiple components each responsible for maintaining their own state. I am not against re-evaluating this though, especially if it leads to a simpler design.

I don't know all the theoretical baggage of the actor model framework. But my main takeaway is the lack of locking primitives for synchronization, which I'm not proposing to use here, so in that regard it should have no downside (as in, this also doesn't deadlock).

Are there any other benefits we are looking for?

bobbinth commented 7 months ago

The Option would signal the batch builder has enough work, and the transaction queue should wait before pushing extra work. This implements back-pressure.

Could we not accomplish this by adding something like BuildBatchError::Busy variant? Basically, instead of wrapping retuning None in case when the batch builder is busy, we'd return Busy error.

A couple of general comments about Tx queue and batch builder:

The way they are set up currently is very primitive - this was basically implemented to make them work in the simplest possible way. So, we should view these as skeletons of what needs to be there. Here is a rough sketch of the flow we'd like to achieve eventually:

As transactions are submitted, they are added to the transaction queue. The only checks we make here is that transactions should be valid. I think this part works fine.

Assuming the batch builder is not busy, we want to submit transactions to it for batching (if it is busy, ideally, we won't even send a request to batch at all). Here we have 2 considerations:

The batch selection process right now is very primitive: periodically, we grab transactions from the queue, split them up into equal groups, and send them off to the batch builder. We should encapsulate this logic so that it is easy to update without affecting the rest of the system. For example, we could have something like this on the transaction queue:

/// Extracts groups of transactions from the current transaction queue. `ProposedBatch` could be
/// as simple as `Vec<ProvenTransaction>`.
///
/// `num_batches` could come from the batch producer indicating how much capacity it has.
pub fn select_batches(&self, num_batches: usize) -> Vec<ProposedBatch>

Current implementation of this function, would look something like this:

pub fn select_batches(&self, num_batches: usize) -> Vec<Vec<ProvenTransaction>> {
    let txs: Vec<ProvenTransaction> = {
        let mut locked_ready_queue = self.ready_queue.write().await;

        if locked_ready_queue.is_empty() {
            return Vec::new();
        }

        locked_ready_queue.drain(..).collect()
    };

    txs.chunks(self.options.batch_size).map(|txs| txs.to_vec()).to_vec()
}

The implementation of try_build_batches() could then look something like this:

async fn try_build_batches(&self) {
    let num_batches = self.batch_builder.get_capacity();
    if num_batches == 0 {
        return;
    }

    let proposed_batches = self.select_batches(num_batches);

    for proposed_batch in proposed_batches {
        let ready_queue = self.ready_queue.clone();
        let batch_builder = self.batch_builder.clone();

        tokio::spawn(
            async move {
                match batch_builder.build_batch(proposed_batch).await {
                    Ok(_) => {
                        // batch was successfully built, do nothing
                    },
                    Err(e) => {
                        // batch building failed, add txs back at the end of the queue
                        ready_queue.write().await.append(&mut e.into_transactions());
                    },
                }
            }
            .instrument(info_span!(target: COMPONENT, "batch_builder")),
        );
    }
}

Another point is that if we do want to have one central place to maintain global state (e.g., something like TransactionPool), we probably need to change how batch and block builder work. Currently, the build batches/blocks and process the results (e.g., build_batch and build_block do not return anything). But with this approach, the would need to return the result to be processed accordingly. For example, the try_build_batches function would change like so:

async fn try_build_batches(&self) {
    let num_batches = self.batch_builder.get_capacity();
    if num_batches == 0 {
        return;
    }

    let proposed_batches = self.select_batches(num_batches);

    for proposed_batch in proposed_batches {
        let tx_queue = self.tx_queue.clone();
        let batch_queue = self.batch_queue.clone();
        let batch_builder = self.batch_builder.clone();

        tokio::spawn(
            async move {
                match batch_builder.build_batch(proposed_batch).await {
                    Ok(batch) => {
                        // batch was successfully built, add the it to the batch queue
                        batch_queue.write().await.push(batch);
                    },
                    Err(e) => {
                        // batch building failed, add txs back at the end of the queue
                        tx_queue.write().await.append(&mut e.into_transactions());
                    },
                }
            }
            .instrument(info_span!(target: COMPONENT, "batch_builder")),
        );
    }
}
hackaugusto commented 7 months ago

Could we not accomplish this by adding something like BuildBatchError::Busy variant? Basically, instead of wrapping retuning None in case when the batch builder is busy, we'd return Busy error.

It does work. But IMO this is a more complicated API, for example, this adds the need for the get_capacity you mentioned above. My original idea was to isolate things that can be done synchronously from things that must be done async.

bobbinth commented 7 months ago

It does work. But IMO this is a more complicated API, for example, this adds the need for the get_capacity you mentioned above. My original idea was to isolate things that can be done synchronously from things that must be done async.

I probably still don't fully understand your proposal. Let's say we have the following method on the BatchBuilder:

async fn build_batch(
    &self,
    transactions: Vec<ProvenTransaction>,
) -> Option<Future<Result<TransactionBatch, BuildBatchError>>>

How would it be called from transaction queue/pool (i.e., how would try_build_batches() look like)?

hackaugusto commented 7 months ago

I probably still don't fully understand your proposal. How would it be called from transaction queue/pool look like?

The proposal is basically:

  1. txqueue should have handles to the in-flight batches (the Future above).
    1. Needed to propagate cancellations (I'm against fire-and-forget in production)
    2. It allows to push new work once capacity is available (= when a Future resolves).
  2. The back pressure should be in the same task.
    1. Reduces race conditions, because no time is needed to signal back pressure.

I did a few experiments and I no longer think that returning a task/Future is a good idea. Here is why:


Here is a concrete example. I think changing the type to JoinHandle<Result<Batch, Error>> like you suggested would be better indeed at the cost of some races with spawning/back pressure handling. The important piece of code is txqueue.

Simulation code ```rust use futures::stream::{FuturesUnordered, StreamExt}; use rand::distributions::{Distribution, Uniform}; use std::cmp::min; use std::ops::RangeInclusive; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, RwLock}; use tokio::task::JoinHandle; use tokio::time::{self, Instant}; const BATCH_SIZE: usize = 4; const BATCH_TIME: u64 = 4000; const BATCH_MAX: usize = 2; const TX_FREQUENCY: RangeInclusive = 700..=2000; const BATCH_FREQUENCY: RangeInclusive = 4000..=6500; type BatchBuilderResult = Result; /// Simulates a proven Transaction #[derive(Debug)] pub struct Tx { pub id: usize, } /// Simulates a proven batch #[derive(Debug)] pub struct Batch { pub id: usize, pub txs: Vec, } /// Simulates the batch builder #[derive(Debug)] struct BatchBuilder { batches: Arc>, id: usize, } /// Type to describe current backpressure state. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] enum Backpressure { Yes, No, } impl BatchBuilder { fn new() -> Self { Self { batches: Arc::new(RwLock::new(0)), id: 0, } } async fn build(&mut self, txs: Vec) -> Result, Vec> { assert!(!txs.is_empty()); // don't waste time on empty batches assert!(txs.len() <= BATCH_SIZE); { let mut batches = self.batches.write().await; // simulates all workers being busy if *batches >= BATCH_MAX { return Err(txs); } *batches += 1; } // helper code to simulate proving a batch let between = Uniform::from(BATCH_FREQUENCY); let mut rng = rand::thread_rng(); let time = Duration::from_millis(between.sample(&mut rng)); let id = self.id; let lock = self.batches.clone(); println!("Batches started {id}"); let handle = tokio::spawn(async move { tokio::time::sleep(time).await; let mut batches = lock.write().await; *batches -= 1; Ok(Batch { id, txs }) }); self.id += 1; Ok(handle) } } /// Try to create as many full batches as possible, until all transactions are consumed or /// a backpressure event. async fn try_full_batches( txpool: &mut Vec, batch_builder: &mut BatchBuilder, inflight: &mut FuturesUnordered>, ) -> Backpressure { while txpool.len() >= BATCH_SIZE { let batch: Vec<_> = txpool.drain(..BATCH_SIZE).collect(); match batch_builder.build(batch).await { Ok(handle) => inflight.push(handle), Err(batch) => { txpool.extend(batch); println!("Batch builder full"); return Backpressure::Yes; } } } Backpressure::No } /// Maybe start build a partial batch (one with less than BATCH_SIZE transactions). /// /// Only used for timer based batches, all other events require a full batch size. async fn try_partial_batch( txpool: &mut Vec, batch_builder: &mut BatchBuilder, inflight: &mut FuturesUnordered>, ) -> Backpressure { let size = min(txpool.len(), BATCH_SIZE); let batch: Vec<_> = txpool.drain(..size).collect(); match batch_builder.build(batch).await { Ok(handle) => inflight.push(handle), Err(batch) => { txpool.extend(batch); println!("Batch builder full"); return Backpressure::Yes; } } Backpressure::No } /// using a mpsc because it makes sense when adding a P2P layer async fn txqueue(mut txs: mpsc::Receiver) { let mut batch_builder = BatchBuilder::new(); let mut txpool = Vec::new(); let mut inflight = FuturesUnordered::new(); let mut backpressure = Backpressure::No; // timer used to start a new batch because of latency // the timer is reset on every try, regardless of their result let batch_time = Duration::from_millis(BATCH_TIME); let sleep = time::sleep(batch_time); tokio::pin!(sleep); loop { tokio::select! { // disable timer if: // - backpressure is enabled // - txpool is empty () = &mut sleep, if backpressure == Backpressure::No && !txpool.is_empty() => { println!("[TXQUEUE] TIME"); // set backpressure to handle cases the worker pool shrinked backpressure = try_partial_batch(&mut txpool, &mut batch_builder, &mut inflight).await; sleep.as_mut().reset(Instant::now() + batch_time); }, // never block receiving transactions tx = txs.recv() => { let tx = tx.expect("recv should not have failed"); println!("[TXQUEUE] {:?}", tx); txpool.push(tx); // if there is no backpressure for the time being, trigger a batch right away if backpressure == Backpressure::No { backpressure = try_full_batches(&mut txpool, &mut batch_builder, &mut inflight).await; sleep.as_mut().reset(Instant::now() + batch_time); } } // disable if: // - the FuturesUnordered is empty, since it resolves right away batch = inflight.next(), if !inflight.is_empty() => { let batch = batch.expect("task should not have failed").unwrap().unwrap(); println!("[TXQUEUE] {:?}", batch); backpressure = try_full_batches(&mut txpool, &mut batch_builder, &mut inflight).await; sleep.as_mut().reset(Instant::now() + batch_time); } } } } // helper to create fake txs to show the different behaviors async fn create_txs(txs: mpsc::Sender) { let between = Uniform::from(TX_FREQUENCY); let mut rng = rand::thread_rng(); let mut id = 0; loop { if id % 7 == 6 { // trigger time based tokio::time::sleep(Duration::from_millis(BATCH_TIME * 2)).await; } if id % 11 == 10 { // overload batch builder for _ in 0..(BATCH_SIZE * 2) { txs.send(Tx { id }).await.unwrap(); id += 1; } } else { // regular let time = Duration::from_millis(between.sample(&mut rng)); tokio::time::sleep(time).await; txs.send(Tx { id }).await.unwrap(); id += 1; } } } #[tokio::main] async fn main() { let (sender, receiver) = mpsc::channel(100); tokio::join!(txqueue(receiver), create_txs(sender)); } ```

This sample above will:

It shows how to collect all the handlers, but I have not looked into cancellation. The code is just a sample, it runs, it shows the concepts, but it is not prod code (so things like names, grouping of functions, or even the types was not really the focus).