darkforestry / amms-rs

A Rust library to interact with automated market makers across EVM chains.
461 stars 125 forks source link

Feat: discover factories async #229

Open molaco opened 3 weeks ago

molaco commented 3 weeks ago

Describe the feature you would like

Because factory discovery was very slow and there was an async TODO, I wrote some simple code that makes it 10x faster. I hope it's useful.

Additional context

use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_provider::{
    network::primitives::BlockTransactions, network::Ethereum, Provider, ProviderBuilder,
    RootProvider,
};
use alloy_rpc_types::eth::Filter;
use alloy_sol_types::{sol, SolCall, SolEvent, SolValue};
use alloy_transport_http::Http;

// use indicatif::ProgressBar;
use reqwest::Client;

use revm::{
    primitives::{address, Address, Bytes, TxKind, B256, U256},
    Evm,
};
use revm_database::{AlloyDB, CacheDB, StateBuilder};
use revm_database_interface::WrapDatabaseAsync;
use revm_inspector::{inspector_handle_register, inspectors::TracerEip3155};
use revm_wiring::EthereumWiring;

use std::time::Instant;
use std::{collections::HashMap, sync::Arc};
use tokio::task;
use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Width, in blocks, of each `eth_getLogs` range query.
    const BLOCK_STEP: u64 = 50_000;
    // Maximum number of `eth_getLogs` requests in flight at once. Without a cap every
    // range is requested simultaneously, which RPC providers commonly rate-limit, and
    // every rejected range would be missing from the totals.
    const MAX_CONCURRENT_REQUESTS: usize = 16;

    // HTTP JSON-RPC endpoint, taken from the environment so no URL has to live in source.
    let rpc_url = std::env::var("ETH_RPC_URL")
        .map_err(|_| anyhow::anyhow!("ETH_RPC_URL must be set to an HTTP JSON-RPC endpoint"))?
        .parse()?;
    let client = ProviderBuilder::new().on_http(rpc_url);

    // Logs are scanned from block 0 up to and including this block.
    let block_number: u64 = 10889447;

    // Make sure the end of the scan range exists before issuing any log queries.
    let block = match client
        .get_block_by_number(BlockNumberOrTag::Number(block_number), true)
        .await
    {
        Ok(Some(block)) => block,
        Ok(None) => anyhow::bail!("Block not found"),
        Err(error) => anyhow::bail!("Error: {:?}", error),
    };
    println!("Fetched block number: {}", block.header.number);

    // Discover UniswapV2 factories
    sol! {
        /// Interface of the UniswapV2Factory contract
        #[derive(Debug, PartialEq, Eq)]
        #[sol(rpc)]
        contract IUniswapV2Factory {
            event PairCreated(address indexed token0, address indexed token1, address pair, uint256 index);
            function getPair(address tokenA, address tokenB) external view returns (address pair);
            function allPairs(uint256 index) external view returns (address pair);
            function allPairsLength() external view returns (uint256 length);
        }
    }

    // The filter matches on the event signature only (no address), so every contract
    // that emitted a `PairCreated` log shows up as a candidate factory.
    let block_filter =
        Filter::new().event_signature(IUniswapV2Factory::PairCreated::SIGNATURE_HASH);

    let total_start = Instant::now();

    // Inclusive `(from, to)` ranges that tile `0..=block_number`. Stepping over the
    // inclusive range guarantees the final block is covered even when it lands exactly
    // on a range boundary. (`BLOCK_STEP` is far below `usize::MAX`; the cast cannot
    // truncate.)
    let block_ranges: Vec<(u64, u64)> = (0..=block_number)
        .step_by(BLOCK_STEP as usize)
        .map(|from| (from, from.saturating_add(BLOCK_STEP - 1).min(block_number)))
        .collect();

    let fetch_start = Instant::now();
    let mut identified_factories: HashMap<Address, u64> = HashMap::new();
    let mut failed_batches: usize = 0;
    let mut in_flight: task::JoinSet<anyhow::Result<HashMap<Address, u64>>> =
        task::JoinSet::new();

    for &(from_block, target_block) in &block_ranges {
        // Sliding window: once the cap is reached, fold in one finished batch before
        // spawning the next request.
        if in_flight.len() >= MAX_CONCURRENT_REQUESTS {
            if let Some(joined) = in_flight.join_next().await {
                if !merge_batch(&mut identified_factories, joined) {
                    failed_batches += 1;
                }
            }
        }
        let client = client.clone();
        let block_filter = block_filter.clone();
        in_flight.spawn(async move {
            process_block_logs_batch(&from_block, &target_block, client, &block_filter).await
        });
    }

    // Drain the requests still running after the last range was spawned.
    let drain_start = Instant::now();
    while let Some(joined) = in_flight.join_next().await {
        if !merge_batch(&mut identified_factories, joined) {
            failed_batches += 1;
        }
    }
    let drain_duration = drain_start.elapsed();
    let fetch_duration = fetch_start.elapsed();

    // Print the found factories
    for (factory, count) in &identified_factories {
        println!("{}: {}", factory, count);
    }
    if failed_batches > 0 {
        eprintln!(
            "{} of {} block ranges failed; the counts above are incomplete",
            failed_batches,
            block_ranges.len()
        );
    }

    println!("Log fetching took: {:?}", fetch_duration);
    println!("Results collection took: {:?}", drain_duration);
    println!("Total collection took: {:?}", total_start.elapsed());

    Ok(())
}

/// Folds one finished batch into the running per-factory totals.
///
/// Returns `true` when the batch's counts were added. Returns `false` when the batch's
/// request failed or its task panicked; the error is printed to stderr so the caller
/// can keep processing the remaining batches.
fn merge_batch(
    totals: &mut HashMap<Address, u64>,
    joined: Result<anyhow::Result<HashMap<Address, u64>>, task::JoinError>,
) -> bool {
    match joined {
        Ok(Ok(batch)) => {
            for (factory, count) in batch {
                *totals.entry(factory).or_insert(0) += count;
            }
            true
        }
        Ok(Err(err)) => {
            // The task ran to completion, but the RPC request returned an error.
            eprintln!("Error occurred: {:?}", err);
            false
        }
        Err(join_err) => {
            // The task itself failed (possibly panicked).
            eprintln!("Task join error: {:?}", join_err);
            false
        }
    }
}

/// Fetches every log matching `block_filter` in the inclusive block range
/// `from_block..=target_block` and counts how many were emitted by each address.
///
/// The returned map holds, for each emitting address, the number of matching logs
/// in this range. This makes per-batch maps safe to merge by simple addition.
///
/// # Errors
///
/// Returns the provider's error if the `eth_getLogs` request fails.
async fn process_block_logs_batch(
    from_block: &u64,
    target_block: &u64,
    client: RootProvider<Http<Client>>,
    block_filter: &Filter,
) -> anyhow::Result<HashMap<Address, u64>> {
    // `Filter`'s range builders take `self` by value, so narrow a copy of the shared filter.
    let range_filter = block_filter
        .clone()
        .from_block(*from_block)
        .to_block(*target_block);

    let logs = client.get_logs(&range_filter).await?;

    // Count every matching log, including the first one seen for an address, so the
    // merged totals do not depend on how the full scan was split into batches.
    let mut local_identified_factories: HashMap<Address, u64> = HashMap::new();
    for log in logs {
        *local_identified_factories.entry(log.address()).or_insert(0) += 1;
    }

    Ok(local_identified_factories)
}
molaco commented 3 weeks ago

230