O2-Czech-Republic / proxima-platform

The Proxima platform.
Apache License 2.0
18 stars 7 forks source link

BatchLogReader blocked on close #292

Closed je-ik closed 1 year ago

je-ik commented 1 year ago
"ReadBatchUnbounded:<..>//ParDo(BatchLogRead)/ParMultiDo(BatchLogRead)/ProcessKeyedElements/SplittableParDoViaKeyedWorkItems.ProcessElements -> AnonymousTransform2/Filter/ParDo(Anonymous)/ParMultiDo(Anonymous) (14/200)#0" #115 prio=5 os_prio=0 cpu=12688.32ms elapsed=6638.65s tid=0x00007f8580344000 nid=0x151 waiting on condition  [0x00007f84f7efd000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
        - parking to wait for  <0x000000030161f520> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.15/LockSupport.java:234)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(java.base@11.0.15/AbstractQueuedSynchronizer.java:1079)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(java.base@11.0.15/AbstractQueuedSynchronizer.java:1369)
        at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:278)
        at cz.o2.proxima.direct.batch.TerminationContext.lambda$cancel$c5b2a3c$1(TerminationContext.java:48)
        at cz.o2.proxima.direct.batch.TerminationContext$$Lambda$1910/0x000000084109ec40.run(Unknown Source)
        at cz.o2.proxima.util.ExceptionUtils.ignoringInterrupted(ExceptionUtils.java:157)
        at cz.o2.proxima.direct.batch.TerminationContext.cancel(TerminationContext.java:48)
        at cz.o2.proxima.direct.batch.TerminationContext.lambda$asObserveHandle$0(TerminationContext.java:76)
        at cz.o2.proxima.direct.batch.TerminationContext$$Lambda$1866/0x0000000841025040.close(Unknown Source)
        at cz.o2.proxima.direct.batch.BatchLogReaders$ForwardingObserveHandle.close(BatchLogReaders.java:57)
        at cz.o2.proxima.direct.batch.BatchLogReaders$ThroughputLimitedBatchLogReader$2.close(BatchLogReaders.java:133)
        at cz.o2.proxima.beam.direct.io.BatchLogRead$BatchLogReadFn.process(BatchLogRead.java:232)
        at cz.o2.proxima.beam.direct.io.BatchLogRead$BatchLogReadFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:125)
        at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:567)
        at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:175)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.fireTimerInternal(DoFnOperator.java:1037)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.onProcessingTime(DoFnOperator.java:1031)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl$$Lambda$1912/0x00000008410b8040.onProcessingTime(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$1855/0x0000000841012040.run(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
        at org.apache.flink.runtime.taskmanager.Task$$Lambda$1782/0x0000000840f35840.run(Unknown Source)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.lang.Thread.run(java.base@11.0.15/Thread.java:829)
je-ik commented 1 year ago

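Might be related to disabling the throughput limit in BatchLogRead not working, as the following thread dump suggests (the reader thread is still blocked inside GlobalWatermarkThroughputLimiter):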

"DirectDataOperatorThread-5312e2fc-fbad-4175-8efe-5dd064b0fe5f-102" #20692 prio=5 os_prio=0 cpu=3400.57ms elapsed=674.98s tid=0x00007f8574195000 nid=0x51b5 waiting for monitor entry  [0x00007f84c8cf6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at cz.o2.proxima.storage.watermark.GlobalWatermarkThroughputLimiter.getPauseTime(GlobalWatermarkThroughputLimiter.java:123)
        - waiting to lock <0x00000007d143b128> (a cz.o2.proxima.storage.watermark.GlobalWatermarkThroughputLimiter)
        at cz.o2.proxima.direct.batch.BatchLogReaders$ThroughputLimitedBatchLogObserver.getPauseTime(BatchLogReaders.java:247)
        at cz.o2.proxima.direct.batch.BatchLogReaders$ThroughputLimitedBatchLogObserver.waitIfNecessary(BatchLogReaders.java:221)
        at cz.o2.proxima.direct.batch.BatchLogReaders$ThroughputLimitedBatchLogObserver$$Lambda$1892/0x0000000841087c40.run(Unknown Source)
        at cz.o2.proxima.util.ExceptionUtils.ignoringInterrupted(ExceptionUtils.java:157)
        at cz.o2.proxima.direct.batch.BatchLogReaders$ThroughputLimitedBatchLogObserver.onNext(BatchLogReaders.java:214)
        at cz.o2.proxima.direct.blob.BlobLogReader.lambda$processSinglePartition$b4ae9411$1(BlobLogReader.java:286)
        at cz.o2.proxima.direct.blob.BlobLogReader$$Lambda$1871/0x0000000841023440.run(Unknown Source)
        at cz.o2.proxima.direct.gcloud.storage.GCloudLogReader.lambda$runHandlingErrors$0(GCloudLogReader.java:45)
        at cz.o2.proxima.direct.gcloud.storage.GCloudLogReader$$Lambda$1872/0x0000000841022040.run(Unknown Source)
        at cz.o2.proxima.direct.blob.RetryStrategy.lambda$retry$3e8d1710$1(RetryStrategy.java:54)
        at cz.o2.proxima.direct.blob.RetryStrategy$$Lambda$1873/0x0000000841022440.apply(Unknown Source)
        at cz.o2.proxima.direct.blob.RetryStrategy.retry(RetryStrategy.java:63)
        at cz.o2.proxima.direct.blob.RetryStrategy.retry(RetryStrategy.java:52)
        at cz.o2.proxima.direct.gcloud.storage.GCloudLogReader.runHandlingErrors(GCloudLogReader.java:42)
        at cz.o2.proxima.direct.gcloud.storage.GCloudLogReader.runHandlingErrors(GCloudLogReader.java:29)
        at cz.o2.proxima.direct.blob.BlobLogReader.processSinglePartition(BlobLogReader.java:273)
        at cz.o2.proxima.direct.blob.BlobLogReader.lambda$observeInternal$1(BlobLogReader.java:244)
        at cz.o2.proxima.direct.blob.BlobLogReader$$Lambda$1863/0x0000000841026440.run(Unknown Source)
        at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.15/Executors.java:515)
        at java.util.concurrent.FutureTask.run(java.base@11.0.15/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.15/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.15/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.15/Thread.java:829)
je-ik commented 1 year ago

Closed via datadrivencz#733