uber / uReplicator

Improvement of Apache Kafka Mirrormaker
Apache License 2.0
906 stars, 200 forks

OffsetMonitor stopped producing metrics after disabling anonymous access on SRC and DST Kafka clusters #350

Open disserakt opened 2 years ago

disserakt commented 2 years ago

Hi all! @yangy0000 - I found strange OffsetMonitor behavior after I disabled the unencrypted ports with anonymous access on the SRC and DST Kafka clusters, that is, the ports that used the PLAINTEXT protocol. Only the SASL_SSL ports, which use encryption and authentication via the SCRAM mechanism, remained open. After that, OffsetMonitor began writing the following errors to its logs:

{
    "level": "INFO",
    "location": {
        "class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1",
        "file": "OffsetMonitor.java",
        "method": "run",
        "line": "132"
    },
    "logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
    "message": "TopicList starts updating",
    "host": "ureplicator-controller-2-778558bbf8-5twpf",
    "tags": [
        "ureplicator"
    ],
    "@timestamp": "2021-09-10T08:59:19.493Z",
    "thread": "topic-list-cron-0",
    "@version": "1"
}
{
    "level": "INFO",
    "location": {
        "class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
        "file": "OffsetMonitor.java",
        "method": "updateTopicList",
        "line": "197"
    },
    "logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
    "message": "Update topicList",
    "host": "ureplicator-controller-2-778558bbf8-5twpf",
    "tags": [
        "ureplicator"
    ],
    "@timestamp": "2021-09-10T08:59:24.493Z",
    "thread": "topic-list-cron-0",
    "@version": "1"
}
{
    "exception": {
        "class": "java.nio.channels.ClosedChannelException",
        "stacktrace": "java.nio.channels.ClosedChannelException\n\tat kafka.network.BlockingChannel.send(BlockingChannel.scala:112)\n\tat kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:88)\n\tat kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)\n\tat kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)\n\tat kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)"
    },
    "level": "INFO",
    "location": {
        "class": "kafka.utils.Logging$class",
        "file": "Logging.scala",
        "method": "info",
        "line": "68"
    },
    "logger": "kafka.consumer.SimpleConsumer",
    "message": "Reconnect due to error:",
    "host": "ureplicator-controller-2-778558bbf8-5twpf",
    "tags": [
        "ureplicator"
    ],
    "@timestamp": "2021-09-10T08:59:24.494Z",
    "thread": "topic-list-cron-0",
    "@version": "1"
}
{
    "exception": {
        "class": "java.nio.channels.ClosedChannelException",
        "stacktrace": "java.nio.channels.ClosedChannelException\n\tat kafka.network.BlockingChannel.send(BlockingChannel.scala:112)\n\tat kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:101)\n\tat kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)\n\tat kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)\n\tat kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)\n\tat com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)\n\tat java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)"
    },
    "level": "WARN",
    "location": {
        "class": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
        "file": "OffsetMonitor.java",
        "method": "updateTopicList",
        "line": "234"
    },
    "logger": "com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor",
    "message": "Got exception to get metadata from broker=null:-1",
    "host": "ureplicator-controller-2-778558bbf8-5twpf",
    "tags": [
        "ureplicator"
    ],
    "@timestamp": "2021-09-10T08:59:24.499Z",
    "thread": "topic-list-cron-0",
    "@version": "1"
}

Can I fix this behavior myself, for example by changing some configuration, or do I need to wait for a fix in uReplicator itself and a new release?

I would be glad to hear any advice, thanks.

disserakt commented 2 years ago

@yangy0000 - I enabled debug logging and got this complete error log from OffsetMonitor:

INFO TopicList starts updating (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
INFO Update topicList (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
DEBUG Reading reply sessionid:0x309607c86e80242, packet:: clientPath:null serverPath:null finished:false header:: 1307,8  replyHeader:: 1307,21484188540,0  request:: '/ureplicator-databusfed-dev/controller-worker-cluster1-cluster2-0/IDEALSTATES,T  response:: v{'ureplicator-databus-fed-dev}  (org.apache.zookeeper.ClientCnxn)
DEBUG TopicList: [ureplicator-databus-fed-dev] (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
INFO Reconnect due to error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:88)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)
    at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)
    at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)
    at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)
    at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
WARN Got exception to get metadata from broker=null:-1 (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:101)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
    at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:114)
    at kafka.javaapi.consumer.SimpleConsumer.send(SimpleConsumer.scala:70)
    at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.updateTopicList(OffsetMonitor.java:211)
    at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor.access$600(OffsetMonitor.java:55)
    at com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor$1.run(OffsetMonitor.java:158)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
DEBUG partitionLeader: {} (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)
DEBUG OffsetMonitor updates offset with leaders={} (com.uber.stream.kafka.mirrormaker.controller.core.OffsetMonitor)

Perhaps this will help in some way. Also my clusters.properties looks like this:

kafka.cluster.zkStr.cluster1=data-bus-zk09.rmk.ru:2181,data-bus-zk10.rmk.ru:2181,data-bus-zk11.rmk.ru:2181
kafka.cluster.servers.cluster1=databus-kafka16.rmk.ru:9192,databus-kafka17.rmk.ru:9192,databus-kafka18.rmk.ru:9192
secure.kafka.cluster.servers.cluster1=databus-kafka16.rmk.ru:9192,databus-kafka17.rmk.ru:9192,databus-kafka18.rmk.ru:9192
kafka.cluster.zkStr.cluster2=data-bus-zk01.rmk.ru:2181,data-bus-zk02.rmk.ru:2181,data-bus-zk03.rmk.ru:2181,data-bus-zk04.rmk.ru:2181,data-bus-zk05.rmk.ru:2181
kafka.cluster.servers.cluster2=databus-kafka04.rmk.ru:9192,databus-kafka05.rmk.ru:9192,databus-kafka06.rmk.ru:9192,databus-kafka07.rmk.ru:9192,databus-kafka08.rmk.ru:9192,databus-kafka09.rmk.ru:9192,databus-kafka10.rmk.ru:9192,databus-kafka11.rmk.ru:9192,databus-kafka12.rmk.ru:9192
secure.kafka.cluster.servers.cluster2=databus-kafka04.rmk.ru:9192,databus-kafka05.rmk.ru:9192,databus-kafka06.rmk.ru:9192,databus-kafka07.rmk.ru:9192,databus-kafka08.rmk.ru:9192,databus-kafka09.rmk.ru:9192,databus-kafka10.rmk.ru:9192,databus-kafka11.rmk.ru:9192,databus-kafka12.rmk.ru:9192

Port 9192 uses the SASL_SSL protocol with the SCRAM-SHA-256 and SCRAM-SHA-512 authentication mechanisms.

yangy0000 commented 2 years ago

Unfortunately, the offset monitor is built on top of SimpleConsumer, which doesn't support the SASL_SSL protocol. We are planning to fix this too. @laosiaudi

You can make contributions to this if you are interested. :)
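For anyone picking this up: a replacement would presumably be based on the newer Java client (KafkaConsumer or AdminClient), which takes its security settings as client properties. Below is a minimal sketch of the properties such a client would need for a SASL_SSL listener with SCRAM, like the one described above. The class name, method name, and the username/password values are hypothetical; the property keys are standard Kafka client settings, and none of this is uReplicator's actual code.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of uReplicator.
class SaslSslClientConfig {

    // Builds client properties for a SASL_SSL listener that authenticates with SCRAM.
    // Assumes the username and password contain no double quotes.
    static Properties build(String bootstrapServers, String mechanism,
                            String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        // Must match a mechanism enabled on the broker, e.g. SCRAM-SHA-256 or SCRAM-SHA-512.
        props.setProperty("sasl.mechanism", mechanism);
        props.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder credentials; real ones would come from a secret store.
        Properties props = build("databus-kafka16.rmk.ru:9192",
                "SCRAM-SHA-512", "monitor-user", "change-me");
        System.out.println(props.getProperty("security.protocol"));
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

These properties would be passed to the client's constructor. A truststore setting (ssl.truststore.location) may also be needed if the broker certificates are not signed by a CA the JVM already trusts.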

disserakt commented 2 years ago

@yangy0000, @laosiaudi - I see, that makes it clear, thanks a lot for the info! As for contributions, we will try; maybe it will work out =)

In the meantime, are there alternatives that could replace OffsetMonitor for monitoring offset lag, maybe some external utilities? I tried Burrow, but it did not work, most likely because uReplicator does not create consumer groups.

I would be glad to hear any advice, thanks.
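Whatever tool ends up supplying the numbers, the lag itself is simple arithmetic: for each partition, the latest offset on the source topic minus the last offset that has been mirrored. A minimal sketch, assuming both values have already been collected into maps keyed by partition (the class and method names are hypothetical, and how the offsets are obtained is left open):

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of uReplicator or any monitoring tool.
class OffsetLag {

    // Returns the lag per partition: source end offset minus mirrored offset, never below zero.
    // Partitions with no known mirrored offset are skipped rather than guessed.
    static Map<Integer, Long> perPartition(Map<Integer, Long> sourceEndOffsets,
                                           Map<Integer, Long> mirroredOffsets) {
        Map<Integer, Long> lag = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : sourceEndOffsets.entrySet()) {
            Long mirrored = mirroredOffsets.get(entry.getKey());
            if (mirrored == null) {
                continue;
            }
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - mirrored));
        }
        return lag;
    }

    // Sums the per-partition lag into a single figure for the topic.
    static long total(Map<Integer, Long> lagPerPartition) {
        long sum = 0L;
        for (long value : lagPerPartition.values()) {
            sum += value;
        }
        return sum;
    }
}
```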