apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Debezium MySQL connector will block after restart #9227

Closed yufan022 closed 3 years ago

yufan022 commented 3 years ago

Describe the bug

When Debezium is started for the first time, everything works. When Debezium is restarted after it has been running for a while, it often fails to start; it only occasionally starts successfully. If I clean up offset-topic or restart the broker, it starts successfully.

To Reproduce

Steps to reproduce the behavior:

  1. start debezium mysql connector
  2. stop debezium mysql connector
  3. restart debezium mysql connector - failed

or

  1. start debezium
  2. stop debezium
  3. clean up offset-topic or restart broker
  4. restart debezium - success

Additional context

bin/pulsar-admin sources create \
  --archive connectors/pulsar-io-debezium-mysql-2.7.0.nar \
  --name debezium-mysql-source \
  --destination-topic-name public/xxxx/debezium-mysql-topic \
  --tenant public \
  --namespace xxxx \
  --cpu 2 \
  --ram 1073741824 \
  --source-config '{
    "database.hostname": "x.x.x.x",
    "database.port": "xxxx",
    "database.user": "xxxx",
    "database.password": "xxxx",
    "database.server.id": "184054",
    "database.server.name": "xxxx",
    "database.whitelist": "xxxx",
    "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
    "database.history.pulsar.topic": "public/xxxx/history-topic",
    "database.history.pulsar.service.url": "pulsar://pulsar-mini-proxy:6650",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "pulsar.service.url": "pulsar://pulsar-mini-proxy:6650",
    "offset.storage.topic": "public/xxxx/offset-topic",
    "decimal.handling.mode": "string"
  }'

Debezium pod log:

05:26:48.269 [pulsar-client-io-7-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {
  "serviceUrl" : "pulsar://pulsar-mini-proxy:6650",
  "authPluginClassName" : null,
  "operationTimeoutMs" : 30000,
  "statsIntervalSeconds" : 60,
  "numIoThreads" : 1,
  "numListenerThreads" : 1,
  "connectionsPerBroker" : 1,
  "useTcpNoDelay" : true,
  "useTls" : false,
  "tlsTrustCertsFilePath" : "",
  "tlsAllowInsecureConnection" : false,
  "tlsHostnameVerificationEnable" : false,
  "concurrentLookupRequest" : 5000,
  "maxLookupRequest" : 50000,
  "maxLookupRedirects" : 20,
  "maxNumberOfRejectedRequestPerConnection" : 50,
  "keepAliveIntervalSeconds" : 30,
  "connectionTimeoutMs" : 10000,
  "requestTimeoutMs" : 60000,
  "initialBackoffIntervalNanos" : 100000000,
  "maxBackoffIntervalNanos" : 60000000000,
  "listenerName" : null,
  "useKeyStoreTls" : false,
  "sslProvider" : null,
  "tlsTrustStoreType" : "JKS",
  "tlsTrustStorePath" : null,
  "tlsTrustStorePassword" : null,
  "tlsCiphers" : [ ],
  "tlsProtocols" : [ ],
  "proxyServiceUrl" : null,
  "proxyProtocol" : null,
  "enableTransaction" : false
}
05:26:48.275 [pulsar-client-io-7-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/3427/offset-topic][reader-5579d7a108] Subscribing to topic on cnx [id: 0xa3f72b6f, L:/10.42.14.92:40958 - R:pulsar-mini-proxy.pulsar.svc.cluster.local/10.43.18.5:6650], consumerId 0
05:26:48.282 [pulsar-client-io-7-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/3427/offset-topic][reader-5579d7a108] Subscribed to topic on pulsar-mini-proxy.pulsar.svc.cluster.local/10.43.18.5:6650 -- consumer: 0
05:26:48.297 [pulsar-client-io-7-1] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
05:26:48.402 [pulsar-client-io-7-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/3427/offset-topic][reader-5579d7a108] Successfully getLastMessageId 1969:96
05:27:48.271 [pulsar-timer-10-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [public/3427/offset-topic] [reader-5579d7a108] [ca99a] Prefetched messages: 858 --- Consume throughput received: 10.70 msgs/s --- 0.01 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

Broker pod log:

12:50:02.277 [main-EventThread] INFO  org.apache.distributedlog.ReadAheadEntryReader - segments is updated with [[LogSegmentId:2611, firstTxId:1024, lastTxId:9223372036854775807, version:VERSION_V5_SEQUENCE_ID, completionTime:1610973283020, recordCount:167175, regionId:0, status:0, logSegmentSequenceNumber:1, lastEntryId:665, lastSlotId:0, inprogress:false, minActiveDLSN:DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}, startSequenceId:0]]
12:50:02.280 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=pulsar-mini-zookeeper:2181 sessionTimeout=30000 watcher=org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase@563c9ef1
12:50:02.281 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.zookeeper.ClientCnxnSocket - jute.maxbuffer value is 10485760 Bytes
12:50:02.281 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.zookeeper.ClientCnxn - zookeeper.request.timeout value is 0. feature enabled=
12:50:02.284 [DLM-/pulsar/functions-OrderedScheduler-0-0-SendThread(pulsar-mini-zookeeper:2181)] INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server pulsar-mini-zookeeper/10.42.14.103:2181. Will not attempt to authenticate using SASL (unknown error)
12:50:02.284 [DLM-/pulsar/functions-OrderedScheduler-0-0-SendThread(pulsar-mini-zookeeper:2181)] INFO  org.apache.zookeeper.ClientCnxn - Socket connection established, initiating session, client: /10.42.14.107:40942, server: pulsar-mini-zookeeper/10.42.14.103:2181
12:50:02.298 [DLM-/pulsar/functions-OrderedScheduler-0-0-SendThread(pulsar-mini-zookeeper:2181)] INFO  org.apache.zookeeper.ClientCnxn - Session establishment complete on server pulsar-mini-zookeeper/10.42.14.103:2181, sessionid = 0x100a5806e19000f, negotiated timeout = 30000
12:50:02.299 [DLM-/pulsar/functions-OrderedScheduler-0-0-EventThread] INFO  org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase - ZooKeeper client is connected now.
12:50:02.300 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase - Initialize zookeeper metadata driver with external zookeeper client : ledgersRootPath = /ledgers.
12:50:02.303 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Initialize rackaware ensemble placement policy @ <Bookie:10.42.14.107:0> @ /default-region/default-rack : org.apache.distributedlog.net.DNSResolverForRacks.
12:50:02.303 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Not weighted
12:50:02.303 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.client.DefaultBookieAddressResolver - Resolving dummy bookie Id 10.42.14.107:0 using legacy bookie resolver
org.apache.bookkeeper.client.BKException$BKBookieHandleNotAvailableException: Bookie handle is not available
    at org.apache.bookkeeper.discover.ZKRegistrationClient.getBookieServiceInfo(ZKRegistrationClient.java:248) ~[org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.DefaultBookieAddressResolver.resolve(DefaultBookieAddressResolver.java:43) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.resolveNetworkLocation(TopologyAwareEnsemblePlacementPolicy.java:789) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.getRegion(RegionAwareEnsemblePlacementPolicy.java:90) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.getLocalRegion(RegionAwareEnsemblePlacementPolicy.java:110) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.initialize(RegionAwareEnsemblePlacementPolicy.java:173) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.RegionAwareEnsemblePlacementPolicy.initialize(RegionAwareEnsemblePlacementPolicy.java:53) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.BookKeeper.initializeEnsemblePlacementPolicy(BookKeeper.java:580) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.BookKeeper.<init>(BookKeeper.java:504) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.client.BookKeeper$Builder.build(BookKeeper.java:307) [org.apache.bookkeeper-bookkeeper-server-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.BookKeeperClient.commonInitialization(BookKeeperClient.java:121) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.BookKeeperClient.initialize(BookKeeperClient.java:172) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.BookKeeperClient.get(BookKeeperClient.java:199) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryStore.openReader(BKLogSegmentEntryStore.java:204) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.ReadAheadEntryReader$SegmentReader.openReader(ReadAheadEntryReader.java:127) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.ReadAheadEntryReader.unsafeInitializeLogSegments(ReadAheadEntryReader.java:841) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.ReadAheadEntryReader.unsafeProcessLogSegments(ReadAheadEntryReader.java:691) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.ReadAheadEntryReader.access$1000(ReadAheadEntryReader.java:62) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.ReadAheadEntryReader$4.safeRun(ReadAheadEntryReader.java:682) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at org.apache.distributedlog.ReadAheadEntryReader$CloseableRunnable.run(ReadAheadEntryReader.java:225) [org.apache.distributedlog-distributedlog-core-4.12.0.jar:4.12.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_275]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_275]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_275]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_275]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
12:50:02.304 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.client.BookKeeper - Weighted ledger placement is not enabled
12:50:02.307 [DLM-/pulsar/functions-OrderedScheduler-0-0-EventThread] INFO  org.apache.bookkeeper.discover.ZKRegistrationClient - Update BookieInfoCache (writable bookie) pulsar-mini-bookie-1.pulsar-mini-bookie.pulsar.svc.cluster.local:3181 -> BookieServiceInfo{properties={}, endpoints=[EndpointInfo{id=httpserver, port=8000, host=0.0.0.0, protocol=http, auth=[], extensions=[]}, EndpointInfo{id=bookie, port=3181, host=pulsar-mini-bookie-1.pulsar-mini-bookie.pulsar.svc.cluster.local, protocol=bookie-rpc, auth=[], extensions=[]}]}
12:50:02.308 [DLM-/pulsar/functions-OrderedScheduler-0-0-EventThread] INFO  org.apache.bookkeeper.discover.ZKRegistrationClient - Update BookieInfoCache (writable bookie) pulsar-mini-bookie-2.pulsar-mini-bookie.pulsar.svc.cluster.local:3181 -> BookieServiceInfo{properties={}, endpoints=[EndpointInfo{id=httpserver, port=8000, host=0.0.0.0, protocol=http, auth=[], extensions=[]}, EndpointInfo{id=bookie, port=3181, host=pulsar-mini-bookie-2.pulsar-mini-bookie.pulsar.svc.cluster.local, protocol=bookie-rpc, auth=[], extensions=[]}]}
12:50:02.308 [DLM-/pulsar/functions-OrderedScheduler-0-0-EventThread] INFO  org.apache.bookkeeper.discover.ZKRegistrationClient - Update BookieInfoCache (writable bookie) pulsar-mini-bookie-0.pulsar-mini-bookie.pulsar.svc.cluster.local:3181 -> BookieServiceInfo{properties={}, endpoints=[EndpointInfo{id=httpserver, port=8000, host=0.0.0.0, protocol=http, auth=[], extensions=[]}, EndpointInfo{id=bookie, port=3181, host=pulsar-mini-bookie-0.pulsar-mini-bookie.pulsar.svc.cluster.local, protocol=bookie-rpc, auth=[], extensions=[]}]}
12:50:02.309 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.net.NetworkTopologyImpl - Adding a new node: /pulsar/mini/pulsar-mini-bookie-1.pulsar-mini-bookie.pulsar.svc.cluster.local:3181
12:50:02.309 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Initialize rackaware ensemble placement policy @ <Bookie:10.42.14.107:0> @ /default-region/default-rack : org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy$DNSResolverDecorator.
12:50:02.309 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl - Not weighted
12:50:02.310 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.net.NetworkTopologyImpl - Adding a new node: /pulsar/mini/pulsar-mini-bookie-2.pulsar-mini-bookie.pulsar.svc.cluster.local:3181
12:50:02.311 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.net.NetworkTopologyImpl - Adding a new node: /pulsar/mini/pulsar-mini-bookie-0.pulsar-mini-bookie.pulsar.svc.cluster.local:3181
12:50:02.311 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.net.NetworkTopologyImpl - Adding a new node: /pulsar/mini/pulsar-mini-bookie-1.pulsar-mini-bookie.pulsar.svc.cluster.local:3181
12:50:02.311 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.net.NetworkTopologyImpl - Adding a new node: /pulsar/mini/pulsar-mini-bookie-2.pulsar-mini-bookie.pulsar.svc.cluster.local:3181
12:50:02.311 [BookKeeperClientScheduler-OrderedScheduler-0-0] INFO  org.apache.bookkeeper.net.NetworkTopologyImpl - Adding a new node: /pulsar/mini/pulsar-mini-bookie-0.pulsar-mini-bookie.pulsar.svc.cluster.local:3181
12:50:02.313 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.distributedlog.BookKeeperClient - BookKeeper Client created bk:distributedlog://pulsar-mini-zookeeper:2181/pulsar/functions:factory_writer_shared with its own ZK Client : ledgersPath = /ledgers, numRetries = 3, sessionTimeout = 30000, backoff = 5000, maxBackoff = 30000, dnsResolver =
12:50:02.314 [DLM-/pulsar/functions-OrderedScheduler-0-0] INFO  org.apache.distributedlog.ReadAheadEntryReader - Reinitialize log segments with [[LogSegmentId:2611, firstTxId:1024, lastTxId:9223372036854775807, version:VERSION_V5_SEQUENCE_ID, completionTime:1610973283020, recordCount:167175, regionId:0, status:0, logSegmentSequenceNumber:1, lastEntryId:665, lastSlotId:0, inprogress:false, minActiveDLSN:DLSN{logSegmentSequenceNo=1, entryId=0, slotId=0}, startSequenceId:0]]

codelipenghui commented 3 years ago

@yufan022 Could you please confirm that the number of available bookies is greater than the ensemble size you are using?

yufan022 commented 3 years ago

@codelipenghui I checked again: if I restart the broker before restarting Debezium, Debezium starts successfully. Broker config:

managedLedgerDefaultAckQuorum=2
managedLedgerDefaultEnsembleSize=2
managedLedgerDefaultWriteQuorum=2
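The question above rests on the usual BookKeeper rule that the ensemble size must be at least the write quorum, the write quorum at least the ack quorum, and that at least as many writable bookies as the ensemble size must be available. As a minimal sketch, the settings above can be checked against the three bookie pods in the topology listing; QuorumCheck and validate are hypothetical names for illustration, not Pulsar or BookKeeper APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Pulsar or BookKeeper): checks the
 * BookKeeper constraint ensembleSize >= writeQuorum >= ackQuorum >= 1
 * and that enough bookies are available to form an ensemble.
 */
final class QuorumCheck {

    /** Returns a list of problems; an empty list means the settings look consistent. */
    static List<String> validate(int ensembleSize, int writeQuorum, int ackQuorum, int availableBookies) {
        List<String> problems = new ArrayList<>();
        if (ackQuorum < 1) {
            problems.add("ackQuorum must be at least 1");
        }
        if (writeQuorum < ackQuorum) {
            problems.add("writeQuorum (" + writeQuorum + ") < ackQuorum (" + ackQuorum + ")");
        }
        if (ensembleSize < writeQuorum) {
            problems.add("ensembleSize (" + ensembleSize + ") < writeQuorum (" + writeQuorum + ")");
        }
        if (availableBookies < ensembleSize) {
            problems.add("only " + availableBookies + " bookies available for ensembleSize " + ensembleSize);
        }
        return problems;
    }

    public static void main(String[] args) {
        // Ensemble/write/ack values from the broker config, with 3 bookie pods.
        System.out.println(validate(2, 2, 2, 3)); // prints []
    }
}
```

With 3 bookies and an ensemble size of 2, the check reports no problems, which suggests the quorum settings alone do not explain the hang.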

My deployment topology:

pod/pf-public-3428-exchange-core-0                1/1     Running     0          29s
pod/pulsar-mini-bookie-0                          1/1     Running     0          139m
pod/pulsar-mini-bookie-1                          1/1     Running     0          140m
pod/pulsar-mini-bookie-2                          1/1     Running     0          140m
pod/pulsar-mini-bookie-init-fc96s                 0/1     Completed   0          4d
pod/pulsar-mini-broker-0                          1/1     Running     0          2m32s
pod/pulsar-mini-grafana-77c7888bd6-7njwj          1/1     Running     0          4d
pod/pulsar-mini-prometheus-5d78c66d5f-x8x6r       1/1     Running     0          3d23h
pod/pulsar-mini-proxy-0                           1/1     Running     0          3d21h
pod/pulsar-mini-pulsar-init-4g2px                 0/1     Completed   0          3d23h
pod/pulsar-mini-pulsar-manager-6d7d86d78b-n9dtj   1/1     Running     0          4d
pod/pulsar-mini-recovery-0                        1/1     Running     0          3d21h
pod/pulsar-mini-toolset-0                         1/1     Running     0          3d16h
pod/pulsar-mini-zookeeper-0                       1/1     Running     0          19h
pod/pulsar-mini-zookeeper-1                       1/1     Running     0          19h
pod/pulsar-mini-zookeeper-2                       1/1     Running     0          19h

yufan022 commented 3 years ago

@codelipenghui More logs: broker.log, pulsarfunctiondebezium.log

I don't think it is caused by the broker error "Bookie handle is not available". I tested 2.6.0 and 2.7.0, on both multi-node and single-node deployments, and the issue can be reproduced on all of them.

yufan022 commented 3 years ago

I enabled debug logging on the broker and on Debezium.

// Debezium
02:22:49.000 [pulsar-client-io-7-1] DEBUG org.apache.pulsar.client.impl.ClientCnx - [id: 0xca74dac6, L:/10.42.3.36:50100 - R:pulsar-mini-broker-0.pulsar-mini-broker.pulsar.svc.cluster.local/10.42.3.36:6650] Received a message from the server: org.apache.pulsar.common.api.proto.PulsarApi$CommandMessage@a075ae2
02:22:49.000 [pulsar-client-io-7-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [public/3426/offset-topic][reader-86b2926c2c] Received message: 6/1999
02:22:49.000 [pulsar-client-io-7-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [reader-86b2926c2c] [0d080] processing message num - 0 in batch
02:22:49.000 [pulsar-client-io-7-1] DEBUG org.apache.pulsar.client.impl.ConsumerImpl - [reader-86b2926c2c] [0d080] enqueued messages in batch. queue size - 878, available queue size - 2147483647
...
02:23:17.730 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [localhost/127.0.0.1:6650] Received cmd PING
02:23:17.730 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0x474729d9, L:/127.0.0.1:38340 - R:localhost/127.0.0.1:6650]] Replying back to ping message
02:23:17.741 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0x474729d9, L:/127.0.0.1:38340 - R:localhost/127.0.0.1:6650]] Sending ping message
02:23:17.744 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [localhost/127.0.0.1:6650] Received cmd PONG
02:23:17.938 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0xd7d4d92a, L:/10.42.3.36:50094 - R:pulsar-mini-broker-0.pulsar-mini-broker.pulsar.svc.cluster.local/10.42.3.36:6650]] Sending ping message
02:23:17.939 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [pulsar-mini-broker-0.pulsar-mini-broker.pulsar.svc.cluster.local/10.42.3.36:6650] Received cmd PONG
02:23:17.943 [pulsar-client-io-1-1] DEBUG org.apache.pulsar.common.protocol.PulsarDecoder - [pulsar-mini-broker-0.pulsar-mini-broker.pulsar.svc.cluster.local/10.42.3.36:6650] Received cmd PING
...
02:23:48.318 [pulsar-client-io-7-1] DEBUG org.apache.pulsar.common.protocol.PulsarHandler - [[id: 0xca74dac6, L:/10.42.3.36:50100 - R:pulsar-mini-broker-0.pulsar-mini-broker.pulsar.svc.cluster.local/10.42.3.36:6650]] Replying back to ping message
02:23:48.442 [pulsar-timer-10-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [public/3426/offset-topic] [reader-86b2926c2c] [0d080] Prefetched messages: 878 --- Consume throughput received: 18.70 msgs/s --- 0.03 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0

// Broker
02:22:48.984 [pulsar-io-24-2] DEBUG org.apache.pulsar.broker.service.PulsarCommandSenderImpl - [persistent://public/3426/offset-topic-PersistentSubscription{topic=persistent://public/3426/offset-topic, name=reader-86b2926c2c}] Sending message to consumerId 0, msg id 6-1996
02:22:48.984 [pulsar-io-24-2] DEBUG org.apache.pulsar.broker.service.PulsarCommandSenderImpl - [persistent://public/3426/offset-topic-PersistentSubscription{topic=persistent://public/3426/offset-topic, name=reader-86b2926c2c}] Sending message to consumerId 0, msg id 6-1997
02:22:48.984 [pulsar-io-24-2] DEBUG org.apache.pulsar.broker.service.PulsarCommandSenderImpl - [persistent://public/3426/offset-topic-PersistentSubscription{topic=persistent://public/3426/offset-topic, name=reader-86b2926c2c}] Sending message to consumerId 0, msg id 6-1998
02:22:48.984 [pulsar-io-24-2] DEBUG org.apache.pulsar.broker.service.PulsarCommandSenderImpl - [persistent://public/3426/offset-topic-PersistentSubscription{topic=persistent://public/3426/offset-topic, name=reader-86b2926c2c}] Sending message to consumerId 0, msg id 6-1999
02:22:48.984 [broker-topic-workers-OrderedScheduler-4-0] DEBUG org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer - [persistent://public/3426/offset-topic / reader-86b2926c2c-Consumer{subscription=PersistentSubscription{topic=persistent://public/3426/offset-topic, name=reader-86b2926c2c}, consumerId=0, consumerName=0d080, address=/10.42.3.36:50100}] Consumer buffer is full, pause reading
02:22:50.014 [AsyncHttpClient-timer-57-1] DEBUG org.asynchttpclient.netty.channel.DefaultChannelPool - Entry count for : http://pulsar-mini-broker-0.pulsar-mini-broker.pulsar.svc.cluster.local:8080 : 1
02:22:50.689 [pulsar-ordered-OrderedExecutor-1-0-SendThread(pulsar-mini-zookeeper:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x104fca0f218003c, packet:: clientPath:/ serverPath:/ finished:false header:: 319,3  replyHeader:: 319,4212,0  request:: '/,F  response:: s{0,0,0,0,0,9,0,0,0,11,120}

When Debezium blocks, I see "...Consumer buffer is full, pause reading" in the broker log and "Consume throughput received..." in the Debezium connector log. At that point I have to restart the broker to get Debezium working again.

Relevant code in PersistentDispatcherSingleActiveConsumer:
https://github.com/apache/pulsar/blob/ed5d94ccfdf4eba77678454945a2c3719dce2268/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L359
https://github.com/apache/pulsar/blob/ed5d94ccfdf4eba77678454945a2c3719dce2268/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L457

@sijie @codelipenghui Could anyone help me track down this issue?

codelipenghui commented 3 years ago

@yufan022 "Consumer buffer is full" means the consumer has no space left in its receiver queue (receiverQueueSize) for new messages. Could you please check the stats and internal stats of the topic the Debezium connector uses, with bin/pulsar-admin topics stats and bin/pulsar-admin topics stats-internal?
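The broker log line "Consumer buffer is full, pause reading" comes from permit-based flow control. The toy model below assumes a simplified version of it: the consumer starts with receiverQueueSize permits, the broker spends one permit per dispatched message and stops reading when none remain, and the client re-grants permits only after the application has taken about half of the queue. FlowControlModel and its methods are hypothetical names; this is a sketch of the idea, not Pulsar's ConsumerImpl or dispatcher code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Toy model of permit-based consumer flow control. Names are hypothetical;
 * this is not Pulsar's ConsumerImpl or dispatcher implementation.
 */
final class FlowControlModel {
    private final int receiverQueueSize;
    private final Deque<Integer> receiverQueue = new ArrayDeque<>();
    private int brokerPermits;      // permits the broker believes the consumer has
    private int consumedSinceFlow;  // messages taken by the app since the last permit grant
    private int nextMessageId;

    FlowControlModel(int receiverQueueSize) {
        this.receiverQueueSize = receiverQueueSize;
        this.brokerPermits = receiverQueueSize; // initial grant when subscribing
    }

    /** Broker side: push up to maxMessages backlog messages while permits remain; returns how many were sent. */
    int dispatch(int maxMessages) {
        int sent = 0;
        while (sent < maxMessages && brokerPermits > 0) {
            receiverQueue.addLast(nextMessageId++);
            brokerPermits--;
            sent++;
        }
        // brokerPermits == 0 corresponds to "Consumer buffer is full, pause reading".
        return sent;
    }

    /** Application side: take one message; re-grant permits once half the queue has been consumed. */
    Integer receive() {
        Integer msg = receiverQueue.pollFirst();
        if (msg != null && ++consumedSinceFlow >= receiverQueueSize / 2) {
            brokerPermits += consumedSinceFlow;
            consumedSinceFlow = 0;
        }
        return msg;
    }

    int queued() { return receiverQueue.size(); }

    int permits() { return brokerPermits; }

    public static void main(String[] args) {
        FlowControlModel m = new FlowControlModel(10);
        System.out.println(m.dispatch(100)); // 10: queue is full, broker pauses
        System.out.println(m.dispatch(100)); // 0: no permits left, nothing dispatched
        for (int i = 0; i < 5; i++) {
            m.receive();                     // application drains half the queue
        }
        System.out.println(m.dispatch(100)); // 5: permits were re-granted
    }
}
```

In this model, if the application stops taking messages out of the receiver queue, the permits never come back and the broker stays paused. That is consistent with the broker log line paired with a prefetch queue on the Debezium side that is not draining, though the model does not say why the reader stopped consuming.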

codelipenghui commented 3 years ago

@yufan022 I will close this issue for now. Feel free to reopen it if needed.