devicehive / devicehive-ws-proxy

Apache License 2.0

WebSocket proxy to message broker

The project wraps some of the essential message broker functionality, allowing you to communicate with message brokers through WebSockets. Kafka is the broker documented below.

Start the proxy

Internal mode

Run the following command:

- node ./src/proxy.js

External mode (with enabled PluginManager)

In the configuration file:

[path-to-proxy-project]/src/config.json

set the ENABLE_PLUGIN_MANAGER field to true, or set the environment variable PROXY.ENABLE_PLUGIN_MANAGER to true. Then run the following command:

- node ./src/proxy.js

By default, the proxy listens for WS connections on ws://localhost:3000, regardless of the mode. To change this, set the WEB_SOCKET_SERVER_HOST and WEB_SOCKET_SERVER_PORT fields in the configuration file, or set the environment variables PROXY.WEB_SOCKET_SERVER_HOST and PROXY.WEB_SOCKET_SERVER_PORT.

Configuration

Proxy

[path-to-proxy-project]/src/config.json

Each configuration field can be overridden with the corresponding environment variable using the "PROXY" prefix, for example:

PROXY.WEB_SOCKET_SERVER_PORT=6000

The prefix separator can be overridden with the ENVSEPARATOR environment variable. Example:

ENVSEPARATOR=_
PROXY_WEB_SOCKET_SERVER_PORT=6000
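
To make the override rule concrete, here is a minimal sketch of how a prefixed environment variable could be mapped onto a configuration field. The function name `applyEnvOverrides` and its merging logic are assumptions for illustration and not the proxy's actual loader. Only the prefix, the `.` separator used in the examples, the ENVSEPARATOR variable, and the default host and port come from this document.

```javascript
// Hypothetical helper: overlay prefixed environment variables on config defaults.
// Values are kept as strings here; a real loader may coerce types.
function applyEnvOverrides(defaults, prefix, env) {
  // "." is the separator used in the examples; ENVSEPARATOR replaces it.
  const separator = env.ENVSEPARATOR || ".";
  const result = { ...defaults };
  for (const field of Object.keys(defaults)) {
    const name = `${prefix}${separator}${field}`;
    if (Object.prototype.hasOwnProperty.call(env, name)) {
      result[field] = env[name];
    }
  }
  return result;
}

const defaults = { WEB_SOCKET_SERVER_HOST: "localhost", WEB_SOCKET_SERVER_PORT: 3000 };
const overridden = applyEnvOverrides(defaults, "PROXY", {
  ENVSEPARATOR: "_",
  PROXY_WEB_SOCKET_SERVER_PORT: "6000",
});
console.log(overridden.WEB_SOCKET_SERVER_HOST, overridden.WEB_SOCKET_SERVER_PORT);
```

In a running process the third argument would typically be `process.env`. The same pattern would apply to the other prefixes described in the following sections.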

Message Buffer configuration

[path-to-proxy-project]/src/messageBuffer/config.json

Each configuration field can be overridden with the corresponding environment variable using the "MESSAGE_BUFFER" prefix, for example:

MESSAGE_BUFFER.MAX_SIZE_MB=256

The prefix separator can be overridden with the ENVSEPARATOR environment variable. Example:

ENVSEPARATOR=_
MESSAGE_BUFFER_MAX_SIZE_MB=256

Plugin Manager configuration

[path-to-proxy-project]/src/pluginManager/config.json

Each configuration field can be overridden with the corresponding environment variable using the "PLUGIN_MANAGER" prefix, for example:

PLUGIN_MANAGER.AUTH_SERVICE_ENDPOINT=http://localhost:9090/dh/rest

The prefix separator can be overridden with the ENVSEPARATOR environment variable. Example:

ENVSEPARATOR=_
PLUGIN_MANAGER_AUTH_SERVICE_ENDPOINT=http://localhost:9090/dh/rest

Message Brokers

Kafka

[path-to-proxy-project]/src/kafka/config.json

Each configuration field can be overridden with the corresponding environment variable using the "KAFKA" prefix, for example:

KAFKA.KAFKA_HOSTS=localhost:9094

The prefix separator can be overridden with the ENVSEPARATOR environment variable. Example:

ENVSEPARATOR=_
KAFKA_KAFKA_HOSTS=localhost:9094

Proxy modules logging

The "DEBUG" environment variable selects which module loggers are enabled, given as a comma-separated list of module names.

Example:

DEBUG=kafka,messagebuffer,websocketserver

Message Structure

General

All messages are JSON-based. The generic message structure looks like this:

{
  "id": "id of original message",
  "t": "message type",
  "a": "action",
  "s": "success",
  "p": "payload"
}

| Param | Type | Description |
|-------|------|-------------|
| id | String or Int | Original message ID |
| t | String | Type: ["topic","notif","health","plugin"] |
| a | String | Action: ["create","list","subscribe","unsubscribe","authenticate","ack"] |
| s | Int | Status returned by the server, 0 if OK |
| p | Object | Payload object |

The server can receive a list of messages in one batch.
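
As an illustration of the generic structure, the sketch below assembles messages with a small helper and groups two of them into one batch. The helper name `buildMessage` is hypothetical, and serializing a batch as a JSON array is an assumption, since this document does not specify the batch encoding.

```javascript
// Hypothetical helper that assembles a message in the generic structure.
// Only the fields that are provided are included, matching the shorter
// request examples in this document.
function buildMessage({ id, type, action, payload }) {
  const message = {};
  if (id !== undefined) message.id = id;
  message.t = type;
  if (action !== undefined) message.a = action;
  if (payload !== undefined) message.p = payload;
  return message;
}

// Assumption: a batch is sent as a JSON array of messages.
const batch = [
  buildMessage({ id: 1, type: "topic", action: "list" }),
  buildMessage({ id: 2, type: "health" }),
];
const frame = JSON.stringify(batch);
console.log(frame);
```

The resulting string can be sent as a single text frame with any WebSocket client.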

Ack message:

Success ACK:

{
    "t": "ack",
    "s": 0
}

Failure ACK:

{
    "t": "ack",
    "s": 1,
    "p": { "m": <error message> }
}
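
A client would typically turn an ACK into a success or failure result. The following is a minimal sketch of that check; the function name `readAck` and the sample error text are illustrative assumptions.

```javascript
// Hypothetical helper: convert an ACK message into a simple result object.
function readAck(message) {
  if (message.t !== "ack") {
    throw new Error(`not an ACK message: ${message.t}`);
  }
  if (message.s === 0) {
    return { ok: true, error: null };
  }
  // On failure the payload carries the error text in "m".
  const error = message.p && message.p.m ? message.p.m : "unknown error";
  return { ok: false, error };
}

console.log(readAck({ t: "ack", s: 0 }));
// "sample failure" is placeholder text, not a real proxy error message.
console.log(readAck({ t: "ack", s: 1, p: { m: "sample failure" } }));
```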

Topics

Create

Request message:

{
    "t": "topic",
    "a": "create",
    "p": { "t": ["topic1", "topic2", "topicN"] }
}

Response message:

{
    "t": "topic",
    "a": "create",
    "p": { "t": ["topic1", "topic2", "topicN"] },
    "s": 0
}

Error message:

{
    "t": "topic",
    "a": "create",
    "p": { "m": <error message> },
    "s": 1
}
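
The request, response, and error shapes above can be handled with two small functions. This is a sketch only: the helper names are hypothetical, and the fallback error text is an assumption for the case where no "m" field is present.

```javascript
// Hypothetical helpers for the "topic"/"create" exchange.
function createTopicsRequest(topics) {
  return { t: "topic", a: "create", p: { t: topics } };
}

// Returns the created topic names, or throws with the server's error message.
function readCreateTopicsResponse(response) {
  if (response.t !== "topic" || response.a !== "create") {
    throw new Error("unexpected response type or action");
  }
  if (response.s !== 0) {
    throw new Error(response.p && response.p.m ? response.p.m : "topic creation failed");
  }
  return response.p.t;
}

const request = createTopicsRequest(["topic1", "topic2"]);
console.log(JSON.stringify(request));

// A response shaped like the success example above.
const created = readCreateTopicsResponse({
  t: "topic", a: "create", p: { t: ["topic1", "topic2"] }, s: 0,
});
console.log(created);
```

The list and unsubscribe exchanges described below follow the same response pattern, so the same status check would apply with a different action name.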

List

Request message:

{
    "t": "topic",
    "a": "list"
}

Response message:

{
    "t": "topic",
    "a": "list",
    "p": { "t": ["topic1", "topic2", "topicN"] },
    "s": 0
}

Error message:

{
    "t": "topic",
    "a": "list",
    "p": { "m": <error message> },
    "s": 1
}

Subscribe

Request message:

{
    "t": "topic",
    "a": "subscribe",
    "p": {
        "sg": "subscriptionGroup",
        "t": ["topic1", "topic2"]
    }
}

Response message:

{
    "t": "topic",
    "a": "subscribe",
    "p": { "sg": "subscriptionGroup", "t": ["topic1", "topic2"] },
    "s": 0
}

Error message:

{
    "t": "topic",
    "a": "subscribe",
    "p": { "m": <error message> },
    "s": 1
}

Where subscriptionGroup is a group of consumers among which messages are distributed using balancing (RoundRobin) logic.
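
To illustrate what a subscription group means for consumers, the sketch below builds a subscribe request and then models round-robin balancing in a simplified way. The function names, group name, and consumer names are hypothetical. The real balancing is performed by the message broker, so this model only shows the idea that each message reaches one consumer of the group.

```javascript
// Hypothetical helper for the "topic"/"subscribe" request.
function subscribeRequest(subscriptionGroup, topics) {
  return { t: "topic", a: "subscribe", p: { sg: subscriptionGroup, t: topics } };
}

// Simplified model of round-robin balancing: each message goes to exactly
// one consumer of the group, cycling through the consumers in order.
function distributeRoundRobin(messages, consumers) {
  if (consumers.length === 0) {
    throw new Error("a group needs at least one consumer");
  }
  const assignment = new Map(consumers.map((name) => [name, []]));
  messages.forEach((message, index) => {
    const consumer = consumers[index % consumers.length];
    assignment.get(consumer).push(message);
  });
  return assignment;
}

console.log(JSON.stringify(subscribeRequest("group1", ["topic1", "topic2"])));

const assignment = distributeRoundRobin(
  ["m1", "m2", "m3", "m4", "m5"],
  ["consumerA", "consumerB"]
);
console.log(assignment.get("consumerA")); // m1, m3, m5
console.log(assignment.get("consumerB")); // m2, m4
```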

Unsubscribe

Request message:

{
    "t": "topic",
    "a": "unsubscribe",
    "p": { "t": ["topic1", "topic2"] }
}

Response message:

{
    "t": "topic",
    "a": "unsubscribe",
    "p": { "t": ["topic1", "topic2"] },
    "s": 0
}

Error message:

{
    "t": "topic",
    "a": "unsubscribe",
    "p": { "m": <error message> },
    "s": 1
}

Plugin

Authenticate

Request message:

{
    "t": "plugin",
    "a": "authenticate",
    "p": { "token": <plugin access token> }
}

Response message:

{
    "t": "plugin",
    "a": "authenticate",
    "p": {
        "tpc": <plugin topic name>,
        "e": <plugin access token expiration date>,
        "t": 1
    },
    "s": 0
}

Where:

tpc - plugin topic name
e - plugin access token expiration date
t - plugin token type (0 - refresh token, 1 - access token)

Error message:

{
    "t": "plugin",
    "a": "authenticate",
    "p": { "m": <error message> },
    "s": 1
}
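
The authentication exchange can be sketched as follows. The helper names and the sample values are hypothetical. The format of the expiration date is not described in this document, so the sketch passes "e" through unchanged rather than parsing it.

```javascript
// Token type codes from the description above.
const TOKEN_TYPES = { 0: "refresh", 1: "access" };

// Hypothetical helper for the "plugin"/"authenticate" request.
function authenticateRequest(token) {
  return { t: "plugin", a: "authenticate", p: { token } };
}

// Extracts the documented fields from a successful response, or throws
// with the server's error message.
function readAuthenticateResponse(response) {
  if (response.s !== 0) {
    throw new Error(response.p && response.p.m ? response.p.m : "authentication failed");
  }
  const { tpc, e, t } = response.p;
  return { topic: tpc, expiration: e, tokenType: TOKEN_TYPES[t] };
}

console.log(JSON.stringify(authenticateRequest("<plugin access token>")));

// Sample values below are placeholders, not real proxy output.
const auth = readAuthenticateResponse({
  t: "plugin",
  a: "authenticate",
  p: { tpc: "example_plugin_topic", e: "<expiration date>", t: 1 },
  s: 0,
});
console.log(auth.topic, auth.tokenType);
```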

Notification

Send

Request message:

{
    "t": "notif",
    "a": "create",
    "p": {
        "t": "topic1",
        "m": <notification message string>,
        "part": 1
    }
}

Response message:

{
    "t": "notif",
    "a": "create",
    "s": 0
}

Error message:

{
    "t": "notif",
    "a": "create",
    "p": { "m": <error message> },
    "s": 1
}
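
Because the "m" field is a string, structured data has to be serialized before it is sent. The sketch below shows one way to do that; the helper name `notificationRequest` and the sample data are hypothetical. The meaning of "part" is not described in this document, so the sketch passes it through unchanged.

```javascript
// Hypothetical helper for the "notif"/"create" request.
// The payload field "m" is a string, so non-string data is serialized first.
function notificationRequest(topic, data, part) {
  const payload = {
    t: topic,
    m: typeof data === "string" ? data : JSON.stringify(data),
  };
  if (part !== undefined) payload.part = part; // passed through unchanged
  return { t: "notif", a: "create", p: payload };
}

const notification = notificationRequest("topic1", { temperature: 21.5 }, 1);
console.log(JSON.stringify(notification));
```

A subscriber that receives this notification would call `JSON.parse` on the "m" string to recover the original object.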

Receive

Notifications are received automatically after subscription. Notification message:

{
    "t": "notif",
    "p": { "m": <notification message string> }
}
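
Since notifications arrive without a request, a client needs to tell them apart from responses on the same connection. The following is a minimal sketch of such a dispatcher. The function name `handleFrame` is hypothetical, and accepting either a single message or an array per frame is an assumption.

```javascript
// Hypothetical dispatcher for raw text frames received from the proxy.
// Assumption: a frame holds either one message object or an array of them.
function handleFrame(raw, onNotification) {
  const parsed = JSON.parse(raw);
  const messages = Array.isArray(parsed) ? parsed : [parsed];
  let delivered = 0;
  for (const message of messages) {
    // Incoming notifications have type "notif", no action, and text in p.m.
    // Responses to "notif"/"create" carry an action and are skipped here.
    const isNotification =
      message.t === "notif" &&
      message.a === undefined &&
      message.p !== undefined &&
      typeof message.p.m === "string";
    if (isNotification) {
      onNotification(message.p.m);
      delivered += 1;
    }
  }
  return delivered;
}

const received = [];
const count = handleFrame(
  JSON.stringify([
    { t: "notif", p: { m: "hello" } },
    { t: "notif", a: "create", p: { m: "send failed" }, s: 1 },
  ]),
  (text) => received.push(text)
);
console.log(count, received);
```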

Healthcheck

Request message:

{
    "t": "health"
}

Response message:

{
    "t": "health",
    "s": 0,
    "p": {
        "prx": "Available|Not Available",
        "mb": "Available|Not Available",
        "mbfp": <0-100>,
        "comm": "Available|Not Available"
    }
}

Where:

prx - Proxy Status
mb - Message buffer status
mbfp - Message Buffer fill percentage
comm - Internal message broker status
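
A monitoring client could reduce the health response to a single verdict, as in the sketch below. The function name `summarizeHealth` and the `maxFillPercent` threshold are illustrative assumptions and not proxy settings.

```javascript
// Hypothetical helper that summarizes a "health" response.
// maxFillPercent is an illustrative threshold chosen for this example.
function summarizeHealth(response, maxFillPercent = 90) {
  const { prx, mb, mbfp, comm } = response.p;
  const problems = [];
  if (prx !== "Available") problems.push("proxy");
  if (mb !== "Available") problems.push("message buffer");
  if (comm !== "Available") problems.push("message broker");
  if (mbfp > maxFillPercent) problems.push("message buffer fill");
  return { healthy: response.s === 0 && problems.length === 0, problems };
}

// Sample response with an unavailable broker and a nearly full buffer.
const health = summarizeHealth({
  t: "health",
  s: 0,
  p: { prx: "Available", mb: "Available", mbfp: 95, comm: "Not Available" },
});
console.log(health.healthy, health.problems);
```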