Qihoo360 / qbusbridge

The Apache Kafka Client SDK
MIT License
292 stars 72 forks source link
consumer cplusplus golang kafka kafka-consumer kafka-producer kafkabridge kafkaclient kafkasdk php producer python

Introduction 中文

Features

Compiling

Ensure your system has g++ (>= 4.8.5), boost (>= 1.41), cmake (>= 3.1) and swig (>= 3.0.12) installed.

In addition, qbus SDK is linking libstdc++ statically, so you must ensure that libstdc++.a exists. For CentOS users, run:

sudo yum install -y glibc-static libstdc++-static

git clone:

git clone --recursive https://github.com/ntt360/qbusbridge.git

SASL Support

If you need librdkafa to support kafka SASL authentication, you also need to install:

sudo yum install -y cyrus-sasl-devel

If you also use GSSAPI authentication, you need to compile the corresponding plugin:

sudo yum install -y cyrus-sasl-gssapi

1. Install submodules

Run ./build_dependencies.sh.

It will automatically download submodules and install them to cxx/thirdparts/local where CMakeLists.txt finds headers and libraries.

See ./cxx/thirdparts/local:

include/
  librdkafka/
    rdkafka.h
  log4cplus/
    logger.h
lib/
  librdkafka.a
  liblog4cplus.a

If you want to support SASL functionality, after compiling, you can go to the cxx/thirdparts/librdkafka/examples/ directory and execute the following command to test whether the SASL component has been successfully compiled:

cd cxx/thirdparts/librdkafka/examples/
./rdkafka_example -X builtin.features
# builtin.features = gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,sasl_oauthbearer,http,oidc

Make sure the output of builtin.features has compiled the SASL related authentication modules!

2. Build SDK

C++

Navigate to the cxx directory and run ./build.sh, following files will be generated:

include/
  qbus_consumer.h
  qbus_producer.h
lib/
  debug/libQBus.so
  release/libQBus.so

Though building C++ SDK requires C++11 support, the SDK could be used with older g++. eg. build qbus SDK with g++ 4.8.5 and use qbus SDK with g++ 4.4.7.

Go

Navigate to the golang directory and run ./build.sh, following files will be generated:

gopath/
  src/
    qbus/
      qbus.go
      libQBus_go.so

You can enable go module for examples by running USE_GO_MOD=1 ./build.sh. Then following files will be generated:

examples/
  go.mod
  qbus/
    qbus.go
    go.mod
    libQBus_go.so

Python

Navigate to the python directory and run ./build.sh, following files will be generated:

examples/
  qbus.py
  _qbus.so

PHP

Navigate to the php directory and run build.sh, following files will be generated:

examples/
  qbus.php
  qbus.so

3. Build examples

C++

Navigate to examples subdirectory and run ./build.sh [debug|release] to generate executable files. debug is using libQBus.so in lib/debug subdirectory, release is using libQBus.so in lib/release subdirectory. Run make clean to delete them.

If you want to build your own programs, see how Makefile does.

Go

Navigate to examples subdirectory and run ./build.sh to generate executable files, run ./clean.sh to delete them.

Add path of libQBus_go.so to env LD_LIBRARY_PATH, eg.

export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH

If you want to build your own programs, add generated gopath directory to env GOPATH, or move gopath/src/qbus directory to $GOPATH/src.

Python

Copy generated qbus.py and _qbus.so to the path of the Python scripts to run.

PHP

Edit php.ini and add extension=<module-path>, <module-path> is the path of qbus.so.

Usage

Data Producing

#include <string>
#include <iostream>
#include "qbus_producer.h"

int main(int argc, const char* argv[]) {
    qbus::QbusProducer qbus_producer;
    if (!qbus_producer.init("127.0.0.1:9092",
                    "./log",
                    "./config",
                    "topic_test")) {
        std::cout << "Failed to init" << std::endl;
        return 0;
    }

    std::string msg("test\n");
    if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {
        std::cout << "Failed to produce" << std::endl;
    }

    qbus_producer.uninit();

    return 0;
}

Data Consuming

#include <iostream>
#include "qbus_consumer.h"

qbus::QbusConsumer qbus_consumer;
class MyCallback: public qbus::QbusConsumerCallback {
    public:
        virtual void deliveryMsg(const std::string& topic,
                    const char* msg,
                    const size_t msg_len) const {
            std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl;
        }

};

int main(int argc, char* argv[]) {
    MyCallback my_callback;
    if (qbus_consumer.init("127.0.0.1:9092",
                    "log",
                    "config",
                    my_callback)) {
        if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {
            if (!qbus_consumer.start()) {
                std::cout << "Failed to start" << std::endl;
                return NULL;
            }

            while (1) sleep(1);  //other operations can appear here

            qbus_consumer.stop();
        } else {
            std::cout << "Failed subscribe" << std::endl;
        }
    } else {
        std::cout << "Failed init" << std::endl;
    }
    return 0;
}

You can use pause() and resume() methods to pause or resume consuming some topics, see qbus_pause_resume_example.cc

See examples in C examplesC++ examplesGo examplesPython examplesPHP examples for more usage.

CONFIGURATION

The configuration file is in INI format:

[global]

[topic]

[sdk]

See rdkafka 1.0.x configuration for global and topic configurations, and sdk configuration for sdk configuration.

Normally kafkabridge works with an empty configuration file, but if your broker version < 0.10.0.0, you must specify api.version-related configuration parameters, see broker version compatibility.

eg. for broker 0.9.0.1, following configurations are necessary:

[global]
api.version.request=false
broker.version.fallback=0.9.0.1

The default config is now compatible with broker 0.9.0.1. Therefore, if higher version broker is used, api.version.request should be set true. Otherwise, the message protocol would be older version, e.g. no timestamp field.

Contact

QQ group: 876834263