apache / rocketmq-client-nodejs

Apache RocketMQ nodejs client
https://rocketmq.apache.org/
Apache License 2.0

RocketMQ Client for Node.js


This official Node.js client is a lightweight wrapper around rocketmq-client-cpp, a finely tuned C++ client.

Notice 1: This client is still under development. Use it with caution in production.

Notice 2: This SDK currently supports only macOS and Ubuntu 14.04. Ubuntu 16+ is not supported, and CentOS has not been tested yet.

Installation

$ npm install --save apache-rocketmq

Examples

See example/producer.js and example/push_consumer.js for a quick start.

Usage

Require this package first.

const { Producer, PushConsumer } = require("apache-rocketmq");

Producer

Constructor

new Producer(groupId[, instanceName][, options]);

Producer's constructor receives up to three parameters: groupId (required), followed by instanceName and options (both optional).

e.g.

const { Producer } = require("apache-rocketmq");
const producer = new Producer("GROUP_ID", "INSTANCE_NAME", {
    nameServer: "127.0.0.1:9876",
});

start

producer.start([callback]);

.start accepts an optional callback function. If no callback is passed, it returns a Promise.

e.g.

producer.start(function(err) {
    if(err) {
        //
    }
});

// or

producer.start().then(() => {
    //
}).catch(err => {
    //
});

shutdown

producer.shutdown([callback]);

.shutdown accepts an optional callback function. If no callback is passed, it returns a Promise.

e.g.

producer.shutdown(function(err) {
    if(err) {
        //
    }
});

// or

producer.shutdown().then(() => {
    //
}).catch(err => {
    //
});

send

producer.send(topic, body[, options][, callback]);

.send receives up to four parameters, the last of which is an optional callback. If no callback is passed, it returns a Promise.

e.g.

const i = 0; // message index, e.g. a loop counter
producer.send("test", `baz ${i}`, {
    keys: "foo",
    tags: "bar"
}, function(err, result) {
    if(err) {
        // ...
    } else {
        console.log(result);

        // console example:
        //
        //  { status: 0,
        //    statusStr: 'OK',
        //    msgId: '0101007F0000367E0000309DD68B0700',
        //    offset: 0 }
    }
});
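
The example above uses the callback form. When no callback is passed, .send returns a Promise, so several messages can be sent one after another with async/await. This is a minimal sketch, assuming the Promise resolves with the same result object that the callback receives; sendAll is a hypothetical helper name, and producer is assumed to be started already.

```javascript
// Hypothetical helper: send each body to `topic` in order and collect the
// results. Assumes `producer` has already been started.
async function sendAll(producer, topic, bodies, options = {}) {
    const results = [];
    for (const body of bodies) {
        // Without a callback, .send returns a Promise.
        results.push(await producer.send(topic, body, options));
    }
    return results;
}
```

Sending sequentially keeps the order of the messages predictable; if order does not matter, the same calls could be collected and awaited together with Promise.all.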

send status and statusStr

status  statusStr
0       OK
1       FLUSH_DISK_TIMEOUT
2       FLUSH_SLAVE_TIMEOUT
3       SLAVE_NOT_AVAILABLE
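
A send result can be checked against these codes in application code. The sketch below copies the status values from the table above; describeSendResult is a hypothetical helper, and treating only status 0 as a success is an assumption about what the application wants, not a rule stated by the client.

```javascript
// Status codes and names copied from the table above.
const SEND_STATUS = Object.freeze({
    0: "OK",
    1: "FLUSH_DISK_TIMEOUT",
    2: "FLUSH_SLAVE_TIMEOUT",
    3: "SLAVE_NOT_AVAILABLE"
});

// Hypothetical helper: name the status of a send result and flag whether it
// is the plain OK case. Unlisted codes are reported as "UNKNOWN".
function describeSendResult(result) {
    const name = SEND_STATUS[result.status] || "UNKNOWN";
    return { ok: result.status === 0, name };
}
```

For example, describeSendResult(result) could be called inside the send callback to decide whether to log a warning.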

PushConsumer

Constructor

new PushConsumer(groupId[, instanceName][, options]);

PushConsumer's constructor receives up to three parameters: groupId (required), followed by instanceName and options (both optional).

e.g.

const { PushConsumer } = require("apache-rocketmq");
const consumer = new PushConsumer("GROUP_ID", "INSTANCE_NAME", {
    nameServer: "127.0.0.1:9876",
    threadCount: 3
});

start

consumer.start([callback]);

.start accepts an optional callback function. If no callback is passed, it returns a Promise.

e.g.

consumer.start(function(err) {
    if(err) {
        //
    }
});

// or

consumer.start().then(() => {
    //
}).catch(err => {
    //
});

shutdown

consumer.shutdown([callback]);

.shutdown accepts an optional callback function. If no callback is passed, it returns a Promise.

e.g.

consumer.shutdown(function(err) {
    if(err) {
        //
    }
});

// or

consumer.shutdown().then(() => {
    //
}).catch(err => {
    //
});
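
An application that runs both a producer and a consumer may want to shut them down together. The sketch below is one way to do that with the Promise form of .shutdown. shutdownAll is a hypothetical helper, not part of apache-rocketmq; it assumes each client's shutdown() returns a Promise when called without a callback.

```javascript
// Hypothetical helper: shut down several clients in parallel and return the
// errors of those that failed, instead of stopping at the first failure.
async function shutdownAll(clients) {
    const outcomes = await Promise.allSettled(clients.map(c => c.shutdown()));
    return outcomes
        .filter(o => o.status === "rejected")
        .map(o => o.reason);
}

// Possible use on Ctrl+C (producer and consumer are assumed to exist):
// process.once("SIGINT", () => {
//     shutdownAll([producer, consumer]).then(errors => process.exit(errors.length ? 1 : 0));
// });
```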

subscribe

Adds a subscription to the consumer.

consumer.subscribe(topic[, expression]);

.subscribe receives two parameters, of which the second is optional.
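
As an illustration, the sketch below builds an expression from a list of tags. It assumes the expression follows RocketMQ's usual tag-filter syntax, in which * matches every tag and || separates alternatives; this README does not spell the syntax out, so verify it against your broker. tagExpression is a hypothetical helper.

```javascript
// Hypothetical helper: build a subscription expression from a list of tags.
// Assumes RocketMQ tag-filter syntax: "*" for all tags, "a || b" for either.
function tagExpression(tags) {
    if (!tags || tags.length === 0) {
        return "*";
    }
    return tags.join(" || ");
}
```

Under that assumption, consumer.subscribe("test", tagExpression(["bar", "baz"])) would subscribe to messages on topic test tagged bar or baz.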

On Message Event

To receive messages from the RocketMQ server, add a listener for the message event. The listener receives two parameters.

function YOUR_LISTENER(msg, ack) {
    //
}

The msg object looks like this:

{ topic: 'test',
  tags: 'bar',
  keys: 'foo',
  body: 'baz 7',
  msgId: '0101007F0000367E0000339DD68B0800' }

Call ack.done() to tell RocketMQ that you have processed the message successfully; this is the same as ack.done(true). Call ack.done(false) to tell it that processing failed.

e.g.

consumer.on("message", function(msg, ack) {
    console.log(msg);
    ack.done();
});
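
When message processing is asynchronous, the acknowledgement can be tied to the outcome of the handler. The sketch below is one way to do this with the message event and ack.done described above. attachHandler is a hypothetical helper, not part of apache-rocketmq; it assumes the consumer exposes on("message", listener) as shown in the example.

```javascript
// Hypothetical helper: wrap an async handler so that the message is
// acknowledged with ack.done() on success and ack.done(false) on failure.
function attachHandler(consumer, handler) {
    consumer.on("message", async (msg, ack) => {
        try {
            await handler(msg);
            ack.done();
        } catch (err) {
            console.error(`failed to process ${msg.msgId}:`, err);
            ack.done(false);
        }
    });
}
```

A typical sequence would be to call consumer.subscribe("test"), then attachHandler(consumer, handler), and finally consumer.start().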

Apache RocketMQ Community

Contact Us

How to Contribute

Contributions are warmly welcome, be it a trivial cleanup, a major new feature, or any other suggestion. Read the how to contribute guide for more details.

License

Apache License, Version 2.0 Copyright (C) Apache Software Foundation