apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

metadata-store thread should not execute onPoliciesUpdate synchronously when handling policy updates #15861

Status: Open. Opened by leizhiyuan 2 years ago.

leizhiyuan commented 2 years ago

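Describe the bug

Under admin-API load, the metadata-store thread runs onPoliciesUpdate for topics synchronously and gets blocked. In the thread dump below, it is parked waiting for a ReentrantLock while RateLimiter.setRate schedules a rate-limiter task.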

To Reproduce

Load-test the admin API.

Expected behavior

The metadata-store thread should not be blocked.
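A minimal sketch of the non-blocking behavior the title asks for, assuming the notification listener hands each topic's policy update to a separate executor instead of running it inline. PolicyAwareTopic, handlePoliciesUpdate and both executors are hypothetical names for illustration, not Pulsar APIs; the topic stuck on a lock is simulated with a CountDownLatch.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadPolicyUpdates {

    // Hypothetical stand-in for a topic; in the first thread dump its
    // onPoliciesUpdate ends up waiting on a lock inside RateLimiter.setRate.
    public interface PolicyAwareTopic {
        void onPoliciesUpdate(String policies);
    }

    // Runs on the notification thread but only submits each topic update to
    // policyUpdateExecutor, so a slow topic cannot stall the notification thread.
    public static CompletableFuture<Void> handlePoliciesUpdate(List<PolicyAwareTopic> topics,
                                                               String policies,
                                                               ExecutorService policyUpdateExecutor) {
        CompletableFuture<?>[] updates = topics.stream()
                .map(topic -> CompletableFuture.runAsync(
                        () -> topic.onPoliciesUpdate(policies), policyUpdateExecutor))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(updates);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns true if the notification thread finishes dispatching while one topic
    // is still blocked, and the combined future completes once that topic is released.
    public static boolean notificationThreadReturnsWhileTopicBlocked() throws Exception {
        ExecutorService notificationThread = Executors.newSingleThreadExecutor();
        ExecutorService policyUpdateExecutor = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        List<PolicyAwareTopic> topics = List.of(p -> awaitQuietly(release), p -> { });
        try {
            CompletableFuture<Void> allUpdates = notificationThread
                    .submit(() -> handlePoliciesUpdate(topics, "new-policies", policyUpdateExecutor))
                    .get(5, TimeUnit.SECONDS); // the notification thread is already free here
            boolean stillPending = !allUpdates.isDone(); // the blocked topic has not finished
            release.countDown();
            allUpdates.get(5, TimeUnit.SECONDS);
            return stillPending && allUpdates.isDone();
        } finally {
            notificationThread.shutdownNow();
            policyUpdateExecutor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(notificationThreadReturnsWhileTopicBlocked());
    }
}
```

The trade-off: updates for different topics may now run concurrently on the pool, so if the order of successive updates for the same topic matters, each topic would need to be pinned to one single-threaded lane.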

Screenshots

Thread dump of the blocked metadata-store thread:

"metadata-store-6-1" #174 prio=5 os_prio=0 cpu=336008.23ms elapsed=14968.31s tid=0x00007fdef0005000 nid=0x5680 waiting on condition [0x00007fdcd33f1000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007fdfdc100c00> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.offer(ScheduledThreadPoolExecutor.java:1010)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.add(ScheduledThreadPoolExecutor.java:1037)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.add(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:328)
        at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:573)
        at org.apache.pulsar.common.util.RateLimiter.createTask(RateLimiter.java:259)
        at org.apache.pulsar.common.util.RateLimiter.setRate(RateLimiter.java:238)
        - locked <0x00007fe0d14eb950> (a org.apache.pulsar.common.util.RateLimiter)
        at org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.updateDispatchRate(DispatchRateLimiter.java:398)
        - locked <0x00007fe0d14eb8a8> (a org.apache.pulsar.broker.service.persistent.DispatchRateLimiter)
        at org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.onPoliciesUpdate(DispatchRateLimiter.java:286)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$null$113(PersistentTopic.java:2632)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1702/1325515424.accept(Unknown Source)
        at java.util.Optional.ifPresent(Optional.java:159)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$null$114(PersistentTopic.java:2632)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1696/1099205637.run(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
        at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:731)
        at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2023)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$null$115(PersistentTopic.java:2625)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1693/1143110949.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:544)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:272)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$onPoliciesUpdate$118(PersistentTopic.java:2622)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$1692/39951688.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.onPoliciesUpdate(PersistentTopic.java:2620)
        at org.apache.pulsar.broker.service.BrokerService.lambda$null$91(BrokerService.java:1980)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1690/1978241112.accept(Unknown Source)
        at java.util.Optional.ifPresent(Optional.java:159)
        at org.apache.pulsar.broker.service.BrokerService.lambda$null$92(BrokerService.java:1980)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1689/1018119492.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
        at org.apache.pulsar.broker.service.BrokerService.lambda$null$93(BrokerService.java:1975)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1670/803871005.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:544)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:272)
        at org.apache.pulsar.broker.service.BrokerService.lambda$handlePoliciesUpdates$94(BrokerService.java:1971)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1669/28868178.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$null$7(ZKMetadataStore.java:139)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$112/660057515.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:750)

Additional context

Version: 2.9

leizhiyuan commented 2 years ago

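Same problem. This time the metadata-store thread is RUNNABLE, parsing topic names while handlePoliciesUpdates iterates over all topics: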

"metadata-store-6-1" #174 prio=5 os_prio=0 cpu=2400952.83ms elapsed=15380.51s tid=0x00007f1638005000 nid=0x7392 runnable [0x00007f14b0ece000]
   java.lang.Thread.State: RUNNABLE
        at com.google.common.base.Splitter.splitToList(Splitter.java:419)
        at org.apache.pulsar.common.naming.TopicName.<init>(TopicName.java:135)
        at org.apache.pulsar.common.naming.TopicName.<init>(TopicName.java:36)
        at org.apache.pulsar.common.naming.TopicName$1.load(TopicName.java:59)
        at org.apache.pulsar.common.naming.TopicName$1.load(TopicName.java:56)
        at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
        at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
        at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
        - locked <0x00007f1ef8de8790> (a com.google.common.cache.LocalCache$StrongAccessEntry)
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
        at com.google.common.cache.LocalCache.get(LocalCache.java:3951)
        at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4935)
        at org.apache.pulsar.common.naming.TopicName.get(TopicName.java:87)
        at org.apache.pulsar.broker.service.BrokerService.lambda$null$93(BrokerService.java:1972)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1684/545038104.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:544)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:272)
        at org.apache.pulsar.broker.service.BrokerService.lambda$handlePoliciesUpdates$94(BrokerService.java:1971)
        at org.apache.pulsar.broker.service.BrokerService$$Lambda$1681/314050124.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$null$7(ZKMetadataStore.java:139)
        at org.apache.pulsar.metadata.impl.ZKMetadataStore$$Lambda$112/1874438736.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:750)
leizhiyuan commented 2 years ago
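```java
// Excerpt from the metadata store implementation: for each notification,
// every registered listener is invoked sequentially on `executor`.
protected CompletableFuture<Void> receivedNotification(Notification notification) {
    try {
        return CompletableFuture.supplyAsync(() -> {
            listeners.forEach(listener -> {
                try {
                    listener.accept(notification);
                } catch (Throwable t) {
                    // A throwing listener is logged; the remaining listeners still run.
                    log.error("Failed to process metadata store notification", t);
                }
            });

            return null;
        }, executor);
    } catch (RejectedExecutionException e) {
        // The executor refused the task (for example, because it has been shut down).
        return FutureUtil.failedFuture(e);
    }
}
```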
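To illustrate why a single dispatch thread matters, the sketch below copies the structure of receivedNotification into a self-contained class, with a String standing in for Notification and System.err in place of the logger (both substitutions are assumptions for illustration). One listener blocks on a latch, much as the rate-limiter update does in the first thread dump, and a second, trivial notification cannot start until the first is released.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SingleThreadNotificationDemo {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor;

    public SingleThreadNotificationDemo(ExecutorService executor) {
        this.executor = executor;
    }

    // Same shape as the excerpt, with a String in place of Notification.
    public CompletableFuture<Void> receivedNotification(String notification) {
        try {
            return CompletableFuture.supplyAsync(() -> {
                listeners.forEach(listener -> {
                    try {
                        listener.accept(notification);
                    } catch (Throwable t) {
                        System.err.println("Failed to process notification: " + t);
                    }
                });
                return null;
            }, executor);
        } catch (RejectedExecutionException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    // Returns true if a second notification stays queued while a listener is
    // blocked on the first one, and completes after the first is released.
    public static boolean secondNotificationWaitsForFirst() throws Exception {
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        SingleThreadNotificationDemo store = new SingleThreadNotificationDemo(singleThread);
        store.listeners.add(n -> {
            if (n.equals("slow")) {
                try {
                    release.await(); // simulates a listener stuck on a lock
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            CompletableFuture<Void> first = store.receivedNotification("slow");
            CompletableFuture<Void> second = store.receivedNotification("fast");
            Thread.sleep(100); // "fast" needs no work, yet it cannot start
            boolean secondQueued = !second.isDone();
            release.countDown();
            CompletableFuture.allOf(first, second).get(5, TimeUnit.SECONDS);
            return secondQueued && second.isDone();
        } finally {
            singleThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(secondNotificationWaitsForFirst());
    }
}
```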

It seems a single thread is used here; perhaps the owner wants to preserve notification order? Could we use an ordered thread pool to execute receivedNotification instead?
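An ordered thread pool could look roughly like the sketch below: a fixed set of single-threaded lanes, with each task routed by the hash of a key (here a hypothetical metadata path). Tasks that share a key keep their submission order, while a listener stuck on one lane no longer stalls notifications routed to other lanes. KeyOrderedExecutor and executeOrdered are hypothetical names; the idea is similar to BookKeeper's OrderedExecutor, but this is not that class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not an existing Pulsar API.
public class KeyOrderedExecutor implements AutoCloseable {
    private final List<ExecutorService> lanes = new ArrayList<>();

    public KeyOrderedExecutor(int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    // Tasks with equal keys always land on the same single-threaded lane, so they
    // run in submission order; tasks with other keys may run on other lanes.
    public CompletableFuture<Void> executeOrdered(Object key, Runnable task) {
        int lane = Math.floorMod(key.hashCode(), lanes.size());
        return CompletableFuture.runAsync(task, lanes.get(lane));
    }

    @Override
    public void close() {
        lanes.forEach(ExecutorService::shutdown);
    }

    // 100 updates for one hypothetical metadata path are applied in order.
    public static List<Integer> perKeyOrder() throws Exception {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        try (KeyOrderedExecutor executor = new KeyOrderedExecutor(4)) {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                int n = i;
                futures.add(executor.executeOrdered("/admin/policies/my-tenant/my-ns", () -> seen.add(n)));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(5, TimeUnit.SECONDS);
        }
        return seen;
    }

    // With two lanes, Integer keys 0 and 1 map to lanes 0 and 1, so the task for
    // key 1 finishes even though the task for key 0 is stuck.
    public static boolean otherKeyProgressesWhileOneIsBlocked() throws Exception {
        CountDownLatch release = new CountDownLatch(1);
        try (KeyOrderedExecutor executor = new KeyOrderedExecutor(2)) {
            CompletableFuture<Void> stuck = executor.executeOrdered(0, () -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            CompletableFuture<Void> other = executor.executeOrdered(1, () -> { });
            other.get(5, TimeUnit.SECONDS); // completes although lane 0 is blocked
            boolean stuckStillRunning = !stuck.isDone();
            release.countDown();
            stuck.get(5, TimeUnit.SECONDS);
            return stuckStillRunning;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(perKeyOrder().size());
        System.out.println(otherKeyProgressesWhileOneIsBlocked());
    }
}
```

Ordering is only guaranteed per key: notifications for different paths may be processed in a different order than they arrived, so whether any listener relies on cross-path ordering would need checking. Keys that hash to the same lane also still queue behind each other.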

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days; marking it with the Stale label.