Closed adrianhopebailie closed 9 months ago
@derberg There seems not to be much progress / discussion in: https://github.com/asyncapi/spec/pull/594 Maybe we should have a dedicated call to make progress here? (But please not at 4PM UTC; at least -1hour/+2hour would match my time zone (CEST) much better)
Of course! What time and date would you suggest? That way we can set up the meeting as all the other meetings for everyone to join.
Meeting scheduled: https://github.com/asyncapi/community/issues/352
cc @smarek in case you still want to champion it 🙂
Here you have a well-defined response topic that can be found in the spec, as well as a "correlationId" in the request message (header/body). The correlationId is a random ID generated by the requestor. It has to be copied to the response messages by the responder, so that the requestor can map each response to its request.
It works mostly the same as option A, but the response topic is not well known. The response topic is chosen by the requestor and put as "replyTopic" into the message header/body. The response topic/queue can either be totally random or may have to follow rules specified by the schema.
For each request there will be a dedicated response topic. Otherwise it is the same as option B. This method is mostly used for the request-to-multi-response pattern, where the requestor doesn't know the expected number of responses.
It works the same as option A. There are no topics at all. In this case the channel is just a dedicated TCP connection of the websocket.
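A minimal Python sketch of the correlationId matching that these options rely on. The message layout (a dict with `headers` and `payload`) and the names `Requestor`, `build_request`, `match_response`, and `respond` are assumptions made for illustration; they are not part of the proposal.

```python
import uuid


class Requestor:
    """Keeps track of open requests by their correlationId."""

    def __init__(self):
        # correlationId -> original request payload
        self.pending = {}

    def build_request(self, payload):
        # The requestor generates a random correlationId per request.
        correlation_id = str(uuid.uuid4())
        self.pending[correlation_id] = payload
        return {"headers": {"correlationId": correlation_id}, "payload": payload}

    def match_response(self, response):
        # Returns the payload of the matching request, or None if unknown.
        correlation_id = response.get("headers", {}).get("correlationId")
        return self.pending.pop(correlation_id, None)


def respond(request, result):
    # The responder copies the correlationId into the response unchanged.
    correlation_id = request["headers"]["correlationId"]
    return {"headers": {"correlationId": correlation_id}, "payload": result}
```

A response whose correlationId was never issued (or was already matched) yields `None`, so the requestor can drop it.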
channels:
  user.creation.channel:
    message:
      $ref: 'sample.yaml#/components/messages/createUser'
  user.creation.response.channel:
    message:
      $ref: 'sample.yaml#/components/messages/creationSucessfull'
operations:
  createUser:
    action: send
    channel: user.creation.channel
    description: Creates a user and expects a response in the same channel.
    reply:
      channel: user.creation.response.channel
components:
  messages:
    createUser:
      type: request
      name: CreateUser
      summary: Represents an explicit request to the service xyz
      contentType: application/json
      headers:
        type: object
        required:
          - correlationId
        properties:
          correlationId:
            type: string
            description: This header has to be copied to the response message.
      payload:
        $ref: 'sample.yaml#/components/schemas/createUser'
    creationSucessfull:
      type: response
      name: CreationWasSucessfull
      summary: an entity was created
      contentType: application/json
      headers:
        type: object
        required:
          - correlationId
        properties:
          correlationId:
            type: string
      payload:
        $ref: 'sample.yaml#/components/schemas/creationSucessfull'
channels:
  user.creation.channel:
    message:
      $ref: 'sample.yaml#/components/messages/createUser'
operations:
  createUser:
    action: send
    channel: user.creation.channel
    description: Creates a user and expects a response in the same channel.
    reply:
      replyTopic:
        source: header
        field: replyTo
      message:
        oneOf:
          - $ref: 'sample.yaml#/components/messages/creationFailed'
          - $ref: 'sample.yaml#/components/messages/creationSucessfull'
components:
  messages:
    createUser:
      type: request
      name: CreateUser
      summary: Represents an explicit request to the service xyz
      contentType: application/json
      headers:
        type: object
        required:
          - correlationId
          - replyTo
        properties:
          correlationId:
            type: string
          replyTo:
            type: string
            pattern: user.creation.response.([^.]+)
      payload:
        $ref: 'sample.yaml#/components/schemas/createUser'
    creationSucessfull:
      type: response
      name: CreationWasSucessfull
      summary: an entity was created
      contentType: application/json
      headers:
        type: object
        required:
          - correlationId
        properties:
          correlationId:
            type: string
      payload:
        $ref: 'sample.yaml#/components/schemas/creationSucessfull'
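To illustrate how a requestor could pick a reply topic that follows the `replyTo` pattern from the sample above, here is a small Python sketch. The helper names `new_reply_topic` and `is_valid_reply_topic` are hypothetical. Note one deliberate deviation: JSON Schema patterns are unanchored, while this sketch uses a full match, which is stricter.

```python
import re
import uuid

# Pattern copied verbatim from the replyTo header schema in the sample.
# The unescaped dots match any character, exactly as in the original.
REPLY_TO_PATTERN = re.compile(r"user.creation.response.([^.]+)")


def new_reply_topic() -> str:
    # A random, single-segment suffix keeps the topic unique per requestor.
    return f"user.creation.response.{uuid.uuid4().hex}"


def is_valid_reply_topic(topic: str) -> bool:
    # Assumption: the whole topic must match, not just a substring of it.
    return REPLY_TO_PATTERN.fullmatch(topic) is not None
```

With the full match, a topic with an extra level such as `user.creation.response.a.b` is rejected, which is probably what the `[^.]+` group intends.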
type: string
enum:
- send
- receive
This is the replacement of async api v2 channels.*.publish and channels.*.subscribe
type: string
default: generic
enum:
- generic
- request
- response
Specifies a message as a request or a response, to give the code generator a hint.
Complex type.
type: object
properties:
  channel:
    type: string
    description: referring to a defined element in channels section
  message:
    type: object
    description: Either message or channel has to be set to define the response message schema. If both are given, the message object overrides the message of the channel; all other options from the channel still apply.
  replyTopic:
    type: object
    description: |
      Either replyTopic or channel has to be set to define the response topic. If both are given, the replyTopic overrides the topic of the channel; all other options from the channel still apply.
      The replyTopic describes where in the request message the reply topic can be found.
    required:
      - source
      - field
    properties:
      source:
        type: string
        enum:
          - header
          - body
      field:
        type: string
        description: the field name where the response topic string can be found. In case of the body, the replyTopic needs to be in a root-level field.
To define a request reply couple.
This field is optional.
It should be set if:
This field is optional.
It should be set if:
components.messages.*.reply.channel attribute is not set
This field is optional. If the reply topic is defined in the request message header, you need to define where in the request message it can be found.
It should be set if:
components.messages.*.reply.channel attribute is not set
The request message contains the reply topic either in the message header or in the body. For example, MQTT 3 and WebSocket don't support message headers. In this case replyTopic needs to be part of the message body.
The field name where the response topic string can be found. In case of the body, the replyTopic needs to be in a root-level field. Sub-level paths cannot be defined.
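As a rough sketch of how a responder might apply the `source` / `field` pair, the Python function below looks up the reply topic in a request. The message layout (a dict with `headers` and `body` keys) and the function name `resolve_reply_topic` are assumptions for illustration only.

```python
def resolve_reply_topic(reply_topic: dict, message: dict) -> str:
    """Find the reply topic in a request, as described by source and field."""
    source = reply_topic["source"]
    field = reply_topic["field"]
    if source == "header":
        container = message.get("headers", {})
    elif source == "body":
        # Only root-level body fields are supported; no nested paths.
        container = message.get("body", {})
    else:
        raise ValueError(f"unsupported source: {source!r}")
    if field not in container:
        raise KeyError(f"reply topic field {field!r} not found in {source}")
    return container[field]
```

For example, with `{"source": "body", "field": "replyTo"}` the function reads `message["body"]["replyTo"]`, which covers transports without message headers.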
Thanks a lot for this breakdown @GreenRover 👏 This is definitely what we need in order to move forward. IIRC, we should also look into JSON RPC as a use case so it can be covered as well.
Is there someone out in the wild who can provide input on JSON RPC? I have never used it before, so if there is no one, I will have to start reading.
Not myself but there was a person in the call who said they were using it. Maybe they join next call 🤔
Yes that was me, I'll describe our use case ASAP (probably tomorrow)
@GreenRover in the meantime, if you're curious, the spec is here and it's fairly simple: https://www.jsonrpc.org/specification
So here is how we currently use JSON RPC 2.0:
We have servers that can handle multiple methods. For sake of example let's say our server supports the following 2 methods:
Our server listens over MQTT for JSON RPC requests at the following topic:
serverName/request
For MQTT 5, the server will reply to the topic specified by the client in the responseTopic property. This can be anything as the client is responsible for choosing that topic. Therefore it doesn't really make sense to document that topic in the AsyncAPI definition of the server.
For MQTT 3 the server will reply to the serverName/response topic. All clients listen to that topic and will filter out the correct reply by checking the JSON RPC id.
When used with MQTT the correlationData field of MQTT5 is not really important since JSON RPC has a correlation ID already in the body of the message so it's not used or could be set to the JSON RPC Id.
So in a nutshell what we need is a way to describe multiple methods that are all received on the same topic and also describe the schema of the reply (that needs to be linked with the method)
JSON RPC also has the concept of notifications. A notification is a message sent by the client without an ID. When a notification is received, the server must not reply, so in that case a response schema is not necessary.
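To make the "several methods on one topic" and notification behaviour concrete, here is a small Python sketch of a dispatcher. It is only an illustration: the handler bodies are placeholders, `deleteUser` is an assumed method name, and transport (MQTT publish/subscribe) is left out entirely. The error code -32601 is the "Method not found" code from the JSON RPC 2.0 spec.

```python
import json


def add_user(params):
    return {"status": "ok"}  # placeholder result


def delete_user(params):
    return {"status": "ok"}  # placeholder result


# All methods arrive on the same topic and are told apart by "method".
METHODS = {"addUser": add_user, "deleteUser": delete_user}


def handle(raw: str):
    """Return the JSON reply for a request, or None for a notification."""
    request = json.loads(raw)
    is_notification = "id" not in request
    handler = METHODS.get(request.get("method"))
    if handler is None:
        if is_notification:
            return None
        error = {"code": -32601, "message": "Method not found"}
        return json.dumps({"jsonrpc": "2.0", "id": request["id"], "error": error})
    result = handler(request.get("params", {}))
    if is_notification:
        return None  # the server must not reply to notifications
    return json.dumps({"jsonrpc": "2.0", "id": request["id"], "result": result})
```

The reply schema depends on which handler ran, which is the link between method and response that the AsyncAPI definition would need to express.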
Of some interest is also the fact the server can support other transports than MQTT for the JSON RPC aspect. We usually support UDP/TCP/WebSockets as well. Could be nice if there was a way to represent this in AsyncAPI (although it might be out of scope).
Here's an example of how addUser and delete user would work:
request -->
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "addUser",
  "params": {
    "username": "bob",
    "age": 33
  }
}
response <--
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "status": "ok"
  }
}
The server could also respond with an error:
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -31000,
    "message": "Can't create user, sorry :("
  }
}
What we're mostly interested in documenting via a schema is the params, result and error properties, although a schema for the full message would be ok too.
Let me know if this is clear or if you need me to elaborate further !
Thanks :)
Hey folks, leaving the examples I shared on my screen during the last call:
# channels.yaml (probably defined and maintained at company level)
asyncapi: 3.0.0
channels:
  userCreationChannel:
    address: user.creation.channel
    message:
      - $ref: '#/components/messages/createUser'
  userCreationChannelWithType2:
    address: user.creation.channel
    message:
      - $ref: '#/components/messages/createUser2'
  userCreationReply:
    address: null
    message:
      - $ref: '#/components/messages/creationFailed'
      - $ref: '#/components/messages/creationSuccessful'
  userCreationSuccessfulReply:
    address: null
    message:
      - $ref: '#/components/messages/creationSuccessful'
components:
  messages:
    createUser:
      ...
    createUser2:
      ...
    creationFailed:
      ...
    creationSuccessful:
      ...

# myapplication.yaml (owned by my team)
asyncapi: 3.0.0
operations:
  createUserWithSucessfulReply:
    action: send
    channel: 'channels.yaml#/channels/userCreationChannelWithType2'
    description: Creates a user and expects a response in the same channel.
    reply:
      channel: 'channels.yaml#/channels/userCreationSuccessfulReply'
channels:
  user.creation.channel:
    message:
      $ref: 'sample.yaml#/components/messages/createUser'
  user.creation.response.channel:
    message:
      $ref: 'sample.yaml#/components/messages/creationSucessfull'
operations:
  createUser:
    action: send
    channel: user.creation.channel
    description: Creates a user and expects a response in the same channel.
    reply:
      channel: user.creation.response.channel
components:
  messages:
    createUser:
      type: request
      name: CreateUser
      summary: Represents an explicit request to the service xyz
      contentType: application/json
      correlationId:
        description: Default Correlation ID
        location: $message.header#/correlationId
      headers:
        type: object
        required:
          - correlationId
        properties:
          correlationId:
            type: string
            description: This header has to be copied to the response message.
      payload:
        $ref: 'sample.yaml#/components/schemas/createUser'
    creationSucessfull:
      type: response
      name: CreationWasSucessfull
      summary: an entity was created
      contentType: application/json
      correlationId:
        description: Default Correlation ID
        location: $message.header#/correlationId
      headers:
        type: object
        required:
          - correlationId
        properties:
          correlationId:
            type: string
      payload:
        $ref: 'sample.yaml#/components/schemas/creationSucessfull'
channels:
  user.creation.channel:
    message:
      $ref: 'sample.yaml#/components/messages/createUser'
operations:
  createUser:
    action: send
    channel: user.creation.channel
    description: Creates a user and expects a response in the same channel.
    reply:
      message:
        oneOf:
          - $ref: 'sample.yaml#/components/messages/creationFailed'
          - $ref: 'sample.yaml#/components/messages/creationSucessfull'
components:
  messages:
    createUser:
      type: request
      name: CreateUser
      summary: Represents an explicit request to the service xyz
      contentType: application/json
      correlationId:
        description: Default Correlation ID
        location: $message.header#/correlationId
      replyTopic:
        description: Default topic to answer to
        location: $message.header#/replyTo
      headers:
        type: object
        required:
          - correlationId
          - replyTo
        properties:
          correlationId:
            type: string
          replyTo:
            type: string
            pattern: user.creation.response.([^.]+)
      payload:
        $ref: 'sample.yaml#/components/schemas/createUser'
    creationSucessfull:
      type: response
      name: CreationWasSucessfull
      summary: an entity was created
      contentType: application/json
      correlationId:
        description: Default Correlation ID
        location: $message.header#/correlationId
      headers:
        type: object
        required:
          - correlationId
        properties:
          correlationId:
            type: string
      payload:
        $ref: 'sample.yaml#/components/schemas/creationSucessfull'
type: string
default: generic
enum:
- generic
- request
- response
Specifies a message as a request or a response, to give the code generator a hint.
This field is optional. If the reply topic is defined in the request message header, you need to define where in the request message it can be found.
For specifying and computing the location of a Correlation ID, a runtime expression is used. https://www.asyncapi.com/docs/specifications/v2.2.0#runtimeExpression
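As an illustration of how a tool could evaluate a location such as `$message.header#/correlationId` or `$message.header#/replyTo`, here is a simplified Python sketch. It assumes the message is a dict with `header` and `payload` keys, handles only object members (no array indices), and the function name `resolve_runtime_expression` is hypothetical.

```python
def resolve_runtime_expression(expression: str, message: dict):
    """Resolve '$message.<source>#<json-pointer>' against a message dict."""
    prefix = "$message."
    if not expression.startswith(prefix):
        raise ValueError(f"unsupported expression: {expression!r}")
    # Split '<source>#<pointer>' into its two parts.
    source, _, pointer = expression[len(prefix):].partition("#")
    if source not in ("header", "payload"):
        raise ValueError(f"unsupported source: {source!r}")
    value = message[source]
    # Walk the JSON Pointer fragment one reference token at a time.
    for token in pointer.split("/")[1:]:
        token = token.replace("~1", "/").replace("~0", "~")
        value = value[token]
    return value
```

The same resolver would serve both the correlationId and the replyTopic location, since both use the same expression syntax in the samples.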
type: string
enum:
- send
- receive
This is the replacement of async api v2 channels.*.publish and channels.*.subscribe
Complex type.
type: object
properties:
  channel:
    type: string
    description: referring to a defined element in channels section
  message:
    type: object
    description: Either message or channel has to be set to define the response message schema. If both are given, the message object overrides the message of the channel; all other options from the channel still apply.
To define a request reply couple.
This field is optional.
It should be set if:
This field is optional.
It should be set if:
components.messages.*.reply.channel attribute is not set
In async api v2 -> v3 the relation between channel, message, and operation will change.
A channel references, and can transport, multiple messages.
asyncapi: 3.0.0
channels:
  userCreationReply:
    address: null
    message:
      - $ref: 'channels.yaml#/components/messages/creationFailed'
      - $ref: 'channels.yaml#/components/messages/creationSuccessful'
In v2->v3 there is a bigger change: the channel name userCreationChannel and the topic user.creation.channel are no longer the same field.
asyncapi: 3.0.0
channels:
  userCreationChannel:
    address: user.creation.channel
    message:
      - $ref: 'channels.yaml#/components/messages/createUser'
For request/reply where the reply address is part of the request message, the address needs to be optional.
For request/reply, do not set the address.
Opinion of a meeting attendee: having it defined as NULL makes it clearer and harder to overlook.
A channel can transport multiple message schemas. Therefore it should be an array of messages.
--> See: "Assuming the operations[*].message/operations[*].reply.message are limiting"
An operation has a reference to a channel and/or message.
asyncapi: 3.0.0
operations:
  createUserWithSucessfulReply:
    action: send
    channel: 'channels.yaml#/channels/userCreation'
    message: '/components/messages/createUser'
    description: Creates a user and expects a response in the same channel.
    reply:
      channel: 'channels.yaml#/channels/replies'
      message: '/components/messages/createSuccesfull'
Opinions:
message could overwrite the channels[*].message
channels[*].message is an array, and operations[*].message/operations[*].reply.message is a limit / subset that allows this operation only a subset of the messages defined at channels level.
channels and messages and the application defined in a separate yaml file only glue it together via operations.
components/messages really defined on company level. By channel and topic structure I agree.
Assuming the operations[*].message/operations[*].reply.message are limiting
Now we have the need to identify messages to be able to limit those.
asyncapi: 3.0.0
channels:
  userCreationReply:
    address: null
    message:
      - $ref: 'channels.yaml#/components/messages/creationFailed'
      - $ref: 'channels.yaml#/components/messages/creationSuccessful'
operations:
  createUserWithSucessfulReply:
    action: send
    channel: 'channels.yaml#/channels/userCreation'
    reply:
      channel: 'channels.yaml#/channels/userCreationReply'
      message: 'channels.yaml#/components/messages/creationFailed'
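If the operation-level message is treated as a limit on the channel's messages, a validator would need a subset check along these lines. This Python sketch is only an assumption about how such a rule could be enforced; it compares the reference strings literally and does not resolve the `$ref` targets, and the function name is hypothetical.

```python
def messages_outside_channel(channel_messages, operation_messages):
    """Return the operation message refs that the channel does not define."""
    allowed = set(channel_messages)
    return [ref for ref in operation_messages if ref not in allowed]


# Values taken from the sample above.
channel_refs = [
    "channels.yaml#/components/messages/creationFailed",
    "channels.yaml#/components/messages/creationSuccessful",
]
reply_refs = ["channels.yaml#/components/messages/creationFailed"]

# An empty list means the reply is a valid subset of the channel messages.
violations = messages_outside_channel(channel_refs, reply_refs)
```

An empty result means the operation only narrows the channel; a non-empty result lists the references that would have to be rejected.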
Using the default $ref syntax. Like: channels.yaml#/components/messages/creationFailed
Using only message and channel ids.
# channels.yaml
asyncapi: 3.0.0
channels:
  userCreationReply:
    address: null
    message:
      - $ref: 'creationFailed'
      - $ref: 'creationSuccessful'
components:
  messages:
    createUser:
      ...
    createUser2:
      ...
    creationFailed:
      ...
    creationSuccessful:
      ...

# myapplication.yaml (owned by my team)
operations:
  createUserWithSucessfulReply:
    action: send
    channel: 'userCreation'
    reply:
      channel: 'userCreationReply'
      message: 'creationFailed'
Example:
Converted the kraken 2.0.0 spec sample made by @fmvilas to 3.0.0
```yaml asyncapi: 3.0.0 info: title: Kraken Websockets API version: '1.8.0' description: | WebSockets API offers real-time market data updates. WebSockets is a bidirectional protocol offering fastest real-time data, helping you build real-time applications. The public message types presented below do not require authentication. Private-data messages can be subscribed on a separate authenticated endpoint. ### General Considerations - TLS with SNI (Server Name Indication) is required in order to establish a Kraken WebSockets API connection. See Cloudflare's [What is SNI?](https://www.cloudflare.com/learning/ssl/what-is-sni/) guide for more details. - All messages sent and received via WebSockets are encoded in JSON format - All decimal fields (including timestamps) are quoted to preserve precision. - Timestamps should not be considered unique and not be considered as aliases for transaction IDs. Also, the granularity of timestamps is not representative of transaction rates. - At least one private message should be subscribed to keep the authenticated client connection open. - Please use REST API endpoint [AssetPairs](https://www.kraken.com/features/api#get-tradable-pairs) to fetch the list of pairs which can be subscribed via WebSockets API. For example, field 'wsname' gives the supported pairs name which can be used to subscribe. - Cloudflare imposes a connection/re-connection rate limit (per IP address) of approximately 150 attempts per rolling 10 minutes. If this is exceeded, the IP is banned for 10 minutes. - Recommended reconnection behaviour is to (1) attempt reconnection instantly up to a handful of times if the websocket is dropped randomly during normal operation but (2) after maintenance or extended downtime, attempt to reconnect no more quickly than once every 5 seconds. There is no advantage to reconnecting more rapidly after maintenance during cancel_only mode. 
servers: # i dont know how this section will look like in 3.0.0 channels: ping: address: / message: - $ref: 'channels.yaml#/components/messages/ping' pong: address: / message: - $ref: 'channels.yaml#/components/messages/pong' heartbeat: address: / message: - $ref: 'channels.yaml#/components/messages/heartbeat' systemStatus: address: / message: - $ref: 'channels.yaml#/components/messages/systemStatus' subscriptionStatus: address: / message: - $ref: 'channels.yaml#/components/messages/subscriptionStatus' subscribe: address: / message: - $ref: 'channels.yaml#/components/messages/subscribe' unsubscribe: address: / message: - $ref: 'channels.yaml#/components/messages/unsubscribe' operations: pingPong: action: send channel: 'channels.yaml#/channels/ping' reply: channel: 'channels.yaml#/channels/pong' heartbeat: action: receive channel: 'channels.yaml#/channels/heartbeat' systemStatus: action: receive channel: 'channels.yaml#/channels/systemStatus' subscribe: action: send channel: 'channels.yaml#/channels/subscribe' reply: channel: 'channels.yaml#/channels/subscriptionStatus' unsubscribe: action: send channel: 'channels.yaml#/channels/unsubscribe' reply: channel: 'channels.yaml#/channels/subscriptionStatus' components: messages: ping: summary: Ping server to determine whether connection is alive description: Client can ping server to determine whether connection is alive, server responds with pong. This is an application level ping as opposed to default ping in websockets standard which is server initiated payload: $ref: '#/components/schemas/ping' headers: type: object properties: correlationId: type: string correlationId: location: $message.header#/correlationId pong: summary: Pong is a response to ping message description: Server pong response to a ping to determine whether connection is alive. 
This is an application level pong as opposed to default pong in websockets standard which is sent by client in response to a ping payload: $ref: '#/components/schemas/pong' headers: type: object properties: correlationId: type: string correlationId: location: $message.header#/correlationId subscribe: description: Subscribe to a topic on a single or multiple currency pairs. payload: $ref: '#/components/schemas/subscribe' headers: type: object properties: correlationId: type: string correlationId: location: $message.header#/correlationId unsubscribe: description: Unsubscribe, can specify a channelID or multiple currency pairs. payload: $ref: '#/components/schemas/unsubscribe' headers: type: object properties: correlationId: type: string correlationId: location: $message.header#/correlationId subscriptionStatus: description: Subscription status response to subscribe, unsubscribe or exchange initiated unsubscribe. payload: $ref: '#/components/schemas/subscriptionStatus' examples: - payload: channelID: 10001 channelName: ohlc-5 event: subscriptionStatus pair: XBT/EUR reqid: 42 status: unsubscribed subscription: interval: 5 name: ohlc - payload: errorMessage: Subscription depth not supported event: subscriptionStatus pair: XBT/USD status: error subscription: depth: 42 name: book systemStatus: description: Status sent on connection or system status changes. 
payload: $ref: '#/components/schemas/systemStatus' heartbeat: description: Server heartbeat sent if no subscription traffic within 1 second (approximately) payload: $ref: '#/components/schemas/heartbeat' schemas: ping: type: object properties: event: type: string const: ping reqid: $ref: '#/components/schemas/reqid' required: - event heartbeat: type: object properties: event: type: string const: heartbeat pong: type: object properties: event: type: string const: pong reqid: $ref: '#/components/schemas/reqid' systemStatus: type: object properties: event: type: string const: systemStatus connectionID: type: integer description: The ID of the connection status: $ref: '#/components/schemas/status' version: type: string status: type: string enum: - online - maintenance - cancel_only - limit_only - post_only subscribe: type: object properties: event: type: string const: subscribe reqid: $ref: '#/components/schemas/reqid' pair: $ref: '#/components/schemas/pair' subscription: type: object properties: depth: $ref: '#/components/schemas/depth' interval: $ref: '#/components/schemas/interval' name: $ref: '#/components/schemas/name' ratecounter: $ref: '#/components/schemas/ratecounter' snapshot: $ref: '#/components/schemas/snapshot' token: $ref: '#/components/schemas/token' required: - name required: - event unsubscribe: type: object properties: event: type: string const: unsubscribe reqid: $ref: '#/components/schemas/reqid' pair: $ref: '#/components/schemas/pair' subscription: type: object properties: depth: $ref: '#/components/schemas/depth' interval: $ref: '#/components/schemas/interval' name: $ref: '#/components/schemas/name' token: $ref: '#/components/schemas/token' required: - name required: - event subscriptionStatus: type: object oneOf: - $ref: '#/components/schemas/subscriptionStatusError' - $ref: '#/components/schemas/subscriptionStatusSuccess' subscriptionStatusError: allOf: - properties: errorMessage: type: string required: - errorMessage - $ref: 
'#/components/schemas/subscriptionStatusCommon' subscriptionStatusSuccess: allOf: - properties: channelID: type: integer description: ChannelID on successful subscription, applicable to public messages only. channelName: type: string description: Channel Name on successful subscription. For payloads 'ohlc' and 'book', respective interval or depth will be added as suffix. required: - channelID - channelName - $ref: '#/components/schemas/subscriptionStatusCommon' subscriptionStatusCommon: type: object required: - event properties: event: type: string const: subscriptionStatus reqid: $ref: '#/components/schemas/reqid' pair: $ref: '#/components/schemas/pair' status: $ref: '#/components/schemas/status' subscription: required: - name type: object properties: depth: $ref: '#/components/schemas/depth' interval: $ref: '#/components/schemas/interval' maxratecount: $ref: '#/components/schemas/maxratecount' name: $ref: '#/components/schemas/name' token: $ref: '#/components/schemas/token' interval: type: integer description: Time interval associated with ohlc subscription in minutes. default: 1 enum: - 1 - 5 - 15 - 30 - 60 - 240 - 1440 - 10080 - 21600 name: type: string description: The name of the channel you subscribe too. enum: - book - ohlc - openOrders - ownTrades - spread - ticker - trade token: type: string description: base64-encoded authentication token for private-data endpoints. depth: type: integer default: 10 enum: - 10 - 25 - 100 - 500 - 1000 description: Depth associated with book subscription in number of levels each side. maxratecount: type: integer description: Max rate-limit budget. Compare to the ratecounter field in the openOrders updates to check whether you are approaching the rate limit. 
ratecounter: type: boolean default: false description: Whether to send rate-limit counter in updates (supported only for openOrders subscriptions) snapshot: type: boolean default: true description: Whether to send historical feed data snapshot upon subscription (supported only for ownTrades subscriptions) reqid: type: integer description: client originated ID reflected in response message. pair: type: array description: Array of currency pairs. items: type: string description: Format of each pair is "A/B", where A and B are ISO 4217-A3 for standardized assets and popular unique symbol if not standardized. pattern: '[A-Z\s]+\/[A-Z\s]+' ```
Whoever listens to notifications from this issue, I encourage you to have a look at https://github.com/asyncapi/spec/pull/847. It is pretty advanced with few cards on the table, we need different opinions to figure out which one is the best.
If ya need guidance on where to look at, feel free to ask for help
Somewhat related to:
It would be useful if it was possible to describe messages that are explicitly requests and responses and for the auto-generated code to deal with creating the appropriate ephemeral queues and performing matching on the correlationid.
The pattern that seems most common when using a pub-sub message broker is for the requestor to create a single use topic and provide this address as the 'reply-to' header in the request message. The requestor also provides a correlationId which is echoed back to help match requests and replies.
However, when using a transport like WebSockets it would be necessary to do additional work in the generated code to match requests and responses and also deal with message state and expiry.
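A rough Python sketch of the kind of bookkeeping generated code might need for that: pending requests keyed by correlationId, each with an expiry deadline. The class name `PendingRequests`, its methods, and the injectable `clock` parameter are all assumptions made for this illustration.

```python
import time


class PendingRequests:
    """Tracks open requests and drops them once they are too old."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._deadlines = {}  # correlationId -> expiry time

    def register(self, correlation_id: str) -> None:
        self._deadlines[correlation_id] = self._clock() + self._ttl

    def resolve(self, correlation_id: str) -> bool:
        # True only if the response belongs to a known, unexpired request.
        deadline = self._deadlines.pop(correlation_id, None)
        return deadline is not None and self._clock() <= deadline

    def expire(self) -> list:
        # Remove and return all correlation ids whose deadline has passed.
        now = self._clock()
        expired = [cid for cid, d in self._deadlines.items() if d < now]
        for cid in expired:
            del self._deadlines[cid]
        return expired
```

Passing the clock in makes the expiry logic testable without sleeping; real code would call `expire()` periodically or before each lookup.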
Ideally this should be abstracted away from API designers who may prefer to define their API in a manner similar to Open API as follows (see the Responses Object - https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#responsesObject) :
I've used the topics object but maybe a new operations object would be more appropriate?
One of the challenges here is the flexibility of having multiple possible responses but also defining the logic for identifying what response has actually been received. (Open API matches on HTTP response code so that's pretty simple).
In my example I just provide a matching rule but this could probably be a lot more flexible.