graphql-rust / graphql-client

Typed, correct GraphQL requests and responses in Rust
Apache License 2.0
1.14k stars 155 forks source link

Support subscriptions in graphql_client_web #204

Open tomhoule opened 5 years ago

tomhoule commented 5 years ago

We should probably look at how this is implemented in other GraphQL clients that work in browsers, Apollo for example.

I haven't worked with subscriptions much personally, so I will have to read up on this so I can implement it or give feedback on someone else's implementation. This issue is for discussing the requirements for the feature.

Sytten commented 2 years ago

I happen to be in need of that, so maybe I could give it a try. Right now there are two specs, the legacy apollo one and the newer graphql-ws one. This is usually implemented using websocket, but sometimes server sent events. Sadly reqwest still doesn't support websocket (https://github.com/seanmonstar/reqwest/issues/864), so we probably need another library. Maybe https://github.com/snapview/tokio-tungstenite?

Sytten commented 2 years ago

For those interested, I have a working prototype over here https://github.com/caido/graphql-ws-client. In theory the source crate is runtime agnostic so it should be possible to run it everywhere but I didn't try it. Depending on the response from https://github.com/obmarg/graphql-ws-client/issues/30, I will either push the prototype upstream or continue working on my work (maybe we can integrate here later).

Sytten commented 2 years ago

I polished my prototype and it is now merged in v0.2.0 or https://github.com/obmarg/graphql-ws-client!

As a reference here own I implemented it for my tests:

pub struct TokioSpawner(tokio::runtime::Handle);

impl TokioSpawner {
    pub fn new(handle: tokio::runtime::Handle) -> Self {
        TokioSpawner(handle)
    }

    pub fn current() -> Self {
        TokioSpawner::new(tokio::runtime::Handle::current())
    }
}

impl futures::task::Spawn for TokioSpawner {
    fn spawn_obj(
        &self,
        obj: futures::task::FutureObj<'static, ()>,
    ) -> Result<(), futures::task::SpawnError> {
        self.0.spawn(obj);
        Ok(())
    }
}
pub type GraphQLSubscriptionsClient = GraphQLClientClient<Message>;
pub type GraphQLSubscription<Q> = SubscriptionStream<GraphQLClient, StreamingOperation<Q>>;

pub async fn build_subscriptions_client() -> GraphQLSubscriptionsClient {
    let mut request = "ws://locahost:8080/ws/graphql"
        .into_client_request()
        .unwrap();
    request.headers_mut().insert(
        "Sec-WebSocket-Protocol",
        HeaderValue::from_str("graphql-transport-ws").unwrap(),
    );

    let (connection, _) = async_tungstenite::tokio::connect_async(request)
        .await
        .unwrap();
    let (sink, stream) = connection.split();

    GraphQLClientClientBuilder::new()
        .build(stream, sink, TokioSpawner::current())
        .await
        .unwrap()
}

pub async fn subscribe<Q>(
    client: &mut GraphQLSubscriptionsClient,
    variables: Q::Variables,
) -> GraphQLSubscription<Q>
where
    Q: GraphQLQuery + Send + Unpin + 'static,
    Q::Variables: Send + Unpin,
{
    let operation = StreamingOperation::<Q>::new(variables);
    client.streaming_operation(operation).await.unwrap()
}

pub async fn next<Q>(stream: &mut GraphQLSubscription<Q>) -> Response<Q::ResponseData>
where
    Q: GraphQLQuery + Send + Unpin + 'static,
    Q::Variables: Send + Unpin,
{
    timeout(Duration::from_secs(5), stream.next())
        .await
        .unwrap()
        .unwrap()
        .unwrap()
}
#[tokio::test]
async fn test_replay_task() {
        let mut client = common::build_subscriptions_client().await;

        let variables = updated_book::Variables {};
        let mut stream = common::subscribe::<subscriptions::UpdatedBook>(&mut client, variables).await;

        let variables = update_book::Variables {
            id: "1".to_string(),
            input: UpdateBookInput {
              name: "Test".to_string(),
            }
        };
        let res = post_graphql::<mutations::UpdateBook>(variables).await;
        let book = res.data.unwrap().update_book.book;

        let res = common::next(&mut stream).await;
        let updated_book = res.data.unwrap().updated_book;

        assert_eq!(book.id, updated_book.id);
}