xmtp / xps-gateway

XPS (XMTP Postal Service) JSON-RPC for xmtp-specific interactions with decentralized networks
https://xmtp.github.io/xps-gateway/
MIT License
2 stars 1 forks source link

JSON RPC WebSocket API: `subscribeGroupMessages` #26

Open jac18281828 opened 9 months ago

jac18281828 commented 9 months ago

JSON RPC WebSocket API Documentation: subscribeGroupMessages

Overview

This document describes the JSON RPC WebSocket API for subscribing to a conversation. It provides a way for clients to receive real-time updates in a conversation thread.

WebSocket Endpoint

JSON RPC Protocol

This API follows the JSON RPC 2.0 specification. All messages sent and received are expected to be in valid JSON format.

Authentication

To establish a secure WebSocket connection. This api may be public and does not require authentication.

Method

1. subscribeGroupMessages

Subscribes the client to receive updates for a specific conversation.

2. see unsubscribeGroupMessages

Notifications

When subscribed to a conversation, the client will receive notifications in the following format:

Error Handling

The API uses standard JSON RPC error responses. Common error codes include:

Examples

Request:

{
  "jsonrpc": "2.0",
  "method": "subscribeGroupMessages",
  "params": {
    "conversationId": "0x1234abcd..."
  },
  "id": 1
}

Notification:

{
  "method": "groupMessageUpdate",
  "params": {
    "conversationId": "12345",
    "event": "PayloadSent"
    "messages": [
      {
        "blockNumber": "67890",
        "timestamp": "2024-01-05T12:00:00Z",
        "payload": "<bytes...>"
      }
    ]
  }
}

Possible Implementation

    let conversation_topic = [H256::from(conversation_id)];
    let contract_addr = SENDER_CONTRACT.parse::<Address>().unwrap();
    let filter = Filter::new()
        .from_block(U64::from(start_block.as_u64()))
        .event("PayloadSent(bytes32,bytes,uint256)")
        .address(vec![contract_addr])
        .topic1(conversation_topic.to_vec());

    let mut stream = self.client.subscribe_logs(&filter).await.unwrap();
    while let Some(log) = stream.next().await {
        if tracing::level_enabled!(tracing::Level::TRACE) {
            tracing::trace!("log: {:?}", log);
        }
        let param_result = abi_decode_payload_sent(log.data.to_vec());
        if let Ok(param) = param_result {
            tracing::debug!("param: {:?}", param);
            let message = param[0].clone().into_string().unwrap();
            tracing::trace!("message: {message}");
            callback(&message);
        } else {
            let err = param_result.unwrap_err();
            tracing::error!("param error: {:?}", err);
            return Err(err);
        }
    }
    Ok(())

Versioning and Updates

This document describes version 1.0 of the API. Future updates and changes will be communicated as necessary.

jac18281828 commented 9 months ago

Reference implementation: https://github.com/xmtp/xps-conversation-producer