alloy-rs / alloy

Transports, Middleware, and Networks for the Alloy project
https://alloy.rs
Apache License 2.0

[Bug] PubSubService does not subscribe fast enough to the ActiveSubscription Sender and drops first message #1601

Open tchardin opened 3 weeks ago

tchardin commented 3 weeks ago

Component

provider, pubsub

What version of Alloy are you on?

alloy-provider v0.5.4

Operating System

macOS (Apple Silicon)

Describe the bug

When registering a JSON-RPC subscription over WebSocket, the pubsub frontend sends an async request to the service loop to subscribe to the broadcast channel. By the time the frontend returns the receiver, any message the server sent in the meantime has already been missed.
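The race can be illustrated without any networking. The following is a minimal sketch, not alloy's actual code: `Broadcast` and `first_seen` are hypothetical names, and the toy channel only assumes the usual broadcast behaviour that a message reaches the receivers that exist at the moment it is sent.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Toy broadcast channel (hypothetical, for illustration only):
/// each message is delivered to the receivers that exist at send time.
struct Broadcast<T> {
    subscribers: Vec<Sender<T>>,
}

impl<T: Clone> Broadcast<T> {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    /// Registers a new receiver; it only sees messages sent from now on.
    fn subscribe(&mut self) -> Receiver<T> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Sends a copy of `msg` to every currently registered receiver.
    fn send(&self, msg: T) {
        for tx in &self.subscribers {
            // A dropped receiver is not an error for the sender.
            let _ = tx.send(msg.clone());
        }
    }
}

/// Returns the first message seen by a receiver that subscribes only after
/// `sent_before_subscribe` notifications have already been broadcast.
fn first_seen(sent_before_subscribe: usize) -> Option<usize> {
    let mut chan = Broadcast::new();
    let mut next = 0usize;

    // Notifications that arrive while the subscribe request is still in flight.
    for _ in 0..sent_before_subscribe {
        chan.send(next);
        next += 1;
    }

    // The receiver is created only now, as in the flow described above.
    let rx = chan.subscribe();

    // Notifications that arrive after the receiver exists.
    for _ in 0..3 {
        chan.send(next);
        next += 1;
    }

    rx.try_recv().ok()
}
```

With this model, `first_seen(0)` yields `Some(0)`, while `first_seen(1)` yields `Some(1)`: a single notification sent before the receiver exists is enough to lose the first value.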

To reproduce, simply run this test:

    async fn run_server() -> eyre::Result<std::net::SocketAddr> {
        use jsonrpsee::server::{RpcModule, Server, SubscriptionMessage};

        let server = Server::builder().build("127.0.0.1:0").await?;
        let mut module = RpcModule::new(());
        module
            .register_subscription(
                "subscribe_hello",
                "s_hello",
                "unsubscribe_hello",
                |_, pending, _, _| async move {
                    let sub = pending.accept().await.unwrap();

                    for i in 0..usize::MAX {
                        let msg = SubscriptionMessage::from_json(&i).unwrap();
                        sub.send(msg).await.unwrap();
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    }

                    Ok(())
                },
            )
            .unwrap();
        let addr = server.local_addr()?;

        let handle = server.start(module);

        tokio::spawn(handle.stopped());

        Ok(addr)
    }

    #[tokio::test]
    async fn test_subscription() -> eyre::Result<()> {
        use alloy_provider::{Provider, ProviderBuilder};

        let addr = run_server().await?;

        let ws_provider = ProviderBuilder::new()
            .with_recommended_fillers()
            .on_builtin(&format!("ws://{addr}"))
            .await?;
        let mut request = ws_provider.client().request("subscribe_hello", ());
        // required if not eth_subscribe
        request.set_is_subscription();
        let sub_id = request.await?;
        // call the pubsub service to get a broadcast receiver.
        let mut sub = ws_provider.root().get_subscription(sub_id).await?;

        let num: usize = sub.recv().await.unwrap();
        assert_eq!(num, 0);

        Ok(())
    }

You will find the first message to be 1, which means the client missed the 0.

mattsse commented 3 weeks ago

yeah, this setup is a bit flawed here.

the only solution I can think of rn is to replace the set_is_subscription bool with the actual channel
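One possible reading of that suggestion, sketched under assumptions: the request carries the sender half of the notification channel, so the service loop can install the route in the same step in which it learns the subscription id. `SubscribeRequest`, `Service`, and the function names below are hypothetical and do not reflect alloy's internals.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical request: instead of an `is_subscription` flag it carries
/// the sender half of the channel the caller will read from.
struct SubscribeRequest {
    notifications: Sender<usize>,
}

/// Toy service-loop state: subscription id -> where to forward notifications.
struct Service {
    routes: HashMap<u64, Sender<usize>>,
}

impl Service {
    fn new() -> Self {
        Self { routes: HashMap::new() }
    }

    /// Handles the server's response to a subscribe request. The route is
    /// installed as soon as the id is known, with no extra round trip
    /// between the frontend and the service loop.
    fn on_subscribe_response(&mut self, req: SubscribeRequest, id: u64) {
        self.routes.insert(id, req.notifications);
    }

    /// Forwards a server notification to the matching receiver, if any.
    fn on_notification(&self, id: u64, value: usize) {
        if let Some(tx) = self.routes.get(&id) {
            let _ = tx.send(value);
        }
    }
}

/// Builds a request together with the receiver the caller keeps.
fn new_subscription() -> (SubscribeRequest, Receiver<usize>) {
    let (tx, rx) = channel();
    (SubscribeRequest { notifications: tx }, rx)
}

/// Simulates notifications arriving before the caller reads anything and
/// returns the first value the caller observes.
fn first_seen_with_channel_in_request() -> Option<usize> {
    let (req, rx) = new_subscription();
    let mut service = Service::new();

    let id = 7; // arbitrary subscription id for the simulation
    service.on_subscribe_response(req, id);

    // The server starts pushing immediately; the values are buffered in `rx`.
    for value in 0..3 {
        service.on_notification(id, value);
    }

    rx.try_recv().ok()
}
```

In this model the receiver exists before the first notification is routed, so the caller observes 0 first. Whether the same shape fits alloy's frontend/service split is an open design question.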