WilfredAlmeida / LightDAS

Lightweight DAS for Solana
GNU Affero General Public License v3.0
34 stars 6 forks source link

Tree is always Null #10

Open mindrunner opened 3 months ago

mindrunner commented 3 months ago

The subscription part pushes a new tx to the queuebut inprocess_logs`, the tree address is unknown and therefore set to Null. The queue processor then skips that entry due to the null value. This seems odd to me. Am I missing something?

I changed the setup, such that I have separate streams per Tree (don't know if that's a good idea at scale, tho). Works better for me this way. any thoughts on this?

pub async fn process_logs(logs_response: RpcLogsResponse, tree_address: String) {
    let transaction_signature = logs_response.signature;
    println!("websocket tx: {:?}", transaction_signature);

    push_back(TransactionsQueue {
        transaction_signature: transaction_signature.clone(),
        tree_address,
    });
}
    if args.subscribe {
        println!("Subscribing to tree addresses");
        println!("TREE ADDRESSES {:?}", tree_addresses);
        let pubsub_client = get_pubsub_client();

        let mut streams = SelectAll::new();

        for address in tree_addresses.iter().cloned() {
            let address_clone = address.clone();
            let subscription_result = pubsub_client.logs_subscribe(
                RpcTransactionLogsFilter::Mentions(vec![address.clone()]),
                RpcTransactionLogsConfig {
                    commitment: Some(CommitmentConfig::processed()),
                },
            ).await;

            match subscription_result {
                Ok(subscription) => {
                    let stream = subscription.0.map(move |log| (address_clone.clone(), log));
                    streams.push(Box::pin(stream) as Pin<Box<dyn Stream<Item=(String, Response<RpcLogsResponse>)> + Send>>);
                }
                Err(e) => {
                    eprintln!("error creating subscription for {}: {}", address, e);
                }
            }
        }

        let handle = tokio::spawn(handle_stream(streams));
        tasks.push(handle);
    }