ClickHouse / ClickHouse

ClickHouse® is a real-time analytics DBMS
https://clickhouse.com
Apache License 2.0

Problem of using Kafka Engine to consume message from Azure Event Hub #12609

Closed: meetable closed this issue 3 years ago

meetable commented 4 years ago

I'm trying to use the ClickHouse Kafka engine to consume messages from Azure Event Hubs (which supports the Kafka protocol, version 1.0 or later), but it doesn't work.

Here is what I have done:

1. Configure the Kafka settings in the ClickHouse config.xml:

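    <kafka>
        <security_protocol>sasl_plaintext</security_protocol>
        <sasl_mechanism>PLAIN</sasl_mechanism>
        <sasl_username>$ConnectionString</sasl_username>
        <sasl_password>{Token}</sasl_password>
        <auto_offset_reset>latest</auto_offset_reset>
    </kafka>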

2. Set up a Kafka engine table that consumes from the Azure Event Hubs namespace:

    CREATE TABLE queue
    (
        timestamp UInt64,
        level String,
        message String
    )
    ENGINE = Kafka('{NameSpace}.servicebus.windows.net:9093', '{TopicName}', '$Default', 'CSV');

3. Send messages to the event hub.

4. Select from the Kafka engine table, but 0 rows are returned:

    SELECT * FROM queue

    Ok.

    0 rows in set. Elapsed: 15.014 sec.
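A note on step 1: according to Microsoft's documentation for Event Hubs with Apache Kafka clients, the endpoint on port 9093 only accepts TLS connections authenticated with SASL PLAIN, where the username is the literal string $ConnectionString and, for connection-string authentication, the password is the connection string itself. A sasl_plaintext client therefore cannot connect. The minimal Python sketch below checks a dict of ClickHouse <kafka> settings and a broker address against those requirements; the function name and the exact set of checks are illustrative assumptions, not a ClickHouse or Azure API.

```python
# Illustrative check of ClickHouse <kafka> settings against the Azure Event Hubs
# Kafka endpoint requirements (SASL_SSL + PLAIN on port 9093).
# Hypothetical helper: not part of ClickHouse or any Azure SDK.


def event_hubs_config_problems(settings: dict, broker: str) -> list:
    """Return human-readable problems; an empty list means none were found."""
    problems = []

    protocol = settings.get("security_protocol", "").upper()
    if protocol != "SASL_SSL":
        problems.append("security_protocol must be SASL_SSL (got %r)" % protocol)

    # librdkafka accepts both sasl.mechanism and sasl.mechanisms.
    mechanism = settings.get("sasl_mechanism", settings.get("sasl_mechanisms", "")).upper()
    if mechanism != "PLAIN":
        problems.append("sasl_mechanism must be PLAIN (got %r)" % mechanism)

    if settings.get("sasl_username") != "$ConnectionString":
        problems.append("sasl_username must be the literal string $ConnectionString")

    # Split "host:port" on the last colon.
    host, _, port = broker.rpartition(":")
    if not host.endswith(".servicebus.windows.net") or port != "9093":
        problems.append("broker must look like <namespace>.servicebus.windows.net:9093")

    return problems


# Settings assumed to mirror step 1 (password omitted).
step1_settings = {
    "security_protocol": "sasl_plaintext",
    "sasl_mechanism": "PLAIN",
    "sasl_username": "$ConnectionString",
}
for problem in event_hubs_config_problems(step1_settings, "{NameSpace}.servicebus.windows.net:9093"):
    print(problem)
# prints: security_protocol must be SASL_SSL (got 'SASL_PLAINTEXT')
```

The check is deliberately shallow: it cannot tell whether the credentials are valid, only whether the settings have the shape Event Hubs expects.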

Here is the ClickHouse server log:

2020.07.20 22:32:23.721576 [ 98101 ] {c3486e06-083b-4387-a563-4ff7d488ac39} StorageKafka (queue): Can't get assignment

2020.07.20 22:32:23.721916 [ 98101 ] {c3486e06-083b-4387-a563-4ff7d488ac39} StorageKafka (queue): Nothing to commit.

2020.07.20 22:32:23.722445 [ 98110 ] {c3486e06-083b-4387-a563-4ff7d488ac39} MemoryTracker: Peak memory usage (for query): 0.00 B.

The log says "Can't get assignment". Has anyone tried Event Hubs, and what might be the missing step?

hodzanassredin commented 3 years ago

No answers????

We have an event hub in Azure and can connect to it using the kafkacat tool. Config:

    security.protocol=SASL_SSL
    sasl.mechanisms=PLAIN
    sasl.username=$ConnectionString
    sasl.password=Endpoint=xxx
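ClickHouse reads librdkafka options from the <kafka> section of config.xml, with the dots in each option name replaced by underscores (the ClickHouse Kafka engine documentation gives check.crcs=true becoming <check_crcs>true</check_crcs> as an example). The Python sketch below mechanically turns kafkacat-style "key=value" properties into such a section. The function name is hypothetical; values are XML-escaped as a precaution, and each line is split on the first "=" only because the password (a connection string) itself contains "=".

```python
from xml.sax.saxutils import escape


def properties_to_clickhouse_kafka_xml(properties: str) -> str:
    """Convert librdkafka/kafkacat 'key=value' lines into a ClickHouse <kafka> block."""
    lines = ["<kafka>"]
    for raw in properties.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        # Split on the first '=' only; connection strings contain '=' themselves.
        key, _, value = line.partition("=")
        tag = key.strip().replace(".", "_")  # security.protocol -> security_protocol
        lines.append(f"    <{tag}>{escape(value.strip())}</{tag}>")
    lines.append("</kafka>")
    return "\n".join(lines)


kafkacat_config = """
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=Endpoint=xxx
"""
print(properties_to_clickhouse_kafka_xml(kafkacat_config))
```

A one-to-one copy does not cover the CA bundle, though: an SSL "certificate verify failed" error usually means librdkafka could not find trusted CA certificates, which is what an ssl_ca_location setting points it to.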

In ClickHouse (Yandex Cloud) we configured the Kafka engine the same way as kafkacat, but with no luck; there are a lot of errors in the logs:

[rdk:ERROR] [thrd:app]: ClickHouse-xxx.yandexcloud.net-db1-telemetry_out#consumer-1: sasl_ssl://xxx.servicebus.windows.net:9093/bootstrap: SSL handshake failed: error:14000086:SSL routines::certificate verify failed (after 94ms in state CONNECT, 31 identical error(s) suppressed)

We also got a single error of the following type, but it did not occur again:

Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 45ms in state APIVERSION_QUERY, 4 identical error(s) suppressed)

In the end, the logs show "Can't get assignment". This may be caused by some issue with the consumer group (not enough partitions?). Will keep trying.

The main problem is probably the default configuration, which likely has to be changed to match https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations

ababutski commented 3 years ago

Just want to share my pain with this here. I managed to set up the ClickHouse Kafka engine correctly with the Event Hubs Kafka endpoint.

  1. First of all, add the ClickHouse instance IP to the Event Hubs namespace firewall. It's best to test connectivity with kafkacat before moving on.

  2. Second, set up the Kafka engine config. My config:

    <kafka>
        <security_protocol>SASL_SSL</security_protocol>
        <sasl_mechanism>PLAIN</sasl_mechanism>
        <ssl_ca_location>/path/to/cert.pem</ssl_ca_location>
        <sasl_username>$ConnectionString</sasl_username>
        <sasl_password>Endpoint=sb://namespace.servicebus.windows.net...</sasl_password>
    </kafka>

    cert.pem can be found at https://curl.haxx.se/docs/caextract.html (an explanation of why it is needed: https://notetoself.tech/2018/06/03/acessing-event-hubs-with-confluent-kafka-library/)

  3. Then create a table:

    CREATE TABLE testing.kafka_test
    (
        msg String
    )
    ENGINE = Kafka
    SETTINGS kafka_broker_list = '<namespace>.servicebus.windows.net:9093',
             kafka_topic_list = 'topic_name',
             kafka_group_name = '$Default',
             kafka_format = 'JSONEachRow';
  4. Now, if you are lucky and did everything right, after adding JSON messages to the topic you will see them in the results of the query

    SELECT * FROM testing.kafka_test

Hope this helps.
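Step 1 above suggests testing connectivity before configuring ClickHouse. As a minimal, standard-library-only alternative to kafkacat, the Python sketch below opens a TCP connection to the broker and performs a TLS handshake, optionally trusting a specific CA bundle such as the cert.pem from step 2. A firewall block shows up as a connection error, and a missing or wrong CA bundle as a certificate verification error, comparable to the "certificate verify failed" message earlier in the thread. It only exercises the network and TLS layers, not SASL authentication; the function name and return strings are assumptions for illustration.

```python
import socket
import ssl
from typing import Optional


def check_tls(host: str, port: int = 9093, cafile: Optional[str] = None, timeout: float = 5.0) -> str:
    """Try a TCP connection plus TLS handshake; return "ok" or a short error description.

    Only the network and TLS layers are tested; SASL authentication is not attempted.
    """
    # cafile=None uses the system's default CA store. A cafile that does not
    # exist raises FileNotFoundError here, before any connection attempt.
    context = ssl.create_default_context(cafile=cafile)
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            # server_hostname enables SNI and hostname checking; the handshake
            # runs inside wrap_socket because the socket is already connected.
            with context.wrap_socket(sock, server_hostname=host):
                return "ok"
    except ssl.SSLCertVerificationError as exc:
        # OpenSSL could not verify the server certificate against the trusted CAs.
        return "certificate verification failed: %s" % exc.verify_message
    except ssl.SSLError as exc:
        return "TLS error: %s" % exc
    except OSError as exc:
        # DNS failures, refused connections and timeouts (e.g. a firewall dropping packets).
        return "connection failed: %s" % exc


# Example (replace <namespace>; expected to return "ok" if the namespace firewall admits this host):
# print(check_tls("<namespace>.servicebus.windows.net", cafile="/path/to/cert.pem"))
```

Running this from the ClickHouse host itself, with the same CA file that ssl_ca_location points to, separates firewall problems from certificate problems before the Kafka engine is involved.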