
How to use Apicurio schema registry in Kafka Table Engine #39863

Open abdelhakimbendjabeur opened 2 years ago

abdelhakimbendjabeur commented 2 years ago

Hello,

I am trying to set up a pipeline that streams data from Kafka into ClickHouse, going through an Apicurio registry. I looked in the docs and only found support for the Confluent schema registry, here. It obviously didn't work, failing with a wrong magic number error.

Here is how I created the table (I am running everything in Docker):

CREATE TABLE default.cdc_queue
(
    `txId`   Int64,
    `op`     String,
    `schema` String,
    `table`  Int64
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'host.docker.internal:29092',
    kafka_topic_list = 'cdc_topic',
    format_avro_schema_registry_url = 'http://localhost:8765/apis/registry/v2',
    kafka_group_name = 'clickhouse_cdc_consumer_group',
    kafka_format = 'Avro',
    kafka_num_consumers = 1;

Is Apicurio supported? If so, can someone point me to an example please 🙏? Thank you

ljluestc commented 4 months ago
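The Kafka engine only speaks the Confluent schema-registry protocol, which is why Apicurio's native API fails with a wrong magic number. Before writing any code, it may be enough to point ClickHouse at Apicurio's Confluent-compatibility API (served under /apis/ccompat/v6 on Apicurio Registry 2.x) and use the AvroConfluent format, which is the format that honors format_avro_schema_registry_url. A sketch, assuming the producer writes the Confluent-compatible wire format and that the registry, like Kafka, is reachable from the ClickHouse container via host.docker.internal:

CREATE TABLE default.cdc_queue
(
    `txId`   Int64,
    `op`     String,
    `schema` String,
    `table`  Int64
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'host.docker.internal:29092',
    kafka_topic_list = 'cdc_topic',
    kafka_group_name = 'clickhouse_cdc_consumer_group',
    kafka_format = 'AvroConfluent',
    format_avro_schema_registry_url = 'http://host.docker.internal:8765/apis/ccompat/v6';

If the ccompat endpoint is not an option, an external bridge can consume from Kafka, resolve schemas against Apicurio's native v2 API, and insert into ClickHouse directly. A minimal sketch in C++ (assuming librdkafka, clickhouse-cpp, libcurl, and nlohmann/json; Avro deserialization is left as a stub):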

#include <cstdint>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <librdkafka/rdkafkacpp.h>
#include <clickhouse/client.h>
#include <curl/curl.h>
#include <nlohmann/json.hpp>

// Using the clickhouse-cpp client library
using namespace clickhouse;
using json = nlohmann::json;

// Kafka consumer configuration
std::string brokers = "host.docker.internal:29092";
std::string group_id = "clickhouse_cdc_consumer_group";
std::string topic = "cdc_topic";

// Apicurio registry URL
std::string apicurio_url = "http://localhost:8765/apis/registry/v2";

// libcurl write callback: append the response body to a std::string
size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp) {
    static_cast<std::string*>(userp)->append(static_cast<char*>(contents), size * nmemb);
    return size * nmemb;
}

// Fetch an Avro schema from Apicurio's native v2 API; schema content is
// served at /ids/globalIds/{globalId}.
json fetch_schema(int64_t global_id) {
    std::string readBuffer;
    std::string url = apicurio_url + "/ids/globalIds/" + std::to_string(global_id);

    CURL* curl = curl_easy_init();
    if (!curl) {
        throw std::runtime_error("Failed to initialize libcurl");
    }
    curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
    curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
    CURLcode res = curl_easy_perform(curl);
    curl_easy_cleanup(curl);

    if (res != CURLE_OK) {
        throw std::runtime_error(std::string("Failed to fetch schema: ") + curl_easy_strerror(res));
    }
    return json::parse(readBuffer);
}

// Decode the Apicurio wire format: magic byte 0x0 followed by the schema's
// global ID (8 bytes big-endian with Apicurio's default ID handler; the
// Confluent-compatible handler writes 4 bytes instead).
void process_message(const RdKafka::Message& message, Client& client) {
    const auto* payload = static_cast<const uint8_t*>(message.payload());
    if (message.len() < 9 || payload[0] != 0x0) {
        std::cerr << "Unexpected wire format, skipping message" << std::endl;
        return;
    }

    int64_t global_id = 0;
    for (size_t i = 1; i <= 8; ++i) {
        global_id = (global_id << 8) | payload[i];
    }
    json schema = fetch_schema(global_id);

    // Deserialize the Avro body (payload + 9, message.len() - 9) against the
    // fetched schema, e.g. with the Avro C++ library; left as a stub here.

    // Example: insert placeholder values into ClickHouse
    auto txId      = std::make_shared<ColumnInt64>();
    auto op        = std::make_shared<ColumnString>();
    auto schemaCol = std::make_shared<ColumnString>();
    auto table     = std::make_shared<ColumnInt64>();

    txId->Append(123);
    op->Append("operation");
    schemaCol->Append("schema");
    table->Append(456);

    Block block;
    block.AppendColumn("txId", txId);
    block.AppendColumn("op", op);
    block.AppendColumn("schema", schemaCol);
    block.AppendColumn("table", table);

    client.Insert("default.cdc_queue", block);
}

int main() {
    // ClickHouse client configuration
    Client client(ClientOptions().SetHost("localhost"));

    // Kafka consumer configuration; the high-level KafkaConsumer supports
    // consumer groups and offset commits, unlike the legacy Consumer class
    std::string errstr;
    std::unique_ptr<RdKafka::Conf> conf(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("group.id", group_id, errstr);
    conf->set("auto.offset.reset", "earliest", errstr);
    conf->set("enable.auto.commit", "false", errstr);

    RdKafka::KafkaConsumer* consumer = RdKafka::KafkaConsumer::create(conf.get(), errstr);
    if (!consumer) {
        std::cerr << "Failed to create consumer: " << errstr << std::endl;
        return 1;
    }

    RdKafka::ErrorCode err = consumer->subscribe({topic});
    if (err) {
        std::cerr << "Failed to subscribe: " << RdKafka::err2str(err) << std::endl;
        return 1;
    }

    while (true) {
        std::unique_ptr<RdKafka::Message> msg(consumer->consume(1000));
        if (msg->err() == RdKafka::ERR__TIMED_OUT ||
            msg->err() == RdKafka::ERR__PARTITION_EOF) {
            continue;
        } else if (msg->err()) {
            std::cerr << "Consumer error: " << msg->errstr() << std::endl;
            break;
        }

        // Commit the offset only after the row has reached ClickHouse
        process_message(*msg, client);
        consumer->commitSync();
    }

    consumer->close();
    delete consumer;

    return 0;
}
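
This is untested, and the placeholder values obviously need to be replaced by fields decoded from the Avro body (everything after the magic byte and global ID), e.g. with the Avro C++ library. Build with a C++17 compiler and link against librdkafka++, libcurl, clickhouse-cpp, and nlohmann/json; the exact flags depend on how the libraries were installed.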