housepower / clickhouse_sinker

Easily load data from kafka to ClickHouse
https://housepower.github.io/clickhouse_sinker
Apache License 2.0

sarama client with Kafka v3.1.0 reports "client has run out of available brokers to talk to (Is your cluster reachable?)" error #150

Closed sunruoyu123 closed 2 years ago

sunruoyu123 commented 2 years ago

Running it reports this error: {"level":"fatal","ts":"2022-02-23T13:32:30.244+0800","msg":"s.applyConfig failed","error":"kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}. How can I resolve it? Many thanks! Config file as follows:

{
  "@ClickHouse config": "",
  "clickhouse": {
    "@cluster the ClickHouse node belongs to": "",
    "cluster": "",
    "@hosts": "for connection, it's Array(Array(String))",
    "@hosts": "we can put hosts with the same shard into the inner array",
    "@hosts": "it helps data deduplication for ReplicatedMergeTree when a driver error occurs",
    "hosts": [
      [
        "192.168.10.104"
      ]
    ],
    "port": 9000,
    "username": "default",
    "password": "",
    "@database name": "",
    "db": "default",
    "@enable TLS": "whether to enable TLS encryption with clickhouse-server",
    "secure": false,
    "@Whether to skip verifying the clickhouse-server cert if secure=true.": "",
    "insecureSkipVerify": false,
    "@retryTimes": "retry times when an error occurs while inserting data",
    "retryTimes": 0,
    "@max open connections": "max open connections with each ClickHouse node. Defaults to 1.",
    "maxOpenConns": 1
  },

  "@Kafka config": "",
  "kafka": {
    "brokers": "192.168.10.102:9092,192.168.10.103:9092,192.168.10.104:9092",

    "@Java client style security authentication": "",
    "security": {
      "security.protocol": "SASL_PLAINTEXT",
      "sasl.kerberos.service.name": "kafka",
      "sasl.mechanism": "GSSAPI",
      "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true keyTab=\"/etc/security/mmmtest.keytab\" principal=\"mmm@ALANWANG.COM\";"
    },

    "@SSL": "",
    "tls": {
      "enable": false,
      "@Required": "It's the CA certificate with which the Kafka brokers' certs are signed.",
      "caCertFiles": "/etc/security/ca-cert",
      "@Required": "if Kafka brokers require client authentication.",
      "clientCertFile": "",
      "@Required": "if and only if ClientCertFile is present.",
      "clientKeyFile": ""
    },

    "@SASL": "",
    "sasl": {
      "enable": false,
      "@Mechanism": "Mechanism is the name of the enabled SASL mechanism.",
      "@Possible values": "Possible values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (defaults to PLAIN)",
      "mechanism": "PLAIN",
      "@Username": "Username is the authentication identity (authcid) to present for SASL/PLAIN or SASL/SCRAM authentication",
      "username": "",
      "@Password": "Password for SASL/PLAIN or SASL/SCRAM authentication",
      "password": "",
      "gssapi": {
        "@@": "authtype - 1. KRB5_USER_AUTH, 2. KRB5_KEYTAB_AUTH",
        "authtype": 0,
        "keytabpath": "",
        "kerberosconfigpath": "",
        "servicename": "",
        "username": "",
        "password": "",
        "realm": "",
        "disablepafxfast": false
      }
    },

    "@kafka version": "if you use sarama, the version must be specified",
    "version": "3.1.0"
  },

  "task": {
    "name": "daily_json",
    "@kafka client": "possible values: sarama, kafka-go. (defaults to sarama)",
    "kafkaClient": "sarama",
    "@kafka topic": "",
    "topic": "daily",
    "@kafka consume from earliest or latest": "",
    "earliest": true,
    "@kafka consumer group": "",
    "consumerGroup": "group",
    "@message parser": "",
    "parser": "json",
    "@clickhouse table name": "",
    "tableName": "daily",
    "@columns of the table": "",
    "dims": [
      {
        "@column name": "",
        "name": "day",
        "@column type": "",
        "type": "DateTime"
      },
      {
        "name": "level",
        "type": "String"
      },
      {
        "name": "total",
        "type": "Float32",
        "@json field name": "This must be specified if it doesn't match the column name",
        "@Ex sourcename": "val"
      }
    ],

    "@@": "if it's specified, clickhouse_sinker will detect the table schema instead of using the fixed schema given by [dims].",
    "autoSchema": true,
    "@@": "these columns will be excluded from the detected table schema. This takes effect only if [autoSchema] is true.",
    "excludeColumns": [],
    "@experiment feature": "(experimental feature) detect new fields and their types, and add columns to the ClickHouse table accordingly.",
    "@experiment feature": "This feature requires the parser to be fastjson or gjson. New fields' types will be one of: Int64, Float64, String.",
    "@experiment feature": "A column is added for a new key K if all of the following conditions are true:",
    "@experiment feature": "- K isn't in ExcludeColumns",
    "@experiment feature": "- the number of existing columns doesn't reach MaxDims-1",
    "@experiment feature": "- WhiteList is empty, or K matches WhiteList",
    "@experiment feature": "- BlackList is empty, or K doesn't match BlackList",
    "dynamicSchema": {
      "@@": "whether to enable this feature, defaults to false",
      "enable": false,
      "@@": "the upper limit on the number of dynamic columns, <=0 means math.MaxInt16. Protects against dirty-data attacks.",
      "maxDims": 1024,
      "@@": "the regexp of the white list. Syntax reference: https://github.com/google/re2/wiki/Syntax",
      "whiteList": "^[0-9A-Za-z_]+$",
      "@@": "the regexp of the black list",
      "blackList": "@"
    },

    "@shardingKey": "shardingKey is the name of the column to shard against",
    "shardingKey": "",
    "@shardingStripe": "shardingStripe takes effect iff the sharding key is numerical",
    "shardingStripe": 0,

    "@interval": "interval of flushing the batch. Defaults to 5, max 600.",
    "flushInterval": 5,
    "@batch size": "batch size to insert into ClickHouse. sinker will round it up to the nearest 2^n. Defaults to 262114, max 1048576.",
    "bufferSize": 262114,

    "@timeZone": "In the absence of time zone information, interpret the time as in the given location. Defaults to 'Local' (aka /etc/localtime of the machine on which sinker runs)",
    "timeZone": "",
    "@@@": "Time unit when interpreting a number as a time. Defaults to 1.0.",
    "@@@": "Java's timestamp is milliseconds since the epoch. Change timeUnit to 0.001 in this case.",
    "timeUnit": 1.0
  },

  "@log level": "possible values: debug, info, warn, error, dpanic, panic, fatal. Defaults to info.",
  "logLevel": "debug"
}
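(Editor's note: in the config above, the Java-client-style security block enables GSSAPI, while the sarama-style sasl block has "enable": false. For reference, a sarama-style sasl section filled in from the same values would look roughly like this. This is only a sketch: the keytab path, principal, service name, and realm are taken from the security block above; the /etc/krb5.conf path is an assumed default, and authtype 2 selects KRB5_KEYTAB_AUTH per the config comments.)

    "sasl": {
      "enable": true,
      "mechanism": "GSSAPI",
      "gssapi": {
        "@@": "2 = KRB5_KEYTAB_AUTH, matching the keytab-based JAAS config above",
        "authtype": 2,
        "keytabpath": "/etc/security/mmmtest.keytab",
        "@@": "assumed default Kerberos config path",
        "kerberosconfigpath": "/etc/krb5.conf",
        "servicename": "kafka",
        "username": "mmm",
        "realm": "ALANWANG.COM"
      }
    }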

yuzhichang commented 2 years ago

Please try franz: change to "kafkaClient": "franz".
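
That is, in the task section of the config above, change this line:

    "kafkaClient": "sarama",

to:

    "kafkaClient": "franz",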