Open Bhashinee opened 2 years ago
Try to do something like this https://www.asyncapi.com/blog/websocket-part1
Objective
Design the Ballerina WebSocket module to comply with the Async API specification.
Design
The following is the mapping between the Async API and the Ballerina WebSocket service.

| Async API | Ballerina WebSocket |
| --- | --- |
| Application | Program containing a WebSocket service |
| Servers | Listeners |
| Channels | Upgrade service differentiated from resource paths |
| OperationId | Calling function |
| Subscribe | onOpen function |
| Publish | onMessage |
| Messages | Ballerina data types |
Servers
Servers will be mapped to the Ballerina WebSocket listeners.
Ex:
Async API
servers:
  public:
    url: localhost:80
    protocol: ws
    description: |
      Public server available without authorization.
Ballerina
listener websocket:Listener publicListener = new(80);
Channels
A channel is an addressable component made available by the server. Channels will be mapped to the resource path of the WebSocket upgrade service.
Ex:
Async API
channels:
  /:
    publish:
      description: Send messages to the API
      operationId: processReceivedMessage
      message:
        oneOf:
          - $ref: '#/components/messages/ping'
          - $ref: '#/components/messages/subscribe'
          - $ref: '#/components/messages/unsubscribe'
    subscribe:
      description: Messages that you receive from the API
      operationId: sendMessage
      message:
        oneOf:
          - $ref: '#/components/messages/pong'
          - $ref: '#/components/messages/heartbeat'
          - $ref: '#/components/messages/systemStatus'
          - $ref: '#/components/messages/subscriptionStatus'
Ballerina API
service / on publicListener {
    resource function get .() returns websocket:Service|websocket:Error {
        return new WsService();
    }
}
OperationId
A unique string used to identify the operation. It will be mapped to the function that is called when the client sends a request.
Ex:
Async API
publish:
  description: Send messages to the API
  operationId: processReceivedMessage
Ballerina API
service class WsService {
    *websocket:Service;
    remote function onMessage(websocket:Caller caller, Ping|Subscribe|Unsubscribe message) returns SubscriptionStatus|websocket:Error? {
        return processReceivedMessage(message, caller);
    }
}
Subscribe
Subscribe means that users subscribe to the given channel to receive events published by the server. In this case, users do not send anything to the server, and even if they do, the server is not required to read those messages. The client only establishes the connection, so only the onOpen function is triggered. Inside the onOpen function, we call the function generated with the name given as the operationId in the Async API.
Async API
subscribe:
  description: Messages that you receive from the API
  operationId: sendMessage
  message:
    oneOf:
      - $ref: '#/components/messages/pong'
      - $ref: '#/components/messages/heartbeat'
      - $ref: '#/components/messages/systemStatus'
      - $ref: '#/components/messages/subscriptionStatus'
Ballerina API
service class WsService {
    *websocket:Service;
    remote function onOpen(websocket:Caller caller) returns websocket:Error? {
        // sendMessage returns error? in the generated service, so the future carries error?.
        future<error?> _ = start sendMessage(caller);
    }
}
Publish
Publish means that users publish messages to the server. When the client sends events to the server, the onMessage remote function is invoked. As with the subscribe operation, once it is invoked we call the function generated with the name given as the operationId in the Async API.
Async API
publish:
  description: Send messages to the API
  operationId: processReceivedMessage
  message:
    oneOf:
      - $ref: '#/components/messages/ping'
      - $ref: '#/components/messages/subscribe'
      - $ref: '#/components/messages/unsubscribe'
Ballerina API
service class WsService {
    *websocket:Service;
    remote function onMessage(websocket:Caller caller, Subscribe message) returns SubscriptionStatus|error? {
        return processReceivedMessage(message, caller);
    }
}
Response types are matched with the return types of the remote function.
Messages
Messages will be mapped to the Ballerina data types.
Async API
ping:
  type: object
  properties:
    event:
      type: string
      const: ping
    reqid:
      $ref: '#/components/schemas/reqid'
  required:
    - event
reqid:
  type: integer
  description: client originated ID reflected in response message.
Ballerina API
public type Ping record {
    string event;
    Reqid reqid?;
};
public type Reqid int?;
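To make the message mapping concrete, here is a minimal sketch of how an incoming JSON payload could be converted into the generated Ping record. The helper name toPing is hypothetical and is not part of the proposal; it only relies on the standard cloneWithType conversion. Note that the record declares event as a plain string, so this sketch does not enforce the `const: ping` constraint from the schema.

```ballerina
import ballerina/io;

public type Reqid int?;

public type Ping record {
    string event;
    Reqid reqid?;
};

// Hypothetical helper: converts a raw JSON payload into the Ping record.
// Returns an error when the payload does not fit the record shape.
function toPing(json payload) returns Ping|error {
    return payload.cloneWithType(Ping);
}

public function main() {
    // Has the required `event` field, so the conversion should succeed.
    Ping|error valid = toPing({event: "ping", reqid: 42});
    io:println(valid is Ping);

    // Missing the required `event` field, so the conversion should fail.
    Ping|error invalid = toPing({reqid: 42});
    io:println(invalid is error);
}
```

When onMessage declares a record parameter, the WebSocket module performs a comparable conversion on its own, so a helper like this would only be needed outside the service.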
An example of a generated Ballerina service for this Async API.
services.bal
import ballerina/websocket;
import ballerina/lang.runtime;
import ballerina/random;

listener websocket:Listener localListener = new (80);

const hasSubscription = "hasSubscription";
const connId = "ConnectionId";

service / on localListener {
    resource function get .() returns websocket:Service|websocket:Error {
        return new WsService();
    }
}

service class WsService {
    *websocket:Service;

    remote function onOpen(websocket:Caller caller) returns error? {
        int randomInteger = check random:createIntInRange(1, 100);
        caller.setAttribute(connId, randomInteger);
        future<(error?)> _ = start sendMessage(caller);
    }

    remote function onMessage(websocket:Caller caller, Subscribe|Unsubscribe message) returns SubscriptionStatus|error? {
        return processReceivedMessage(message, caller);
    }

    remote function onPing(websocket:Caller caller, byte[] message) returns byte[] {
        return message;
    }
}

function sendMessage(websocket:Caller caller) returns error? {
    SystemStatus systemStatus = {
        connectionID: <int>check caller.getAttribute(connId),
        event: "systemStatus",
        'version: "1.0.0",
        status: online
    };
    check caller->writeMessage(systemStatus);
    check sendUpdates(caller);
}

function sendUpdates(websocket:Caller caller) returns error? {
    Heartbeat heartbeat = {event: "heartbeat"};
    while true {
        check caller->writeMessage(heartbeat);
        runtime:sleep(2);
        if <boolean>check caller.getAttribute(hasSubscription) {
            check caller->writeMessage(getUpdates());
        } else {
            break;
        }
    }
}

function getUpdates() returns string {
    float a1 = random:createDecimal() + 20000;
    float a2 = random:createDecimal();
    float b1 = random:createDecimal() + 20000;
    float b2 = random:createDecimal();
    float c1 = random:createDecimal() + 11;
    float c2 = random:createDecimal() + 2000;
    string usd = string `[340,{\"a\":[\"${a1}\",0,\"${a2}\"],\"b\":[\"${b1}\",11,\"${b2}\"],\"c\":[\"${c1}\",\"${c2}\"],\"v\":[\"${a1}\",\"${a2}\"],\"p\":[\"${b1}\",\"${b2}\"],\"t\":[${c1}],\"l\":[\"${c2}\",\"${a1}\"],\"h\":[\"${a2}\",\"${b1}\"],\"o\":[\"${b2}\",\"${c1}\"]},\"ticker\",\"XBT/USD\"]`;
    return usd;
}

function processReceivedMessage(Subscribe|Unsubscribe message, websocket:Caller caller) returns SubscriptionStatus|error? {
    string event = <string>message?.event;
    string[] pair = <string[]>message?.pair;
    Subscribe subscription = <Subscribe>message;
    string? nameVal = subscription?.subscription?.name;
    Name name = <Name>nameVal;
    if event is "subscribe" {
        SubscriptionStatus subscriptionStatus = {
            channelID: <int>check caller.getAttribute(connId),
            channelName: name,
            'event: "subscriptionStatus",
            pair: pair,
            status: subscribed,
            subscription: {name: name}
        };
        // Without using `writeMessage`, use the return type to match the responses.
        // check caller->writeMessage(subscriptionStatus);
        caller.setAttribute(hasSubscription, true);
        return subscriptionStatus;
    } else {
        SubscriptionStatus subscriptionStatus = {
            channelID: <int>check caller.getAttribute(connId),
            channelName: name,
            'event: "subscriptionStatus",
            pair: pair,
            status: unsubscribed,
            subscription: {name: name}
        };
        // check caller->writeMessage(subscriptionStatus);
        caller.setAttribute(hasSubscription, false);
        return subscriptionStatus;
    }
}
dataTypes.bal
public type Heartbeat record {
    string event?;
};

public type Unsubscribe record {
    Subscribe subscribe;
};

public type Subscribe record {
    record { Depth depth?; Ratecounter ratecounter?; Name name; Interval interval?; Snapshot snapshot?; Token token?;} subscription?;
    string event;
    Pair pair?;
    Reqid reqid?;
};

public type Ping record {
    string event;
    Reqid reqid?;
};

public type Ratecounter boolean?;

public type Pong record {
    string event?;
    Reqid reqid?;
};

public type Pair string[]?;

public type Token string?;

public type Reqid int?;

public type Depth int?;

public type SystemStatus record {
    # The ID of the connection
    int connectionID?;
    string event?;
    string 'version?;
    Status status?;
};

public type SubscriptionStatus record {
    int channelID;
    string channelName;
    string event;
    Reqid reqid?;
    Pair pair?;
    Status status?;
    Subscription subscription?;
};

public type Subscription record {
    Depth depth?;
    Interval interval?;
    MaxRateCount maxratecount?;
    Name name;
    Token token?;
};

public type MaxRateCount int?;

public enum Name {
    book,
    ohlc,
    openOrders,
    ownTrades,
    spread,
    ticker,
    trade
}

public type Interval int?;

public type Snapshot boolean?;

public enum Status {
    online,
    maintenance,
    cancel_only,
    limit_only,
    post_only,
    subscribed,
    unsubscribed
}
This is the code generated for Node.js WebSocket from the same Async API.
routes.js
const util = require('util');
const { Router } = require('express');
const { pathParser } = require('../lib/path');
const { yellow } = require('../lib/colors');
const { sendMessage, processReceivedMessage } = require('./services/root');
const router = Router();
module.exports = router;

router.ws('', async (ws, req) => {
  const path = pathParser(req.path);
  console.log(`${yellow(path)} client connected.`);
  await sendMessage(ws);
  ws.on('message', async (msg) => {
    console.log(`${yellow(path)} message was received:`);
    console.log(util.inspect(msg, { depth: null, colors: true }));
    await processReceivedMessage(ws, { message: msg, path, query: req.query });
  });
});
root.js
const service = module.exports = {};

/**
 *
 * @param {object} ws WebSocket connection.
 */
service.sendMessage = async (ws) => {
  ws.send('Message from the server: Implement here your business logic that sends messages to a client after it connects.');
};

/**
 *
 * @param {object} ws WebSocket connection.
 * @param {object} options
 * @param {string} options.path The path in which the message was received.
 * @param {object} options.query The query parameters used when connecting to the server.
 * @param {object} options.message The received message.
 * @param {string} options.message.payload.event
 * @param {integer} options.message.payload.reqid - client originated ID reflected in response message.
 */
service.processReceivedMessage = async (ws, { message, path, query }) => {
  ws.send('Message from the server: Implement here your business logic that reacts on messages sent from a client.');
};
@shafreenAnfar Could you please review this?
Problems with the code https://github.com/ballerina-platform/ballerina-standard-library/issues/2092#issuecomment-1221797752
When generating the Async API from Ballerina code, if a service that performs only the publish operation also includes the onOpen remote function, the Async API will be generated as if it has a subscribe operation as well.
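A minimal sketch of the kind of service that runs into this, assuming a publish-only API that uses onOpen purely for bookkeeping. The class name PublishOnlyService and port 9090 are hypothetical. Nothing is written to the client in onOpen, yet under the mapping above its presence alone would produce a subscribe operation in the generated Async API.

```ballerina
import ballerina/io;
import ballerina/websocket;

// Port 9090 is an arbitrary choice for this sketch.
service / on new websocket:Listener(9090) {
    resource function get .() returns websocket:Service|websocket:Error {
        return new PublishOnlyService();
    }
}

// Hypothetical publish-only service: clients send messages, the server sends nothing back.
service class PublishOnlyService {
    *websocket:Service;

    // Used only to log the connection; no message is sent to the client here.
    remote function onOpen(websocket:Caller caller) {
        io:println("Client connected: ", caller.getConnectionId());
    }

    // The publish operation: handles messages sent by the client.
    remote function onMessage(websocket:Caller caller, string message) {
        io:println("Received: ", message);
    }
}
```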
The currency updates received by the client do not match any of the data types given in the Async API.
Ex:
This is the payload sent by the server.
[340,{"a":["45520.10000",6,"6.78103490"],"b":["45520.00000",0,"0.00185230"],"c":["45520.10000","0.01643250"],"v":["1397.95434819","5589.12101024"],"p":["44883.49461","44062.07654"],"t":[14350,66782],"l":["43607.60000","42770.80000"],"h":["45811.10000","45811.10000"],"o":["43659.30000","44709.10000"]},"ticker","XBT/EUR"]
There is no matching data type for this in the subscribe section of the Async API.
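The mismatch can be sketched as follows. The payload is a JSON array (a number, an object, and two strings), while every message in the subscribe section is an object, so it cannot be converted into a record such as Heartbeat. The tuple type TickerUpdate is hypothetical and only illustrates one shape that would accept it; it is not defined in the Async API, and the payload is shortened here.

```ballerina
import ballerina/io;

// Same record as in dataTypes.bal.
type Heartbeat record {
    string event?;
};

// Hypothetical tuple type, not part of the Async API: number, object, two strings.
type TickerUpdate [int, map<json>, string, string];

public function main() {
    // Shortened version of the payload sent by the server.
    json payload = [340, {"a": ["45520.10000", 6, "6.78103490"], "t": [14350, 66782]}, "ticker", "XBT/EUR"];

    // An array cannot be converted to a record, so this should be an error.
    Heartbeat|error asHeartbeat = payload.cloneWithType(Heartbeat);
    io:println(asHeartbeat is error);

    // The tuple type matches the array shape, so this should succeed.
    TickerUpdate|error asTicker = payload.cloneWithType(TickerUpdate);
    io:println(asTicker is TickerUpdate);
}
```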