redis / lettuce

Advanced Java Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel, Pipelining, and codecs.
https://lettuce.io
MIT License
5.36k stars 959 forks source link

EventLoop thread causes timeouts #1775

Closed mouadk closed 3 years ago

mouadk commented 3 years ago

Bug Report

Current Behavior

Actually we are seeing a lots of Redis command timed out; nested exception is io.lettuce.core.RedisCommandTimeoutException: Command timed out after x minute(s), even if the command turned out to have accomplished its task successfully sometimes.

Increasing the timeout leads to event loop threads being stuck in the TIMED_WAITING (see attached thread dump)

thread dump

"lettuce-epollEventLoop-6-1" #90 daemon prio=5 os_prio=0 cpu=329.35ms elapsed=435.29s tid=0x000055c128ebc800 nid=0x5d waiting on condition  [0x00007f8007eb0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.9/Native Method)
        - parking to wait for  <0x00000000f5e6c810> (a java.util.concurrent.CompletableFuture$Signaller)
        at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.9/Unknown Source)
        at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.9/Unknown Source)
        at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.9/Unknown Source)
        at java.util.concurrent.CompletableFuture.timedGet(java.base@11.0.9/Unknown Source)
        at java.util.concurrent.CompletableFuture.get(java.base@11.0.9/Unknown Source)
        at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:81)
        at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:117)
        at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
        at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:79)
        at com.sun.proxy.$Proxy120.xack(Unknown Source)
        at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xAck(LettuceStreamCommands.java:82)
        at org.springframework.data.redis.connection.DefaultedRedisConnection.xAck(DefaultedRedisConnection.java:450)
        at org.springframework.data.redis.connection.DefaultStringRedisConnection.xAck(DefaultStringRedisConnection.java:3854)
        at org.springframework.data.redis.connection.RedisStreamCommands.xAck(RedisStreamCommands.java:59)
        at org.springframework.data.redis.core.DefaultStreamOperations.lambda$acknowledge$0(DefaultStreamOperations.java:122)
        at org.springframework.data.redis.core.DefaultStreamOperations$$Lambda$1846/0x0000000100c15440.doInRedis(Unknown Source)
        at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228)
        at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)
        at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
        at org.springframework.data.redis.core.DefaultStreamOperations.acknowledge(DefaultStreamOperations.java:122)
        at org.springframework.data.redis.core.StreamOperations.acknowledge(StreamOperations.java:66)
        at org.springframework.data.redis.core.StreamOperations.acknowledge(StreamOperations.java:78)
        at com.ks.cfc.ing.tenableio.repository.TaskRepository.ackExportVulnerabilityTask(TaskRepository.kt:116)
        at com.ks.cfc.ing.tenableio.repository.TaskRepository$$FastClassBySpringCGLIB$$8e4e2e8f.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:139)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at com.ks.cfc.ing.tenableio.repository.TaskRepository$$EnhancerBySpringCGLIB$$dec431d8.ackExportVulnerabilityTask(<generated>)
        at com.ks.cfc.ing.tenableio.task.listener.ExportVulnerabilityTaskListener$invokeVulnerabilityTaskProcessorThenAck$1.accept(ExportVulnerabilityTaskListener.kt:72)
        at com.ks.cfc.ing.tenableio.task.listener.ExportVulnerabilityTaskListener$invokeVulnerabilityTaskProcessorThenAck$1.accept(ExportVulnerabilityTaskListener.kt:23)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:147)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.ignoreDone(MonoIgnoreThen.java:190)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onComplete(MonoIgnoreThen.java:240)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:252)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:838)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:600)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:580)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:457)
        at reactor.core.publisher.FluxFilterWhen$FluxFilterWhenSubscriber.drain(FluxFilterWhen.java:230)
        at reactor.core.publisher.FluxFilterWhen$FluxFilterWhenSubscriber.innerResult(FluxFilterWhen.java:363)
        at reactor.core.publisher.FluxFilterWhen$FilterWhenInner.onNext(FluxFilterWhen.java:443)
        at reactor.core.publisher.FluxFilterWhen$FilterWhenInner.onNext(FluxFilterWhen.java:407)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782)
        at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:355)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
        at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:917)
        at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:290)
        at io.lettuce.core.RedisPublisher$SubscriptionCommand.complete(RedisPublisher.java:781)
        at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59)
        at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:680)
        at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:640)
        at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:591)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(java.base@11.0.9/Unknown Source)
Stack trace ```java Command timed out after 1 minute(s) io.lettuce.core.RedisCommandTimeoutException: Command timed out after 1 minute(s) at io.lettuce.core.ExceptionFactory.createTimeoutException(ExceptionFactory.java:51) at io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:114) at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:69) at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80) at com.sun.proxy.$Proxy41.lrange(Unknown Source) ```

Environment

Additional context

What is strange is this doesn’t happen when we configure a StreamMessageListenerContainer (used to subscribe to a Redis Stream) with a polling time out of ZERO duration, We have been asking ourself if it’s a bug or we are doing something wrong here.

Note that this strange behaviour happens only in Kubernetes cluster, locally it looks good.

Appreciate if you can help us demystify what is causing this strange behaviour.

mp911de commented 3 years ago

It looks as if you're mixing reactive programming with imperative API calls. Please either consistently use reactive programming or imperative. Mixing these two is almost a guarantee for blocked event loop threads.

mouadk commented 3 years ago

yep thanks for pointing it out