ballerina-platform / ballerina-library

The Ballerina Library
https://ballerina.io/learn/api-docs/ballerina/
Apache License 2.0

[Proposal] Dispatching to custom remote functions based on the message type #3670

Open Bhashinee opened 1 year ago

Bhashinee commented 1 year ago

Summary

Dispatch messages to custom remote functions based on the message type (declared by a field in the received message), with the end goal of generating meaningful AsyncAPI specifications.

Goals

Generating meaningful AsyncAPI specifications and improving the readability of the code.

Motivation

With AsyncAPI gaining popularity through the increased use of event-driven microservices, it is worthwhile to think of ways to generate AsyncAPI specifications from WebSocket service code. The motivation is to make the service code more understandable, so that enough detail can be extracted from it to generate meaningful AsyncAPI specifications, and to improve the readability of the code.

Description

In most real-world use cases, the WebSocket protocol is used along with a sub-protocol. Most of the time, the messages of those sub-protocols are distinguished by a dedicated field in the message that contains the message type. For example, in the Kraken API the message type is identified by the event field.

{"event": "ping"}
{"event": "subscribe",  "pair": [    "XBT/USD",    "XBT/EUR"  ],  "subscription": {    "name": "ticker"  }}
{"event": "heartbeat"}

Another example is the GraphQL over WebSocket Protocol.

The WebSocket sub-protocol for that specification is graphql-transport-ws, and the message type is identified by the value of the type field of the message.

{"type": "ping"}
{"type": "subscribe", "id":"1", "payload":{"query": "{ __schema { types { name } } }"}}

As of now, when using the Ballerina WebSocket service, all these messages are dispatched to the generic onMessage remote function. Any logic that depends on the type of the received message has to be handled inside onMessage using an if/else ladder or similar, which reduces the readability of the code.

Also, if we want to generate an AsyncAPI specification from the service code, it is not possible to capture all the details, such as the response message for a particular type of message.

Ex:

Following is a part of the Kraken AsyncAPI specification describing the types of messages and their responses.

  messages:
    ping:
      summary: Ping server to determine whether connection is alive
      description: Client can ping server to determine whether connection is alive, server responds with pong. This is an application level ping as opposed to default ping in websockets standard which is server initiated
      payload:
        $ref: '#/components/schemas/ping'
      x-response:
        $ref: '#/components/messages/pong'

    unsubscribe:
      description: Unsubscribe, can specify a channelID or multiple currency pairs.
      payload:
        $ref: '#/components/schemas/subscribe'
      examples:
        - payload:
            event: unsubscribe
            pair:
              - XBT/EUR
              - XBT/USD
            subscription:
              name: ticker
        - payload:
            event: unsubscribe
            subscription:
              name: ownTrades
              token: WW91ciBhdXRoZW50aWNhdGlvbiB0b2tlbiBnb2VzIGhlcmUu
      x-response:
        $ref: '#/components/messages/subscriptionStatus'    

The above AsyncAPI specification defines the messages ping and unsubscribe. Their response messages are given by the x-response field.

If this part is written using the existing WebSocket service functionality, it looks like the following.

service class MyService {
    *websocket:Service;

    remote function onMessage(websocket:Caller caller, Ping|UnSubscribe message) returns Pong|SubscriptionStatus {
        if message is Ping {
            return {'type: WS_PONG};
        } else {
            return {'type: WS_UNSUBSCRIBE, id: "5"};
        }
    }
}

Therefore, if all messages are dispatched to a single onMessage remote function, it is difficult to tell the response for the ping message apart from the response for the unsubscribe operation.

As a solution, the idea is to have custom remote functions in the WebSocket service based on the message type. For example, the message {"type": "ping"} is dispatched to the onPing remote function. Similarly:

Message | Remote function
{"event": "ping"} | onPing
{"event": "subscribe", "pair": ["XBT/USD", "XBT/EUR"], "subscription": {"name": "ticker"}} | onSubscribe
{"event": "heartbeat"} | onHeartbeat

Dispatching rules

  1. The user can configure the field name (key) used to identify messages; the values of that field serve as the message types.

The dispatcherKey identifies the event type of an incoming message by its value. The default value is 'type.

Ex:
incoming message = {"event": "ping"}
dispatcherKey = "event"
event/message type = "ping"
dispatching to remote function = "onPing"

@websocket:ServiceConfig {
    dispatcherKey: "event"
}
service / on new websocket:Listener(9090) {}

  2. Naming of the remote function.
  3. If a message arrives whose type has no matching remote function implemented in the WebSocket service, it is dispatched to the default onMessage remote function if that is implemented. Otherwise it is ignored.
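
The dispatching rules above can be sketched as a small lookup. This is only an illustration, not the websocket module's implementation: resolveRemoteFunction and its implemented parameter are hypothetical names, and the naming rule ("on" followed by the message type with its first letter upper-cased) is an assumption inferred from the ping/onPing, subscribe/onSubscribe and heartbeat/onHeartbeat examples in this proposal.

```ballerina
import ballerina/io;

// Hypothetical helper, not part of the ballerina/websocket module.
// Assumed naming rule: "on" + message type with its first letter upper-cased.
// `implemented` lists the remote function names the service class defines.
function resolveRemoteFunction(json message, string dispatcherKey, string[] implemented) returns string? {
    if message is map<json> {
        json value = message[dispatcherKey];
        if value is string && value.length() > 0 {
            string candidate = "on" + value.substring(0, 1).toUpperAscii() + value.substring(1);
            if implemented.indexOf(candidate) is int {
                return candidate;
            }
        }
    }
    // No matching custom remote function: fall back to onMessage if it exists.
    if implemented.indexOf("onMessage") is int {
        return "onMessage";
    }
    // Neither is implemented: the message is ignored.
    return ();
}

public function main() {
    string[] implemented = ["onPing", "onSubscribe", "onMessage"];
    json ping = {"event": "ping"};
    json heartbeat = {"event": "heartbeat"};
    io:println(resolveRemoteFunction(ping, "event", implemented)); // onPing
    io:println(resolveRemoteFunction(heartbeat, "event", implemented)); // onMessage
}
```

A message without the dispatcher key, or with a non-string value for it, takes the same fallback path as an unknown message type in this sketch.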

An example code for Kraken API with the proposed changes.

@websocket:ServiceConfig {
    dispatcherKey: "'type"
}
service / on new websocket:Listener(9090) {
    resource function get .() returns websocket:Service {
        return new MyService();
    }
}

service class MyService {
    *websocket:Service;

    remote function onPing(Ping message) returns Pong {
        return {'type: WS_PONG};
    }

    remote function onSubscribe(Subscribe message) returns SubscriptionStatus {
        return {id: "4", 'type: WS_SUBSCRIBE};
    }

    remote function onHeartbeat(Heartbeat message) {
        io:println(message);
    }
}
shafreenAnfar commented 1 year ago

We can get it to work even without allowedValues, right?

Let's make the key's default value type.

shafreenAnfar commented 1 year ago

If we generate a client using the generated AsyncAPI, what would it look like for the below?

@websocket:ServiceConfig {
    discriminator: {
        key: "event",
        allowedValues: ["ping", "heartbeat", "subscribe"]
    }
}
service / on new websocket:Listener(9090) {
    resource function get .() returns websocket:Service {
        return new MyService();
    }
}

service class MyService {
    *websocket:Service;

    remote function onPing(Ping message) returns Pong {
        return {'type: WS_PONG};
    }

    remote function onSubscribe(Subscribe message) returns SubscriptionStatus {
        return {id: "4", 'type: WS_SUBSCRIBE};
    }

    remote function onHeartbeat(Heartbeat message) {
        io:println(message);
    }
}
shafreenAnfar commented 1 year ago

I prefer dispatcher over discriminator, and if allowedValues is not needed, then maybe we can use dispatcherKey.

Bhashinee commented 1 year ago

> We can get it to work even without allowedValues, right?
>
> Let's make the key's default value type.

Updated the proposal by removing the discriminator record and adding the dispatcher as suggested.

Bhashinee commented 1 year ago

> If we generate a client using the generated AsyncAPI, what would it look like for the below?
>
> @websocket:ServiceConfig {
>     discriminator: {
>         key: "event",
>         allowedValues: ["ping", "heartbeat", "subscribe"]
>     }
> }
> service / on new websocket:Listener(9090) {
>     resource function get .() returns websocket:Service {
>         return new MyService();
>     }
> }
>
> service class MyService {
>     *websocket:Service;
>
>     remote function onPing(Ping message) returns Pong {
>         return {'type: WS_PONG};
>     }
>
>     remote function onSubscribe(Subscribe message) returns SubscriptionStatus {
>         return {id: "4", 'type: WS_SUBSCRIBE};
>     }
>
>     remote function onHeartbeat(Heartbeat message) {
>         io:println(message);
>     }
> }

How about a client code generated as follows?

public function main() returns error? {
    // Initiate an asyncapi WebSocket client by passing the server to connect. Individual clients will be generated for each `channel`.
    // This will connect to the given server (this client is generated for the `root` channel) and subscribe to that.
    // Name of the client will be `AsyncAPI title` + `<channel>` + `Client` ignoring the spaces, underscores etc. 
    // Note:- We might need to discuss and come up with a proper naming convention.
    KrakenWebSocketApiRootClient rootClient = new(KrakenWebSocketApiClient.Server_Public);

    // Publish messages to the server. The message types are retrieved from the response types of the `publish` operation.
    Ping pingMessage = {'type: PING};
    check rootClient->ping(pingMessage);

    Subscribe subscribeMessage = {'type: SUBSCRIBE, "pair": ["XBT/USD", "XBT/EUR"], "subscription": {"name": "ticker"}};
    check rootClient->subscribe(subscribeMessage);

    UnSubscribe unSubscribeMessage = {'type: UNSUBSCRIBE, "pair": ["XBT/USD", "XBT/EUR"]};
    check rootClient->unsubscribe(unSubscribeMessage);

    // Then the client can listen to the server publishing messages. The message types are retrieved from the response types 
    // `subscribe` operation.
    // Can be done in a separate strand in a loop.
    Pong|Heartbeat|SystemStatus|SubscriptionStatus message = check rootClient->listen();
}
shafreenAnfar commented 1 year ago

Looks good. Except we don't need listen() function. I was thinking of something like the below.

public function main() returns error? {
    // Initiate an asyncapi WebSocket client by passing the server to connect. Individual clients will be generated for each `channel`.
    // This will connect to the given server (this client is generated for the `root` channel) and subscribe to that.
    // Name of the client will be `AsyncAPI title` + `<channel>` + `Client` ignoring the spaces, underscores etc. 
    // Note:- We might need to discuss and come up with a proper naming convention.
    KrakenWebSocketApiRootClient rootClient = new(KrakenWebSocketApiClient.Server_Public);

    worker A {
        // Publish messages to the server. The message types are retrieved from the response types of the `publish` operation.
        while true {
            Ping pingMessage = {'type: PING};
            Pong pongMessage = rootClient->ping(pingMessage);
        }
    }

    worker B returns error? {
        Subscribe subscribeMessage = {'type: SUBSCRIBE, "pair": ["XBT/USD", "XBT/EUR"], "subscription": {"name": "ticker"}};
        stream<SubscriptionStatus> statusStream = check rootClient->subscribe(subscribeMessage);
        // loop stream
    }

    UnSubscribe unSubscribeMessage = {'type: UNSUBSCRIBE, "pair": ["XBT/USD", "XBT/EUR"]};
    check rootClient->unsubscribe(unSubscribeMessage);
}
shafreenAnfar commented 1 year ago

Reopened the proposal as it does not discuss error handling for each custom remote method.

At the moment we have onError for onMessage. Similarly, I think we need onXXXError for each custom remote method. For example, see the code below.

@websocket:ServiceConfig {
    dispatcherKey: "event"
}
service / on new websocket:Listener(9090) {
    resource function get .() returns websocket:Service {
        return new MyService();
    }
}

service class MyService {
    *websocket:Service;

    remote function onPing(Ping message) returns Pong {
        return {'type: WS_PONG};
    }

    remote function onPingError(error err) {
        io:println(err);
    }

    remote function onSubscribe(Subscribe message) returns SubscriptionStatus {
        return {id: "4", 'type: WS_SUBSCRIBE};
    }

    remote function onSubscribeError(error err) returns ErrorMessage {
        return {id: "4", 'type: WS_SUBSCRIBE};
    }

    remote function onHeartbeat(Heartbeat message) {
        io:println(message);
    }

    remote function onHeartbeatError(error err) {
        io:println(err);
    }
}
shafreenAnfar commented 1 year ago

If there is no matching custom onXXXError method, the error should be dispatched to onError, just like the default branch of a switch/case. This is the same approach we used for onMessage.
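
The fallback described here can be sketched in the same way. resolveErrorHandler and its implemented parameter are hypothetical names used only for illustration. Returning nil when neither handler exists is an assumption, since the thread does not say what should happen in that case.

```ballerina
import ballerina/io;

// Hypothetical helper, not part of the ballerina/websocket module.
// Picks the error handler for a custom remote function such as onPing.
function resolveErrorHandler(string remoteFunction, string[] implemented) returns string? {
    string candidate = remoteFunction + "Error";
    if implemented.indexOf(candidate) is int {
        return candidate;
    }
    // No custom onXXXError: fall back to the generic onError.
    if implemented.indexOf("onError") is int {
        return "onError";
    }
    // Assumption: with no handler at all, nothing is invoked.
    return ();
}

public function main() {
    string[] implemented = ["onPing", "onPingError", "onSubscribe", "onError"];
    io:println(resolveErrorHandler("onPing", implemented)); // onPingError
    io:println(resolveErrorHandler("onSubscribe", implemented)); // onError
}
```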

shafreenAnfar commented 1 year ago

In addition to dispatcherKey, dispatchingStreamId is needed to allow multiplexing. If the same message is returned from two operations without the streamId, the compiler plugin should capture it and give an error. Unlike dispatcherKey, dispatchingStreamId is only conditionally mandatory.

shafreenAnfar commented 1 year ago

dispatchingStreamId is always generated on the client side. Server side only does the correlation.