Streaminy / ksqldb-client

Simple KsqlDB client for Node.js using JavaScript.
MIT License

ksqldb-client

Simple KsqlDB client for Node.js using JavaScript 🚀

Documentation

Install

npm install ksqldb-client

Getting started

const KsqldbClient = require("ksqldb-client");
const client = new KsqldbClient();

const asyncOperation = async () => {
    await client.connect();
    const streams = await client.listStreams();
    console.log(streams);

    /* ... */

    await client.disconnect();
};

asyncOperation();
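
The connect/disconnect pairing above can be wrapped so that the client is disconnected even when the work in between throws. This is a minimal sketch: withClient is a hypothetical helper, not part of ksqldb-client, and it assumes only the connect() and disconnect() methods shown above.

```javascript
// Hypothetical helper (not part of ksqldb-client): runs `work` between
// connect() and disconnect(), and disconnects even if `work` throws.
async function withClient(client, work) {
    await client.connect();
    try {
        return await work(client);
    } finally {
        // Runs on both success and failure, so the connection is not leaked.
        await client.disconnect();
    }
}

// Usage, with `client` created as in the example above:
// const streams = await withClient(client, (c) => c.listStreams());
```

Errors thrown by the work function still propagate to the caller after the disconnect has completed.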

Username authorization

const options = {
    authorization: {
        username: "username",
        password: "password",
        ssl: {
            ca: ..,
            crt: ..,
            key: ..,
        }
    },
    host: "http://..",
    port: 8088,
};
const client = new KsqldbClient(options);

Pull queries

const { data, status, error } = await client.query("SELECT * FROM table WHERE column = 'string';");
const { metadata, rows } = data;

Push queries

const cb = (data) => {
    const { metadata, rows } = data;
    const { queryId } = metadata;
    // ...
};

// Promise resolves after the push query ends.
const { status, error } = await client.streamQuery("SELECT * FROM table EMIT CHANGES;", cb);

Terminate push query

const { error } = await client.terminatePushQuery("queryId");

if (!error) {
    console.log("Query terminated.");
}
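
Combining the two snippets above, a push query can be stopped from inside its own callback by using the queryId found in the metadata. The sketch below is an assumption-laden illustration: streamUntil is a hypothetical helper, and it assumes that every callback invocation carries metadata.queryId as in the push query example.

```javascript
// Hypothetical helper (not part of ksqldb-client): forwards data to
// `onData` and terminates the push query after `maxCalls` callback
// invocations.
async function streamUntil(client, sql, maxCalls, onData) {
    let seen = 0;
    let terminating = null;

    const cb = (data) => {
        if (terminating) {
            return; // ignore data that arrives while terminating
        }
        onData(data);
        seen += 1;
        if (seen >= maxCalls) {
            // Assumption: metadata.queryId is present on each invocation.
            // A thrown error is folded into the { error } shape.
            terminating = client
                .terminatePushQuery(data.metadata.queryId)
                .catch((err) => ({ error: err }));
        }
    };

    // Resolves after the push query ends.
    const result = await client.streamQuery(sql, cb);
    const termination = terminating ? await terminating : null;
    return { result, termination };
}
```

The returned termination value is null when the query ended on its own before the limit was reached.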

Execute statement

await client.executeStatement("DROP TABLE IF EXISTS table;");

Insert into

const row = {
    field: "value",
};
const { status, error } = await client.insertInto("streamName", row);
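
When several rows need to be inserted, the call above can be repeated in a loop while keeping track of which rows failed. This is only a sketch: insertRows is a hypothetical helper, and it assumes insertInto resolves to an object with status and error fields as shown above.

```javascript
// Hypothetical helper (not part of ksqldb-client): inserts rows one at a
// time and collects the rows for which KsqlDB returned an error.
async function insertRows(client, streamName, rows) {
    const failures = [];
    let inserted = 0;

    for (const row of rows) {
        const { status, error } = await client.insertInto(streamName, row);
        if (error) {
            failures.push({ row, status, error });
        } else {
            inserted += 1;
        }
    }

    return { inserted, failures };
}
```

Rows are sent sequentially so that their order is preserved; errors thrown by insertInto are not caught here and reach the caller.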

Describe source

const sourceDescription = await client.describeSource("streamName");

Handling errors

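There are two types of errors: errors returned by KsqlDB, which arrive in the error field of the result, and errors thrown while performing the request, which have to be caught.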

try {
    const { status, error } = await client.query("SELECT *;");

    if (error) {
        console.log("Error returned by KsqlDB: ", error);
    }
} catch (err) {
    console.error("Error thrown while doing the query: ", err);
}
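
Both kinds of error can be folded into a single result shape so that callers only check one flag. The following is a sketch rather than library functionality: safeQuery and the ok and source fields are hypothetical names introduced for illustration.

```javascript
// Hypothetical wrapper (not part of ksqldb-client): reports both error
// kinds through one result object instead of a mix of fields and throws.
async function safeQuery(client, sql) {
    try {
        const { data, status, error } = await client.query(sql);
        if (error) {
            // KsqlDB answered, but rejected the query.
            return { ok: false, source: "ksqldb", status, error };
        }
        return { ok: true, data, status };
    } catch (err) {
        // The request itself failed (for example, a network problem).
        return { ok: false, source: "thrown", error: err };
    }
}
```

The source field tells the caller whether a status from KsqlDB is available ("ksqldb") or not ("thrown").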

Status

The status returned by each operation is the HTTP status code returned by KsqlDB (200, 400, 500, etc.). It can be used to troubleshoot errors or to assert that a request succeeded.
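
As an illustration of asserting on the status, the sketch below treats any 2xx code as success. Both function names are hypothetical, and the code assumes that status is a number, which the source does not state explicitly.

```javascript
// Hypothetical helper: true for any 2xx status code.
// Assumption: the client reports status as a number, not a string.
function isSuccessStatus(status) {
    return Number.isInteger(status) && status >= 200 && status < 300;
}

// Hypothetical helper: returns the result unchanged when it succeeded,
// otherwise throws with the label and status in the message.
function assertSuccess(result, label) {
    if (result.error || !isSuccessStatus(result.status)) {
        throw new Error(`${label} failed with status ${result.status}`);
    }
    return result;
}

// Usage, with `client` created as in the examples above:
// assertSuccess(await client.query("SELECT * FROM table WHERE column = 'string';"), "Pull query");
```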

Stream properties and session variables

Optional extra parameters can be passed as follows:

// Statements
const executeStatementResults = await client.executeStatement(statement, {
    sessionVariables: {
        STREAM_NAME: STREAM_NAME,
        TOPIC_NAME: TOPIC_NAME,
    },
});

// Queries
const query = "SELECT * FROM ${TABLE_NAME} WHERE WORD in ('${FIRST_WORD}', '${SECOND_WORD}');";
const { data: queryData, error } = await client.query(query, {
    sessionVariables: {
        TABLE_NAME: TABLE_NAME,
        FIRST_WORD: "tree",
        SECOND_WORD: "wind",
    },
});

// Stream Query
const streamQueryResults = await client.streamQuery("SELECT * FROM ${TABLE_NAME} EMIT CHANGES;", cb, {
    sessionVariables: {
        TABLE_NAME: TABLE_NAME,
    },
});
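
Because the ${NAME} placeholders above live in ordinary strings rather than JavaScript template literals, a typo in a variable name is not caught by JavaScript. A small local check can list the placeholders that have no matching session variable before the statement is sent. This is a sketch: missingSessionVariables is a hypothetical helper, and the pattern it uses for variable names (letters, digits, and underscores) is an assumption.

```javascript
// Hypothetical helper (not part of ksqldb-client): returns the names
// referenced as ${NAME} in `statement` that are not keys of
// `sessionVariables`.
function missingSessionVariables(statement, sessionVariables = {}) {
    // Assumption: variable names consist of letters, digits, and underscores.
    const pattern = /\$\{([A-Za-z_][A-Za-z0-9_]*)\}/g;
    const referenced = new Set();

    for (const match of statement.matchAll(pattern)) {
        referenced.add(match[1]);
    }

    return [...referenced].filter(
        (name) => !Object.prototype.hasOwnProperty.call(sessionVariables, name)
    );
}
```

An empty array means every placeholder in the statement has a value; any returned names can be reported before calling query, streamQuery, or executeStatement.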