informalsystems / tendermint-rs

Client libraries for Tendermint/CometBFT in Rust!
Apache License 2.0

WebSocket subscription query not found #1426

Open patternmachine opened 1 month ago

patternmachine commented 1 month ago

What went wrong?

When subscribing to NewBlock events (and possibly others, which I have not tested), I expect my code to receive events via the Subscription returned by the subscribe() call. That is not the case: no events are delivered.

After spending way too much time going through the code, I discovered that the SubscriptionRouter stores subscriptions internally in a HashMap<SubscriptionQuery, HashMap<SubscriptionId, SubscriptionTx>>, where SubscriptionQuery is just a type alias for String. For the NewBlock event, the key therefore ends up being "tm.event = 'NewBlock'". When the client receives a message, it uses the query field inside that message as the key for the hashmap lookup. Unfortunately, the query I keep getting back is "tm.event='NewBlock'" (no spaces around the =), which is not the same string as "tm.event = 'NewBlock'", so the lookup finds nothing and the event never reaches my subscription.
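The mismatch can be reproduced in isolation with a plain HashMap keyed by the query string. This is only a minimal sketch under stated assumptions, not the crate's actual router: `route` and `demo_router` are hypothetical stand-ins, and the inner map holds placeholder strings instead of real SubscriptionTx channels.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the crate's types (assumptions for illustration).
type SubscriptionQuery = String;
type SubscriptionId = String;
type Router = HashMap<SubscriptionQuery, HashMap<SubscriptionId, &'static str>>;

/// Hypothetical lookup: returns the ids of all subscriptions registered
/// under exactly the given query string (byte-for-byte comparison).
fn route<'a>(subs: &'a Router, query: &str) -> Vec<&'a SubscriptionId> {
    subs.get(query)
        .map(|by_id| by_id.keys().collect())
        .unwrap_or_default()
}

/// Builds a router with one subscription keyed by the client-side spelling.
fn demo_router() -> Router {
    let mut subs: Router = HashMap::new();
    subs.entry("tm.event = 'NewBlock'".to_string())
        .or_default()
        .insert("sub-1".to_string(), "tx placeholder");
    subs
}

fn main() {
    let subs = demo_router();
    // Same spelling as the stored key: the subscription is found.
    println!("{:?}", route(&subs, "tm.event = 'NewBlock'"));
    // Spelling echoed back by the node: no entry under this key.
    println!("{:?}", route(&subs, "tm.event='NewBlock'"));
}
```

Because the key is a raw string, any difference in spacing between what the client stored and what the node echoes back is enough to make the lookup miss.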

Steps to reproduce

Take the example code from the WebSocketClient documentation:

use tendermint_rpc::{WebSocketClient, SubscriptionClient, Client};
use tendermint_rpc::query::EventType;
use futures::StreamExt;

#[tokio::main]
async fn main() {
    let (client, driver) = WebSocketClient::new("wss://osmosis-rpc.publicnode.com:443/websocket")
        .await
        .unwrap();
    let driver_handle = tokio::spawn(async move { driver.run().await });

    // Subscription functionality
    let mut subs = client.subscribe(EventType::NewBlock.into())
        .await
        .unwrap();

    // Grab 5 NewBlock events
    let mut ev_count = 5_i32;

    while let Some(res) = subs.next().await {
        let ev = res.unwrap();
        println!("Got event: {:?}", ev);
        ev_count -= 1;
        if ev_count <= 0 {
            break;
        }
    }

    // Signal to the driver to terminate.
    client.close().unwrap();
    // Await the driver's termination to ensure proper connection closure.
    let _ = driver_handle.await.unwrap();
}

Definition of "done"

The above should print out the events.

romac commented 1 month ago

Thanks for taking the time to figure this out, and sorry you had to run into this!

We should definitely index the subscriptions by the corresponding parsed and normalized Query instead of its string representation.
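To illustrate the idea of keying on a normalized form, here is a rough sketch. It does not use the crate's real Query parser; `normalize_query` is a hypothetical helper that applies a deliberately naive normalization (drop whitespace outside single-quoted values), and it does not handle escaped quotes.

```rust
use std::collections::HashMap;

/// Hypothetical helper: removes whitespace outside single-quoted values so
/// that differently spaced spellings of the same query produce one key.
/// Naive by design: this is string cleanup, not a real parse of the query.
fn normalize_query(query: &str) -> String {
    let mut out = String::with_capacity(query.len());
    let mut in_quotes = false;
    for c in query.chars() {
        if c == '\'' {
            in_quotes = !in_quotes;
        }
        // Keep everything inside quotes; outside, keep only non-whitespace.
        if in_quotes || !c.is_whitespace() {
            out.push(c);
        }
    }
    out
}

fn main() {
    // Placeholder subscription ids stand in for real subscription channels.
    let mut subs: HashMap<String, Vec<&str>> = HashMap::new();

    // Register under the normalized form of the client-side spelling.
    subs.entry(normalize_query("tm.event = 'NewBlock'"))
        .or_default()
        .push("sub-1");

    // Look up with the normalized form of the spelling the node sends back.
    let hit = subs.get(&normalize_query("tm.event='NewBlock'"));
    println!("{:?}", hit);
}
```

Normalizing on both the insert and the lookup path means the two spellings from the report resolve to the same entry. A parsed Query used as the key would achieve the same thing more robustly than string cleanup.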

Would you be open to creating a PR for this? Otherwise I am happy to do it myself, but it will have to wait until next week or the one after that.

romac commented 1 month ago

@patternmachine I just put up a WIP branch with a tentative fix. Let me know if that works for you until I have time to get back to it, or feel free to improve on it and open a PR.

Note that I haven't had a chance to test this so it may very well be broken in more than one way.