ChainSafe / gossamer

🕸️ Go Implementation of the Polkadot Host
https://chainsafe.github.io/gossamer
GNU Lesser General Public License v3.0
427 stars 110 forks source link

Implement utility function DetermineNewBlocks #3933

Open edwardmack opened 2 months ago

edwardmack commented 2 months ago

Issue summary

Other information and links

edwardmack commented 1 month ago

Currently this function calls ChainApiMessage::Ancestors to build the list of ancestors. Still to implement loop that iterates ancestors and call ChainApiMessage::BlockHeader to confirm is block is new. To implement the functionality of this loop:

'outer: loop {
        let (last_hash, last_header) = ancestry
            .last()
            .expect("ancestry has length 1 at initialization and is only added to; qed");

        assert!(
            last_header.number > min_block_needed,
            "Loop invariant: the last block in ancestry is checked to be \
            above the minimum before the loop, and at the end of each iteration; \
            qed"
        );

        let (tx, rx) = oneshot::channel();

        // This is always non-zero as determined by the loop invariant
        // above.
        let ancestry_step =
            std::cmp::min(ANCESTRY_STEP, (last_header.number - min_block_needed) as usize);

        let batch_hashes = if ancestry_step == 1 {
            vec![last_header.parent_hash]
        } else {
            sender
                .send_message(
                    ChainApiMessage::Ancestors {
                        hash: *last_hash,
                        k: ancestry_step,
                        response_channel: tx,
                    }
                    .into(),
                )
                .await;

            // Continue past these errors.
            match rx.await {
                Err(_) | Ok(Err(_)) => break 'outer,
                Ok(Ok(ancestors)) => ancestors,
            }
        };

        let batch_headers = {
            let (batch_senders, batch_receivers) = (0..batch_hashes.len())
                .map(|_| oneshot::channel())
                .unzip::<_, _, Vec<_>, Vec<_>>();

            for (hash, batched_sender) in batch_hashes.iter().cloned().zip(batch_senders) {
                sender
                    .send_message(ChainApiMessage::BlockHeader(hash, batched_sender).into())
                    .await;
            }

            let mut requests = futures::stream::FuturesOrdered::new();
            batch_receivers
                .into_iter()
                .map(|rx| async move {
                    match rx.await {
                        Err(_) | Ok(Err(_)) => None,
                        Ok(Ok(h)) => h,
                    }
                })
                .for_each(|x| requests.push_back(x));

            let batch_headers: Vec<_> =
                requests.flat_map(|x: Option<Header>| stream::iter(x)).collect().await;

            // Any failed header fetch of the batch will yield a `None` result that will
            // be skipped. Any failure at this stage means we'll just ignore those blocks
            // as the chain DB has failed us.
            if batch_headers.len() != batch_hashes.len() {
                break 'outer
            }
            batch_headers
        };

        for (hash, header) in batch_hashes.into_iter().zip(batch_headers) {
            let is_known = is_known(&hash)?;

            let is_relevant = header.number >= min_block_needed;
            let is_terminating = header.number == min_block_needed;

            if is_known || !is_relevant {
                break 'outer
            }

            ancestry.push((hash, header));

            if is_terminating {
                break 'outer
            }
        }
    }