Open dhymasriyanto opened 2 years ago
When I remove XReadNonBlockConsumer class, and comment out
//streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
//new AsyncConsumeStreamListener("Independentconsumption", null, null)
//);
No error happen, but, I don't know how to get data for independent consumption
Value must not be null! explain
English:
First
: There is a problem with this project. It is to demonstrate the error. For details, please refer to https://github.com/spring-projects/spring-data-redis/issues/2198。
Second
: For a working example, refer to https://gitee.com/huan1993/spring-cloud-parent/tree/master/redis/redis-stream
Third
: In the 2.7 M3
version, this problem has been fixed, you can upgrade the version of spring data redis
Chinese: 这个项目是有问题的,是为了演示你上方出现的错误,并关联到spring data redis 的 issues https://github.com/spring-projects/spring-data-redis/issues/2198。官方给出了这个问题出现的原因,也进行了修复,详情请看2198这个issues。
Exception: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'stream-001' or consumer group 'group-a' in XREADGROUP with GROUP option
explain:
If the program runs for the first time and an error occurs, it is because the Stream
or consumer group has not been created in advance
When I remove XReadNonBlockConsumer class, and comment out
//streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), //new AsyncConsumeStreamListener("Independentconsumption", null, null) //);
No error happen, but, I don't know how to get data for independent consumption
Is it possible that your Stream
type of message serialization is not the same way, so that if the independent consumer consumes the message from scratch, your error may occur. For independent consumers, you can start with the latest message consumption.
StringBuilder readOffset = new StringBuilder("$");
List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
.read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
Ahh, I see, it's actually get acknowledged by consumer group, when I comment out the consumer group, the independent is working.
2022-06-22 12:54:46.128 INFO 185564 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9090 (http) with context path ''
2022-06-22 12:54:46.135 INFO 185564 --- [ main] n.t.c.ClassroomServiceApplication : Started ClassroomServiceApplication in 2.495 seconds (JVM running for 2.713)
2022-06-22 12:54:46.137 INFO 185564 --- [ main] n.t.c.CycleGeneratorStreamMessageRunner : stream-001
2022-06-22 12:54:46.138 INFO 185564 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Information generating a classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80)])]
2022-06-22 12:54:46.162 INFO 185564 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Returned record ID: [1655877286162-0]
2022-06-22 12:54:51.137 INFO 185564 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Information generating a classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 12:54:51.141 INFO 185564 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Returned record ID: [1655877291141-0]
2022-06-22 12:54:56.137 INFO 185564 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Information generating a classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 12:54:56.144 INFO 185564 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Returned record ID: [1655877296143-0]
^C2022-06-22 12:55:00.386 INFO 185564 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2022-06-22 12:55:00.388 INFO 185564 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2022-06-22 12:55:00.391 INFO 185564 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
[INFO] ------------------------------------------------------------------------
But, it's still get an error when running the application at first.
Exception in thread "xread-nonblock-01" java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:171)
at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
at ngodingkuy.tech.classroomservice.consumer.XReadNonBlockConsumer01.lambda$afterPropertiesSet$1(XReadNonBlockConsumer01.java:64)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2022-06-22 12:54:45.603 INFO 185564 --- [ream-consumer-1] n.t.c.dto.CustomErrorHandler : Error exception occured:
org.springframework.core.convert.ConversionFailedException: Failed to convert from type [org.springframework.data.redis.connection.stream.StreamRecords$ByteMapBackedRecord] to type [ngodingkuy.tech.classroomservice.model.Classroom] for value 'MapBackedRecord{recordId=1655810534413-0, kvMap={[B@4ef29d7d=[B@4627f2ac}}'; nested exception is java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:198) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:176) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.3.jar:2.6.3]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.18.jar:5.3.18]
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:577) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getDeserializer$2(DefaultStreamMessageListenerContainer.java:240) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.convertRecord(StreamPollTask.java:196) ~[spring-data-redis-2.6.3.jar:2.6.3]
... 6 common frames omitted
Exception: Error in execution; nested exception is io.lettuce.core.RedisCommandExecutionException: NOGROUP No such key 'stream-001' or consumer group 'group-a' in XREADGROUP with GROUP option
explain: If the program runs for the first time and an error occurs, it is because the
Stream
or consumer group has not been created in advance
I already create the consumer group, it's look like when the bean created, it send null data, so it will get an error output. (i don't know actually, i am new in spring boot)
Out of topic of the issue : An error also happen when I try to shutdown the application. The output will be like:
^C2022-06-22 12:55:56.596 WARN 188796 --- [ream-consumer-3] io.lettuce.core.RedisChannelHandler : Connection is already closed
2022-06-22 12:55:56.596 WARN 188796 --- [ream-consumer-2] io.lettuce.core.RedisChannelHandler : Connection is already closed
2022-06-22 12:55:56.596 WARN 188796 --- [ream-consumer-4] io.lettuce.core.RedisChannelHandler : Connection is already closed
2022-06-22 12:55:56.596 INFO 188796 --- [ream-consumer-3] n.t.c.dto.CustomErrorHandler : Error exception occured:
org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:272) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.await(LettuceConnection.java:1063) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.lambda$doInvoke$4(LettuceConnection.java:920) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceInvoker$Synchronizer.invoke(LettuceInvoker.java:673) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceInvoker$DefaultManyInvocationSpec.toList(LettuceInvoker.java:618) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:350) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$null$3(DefaultStreamMessageListenerContainer.java:259) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$4(DefaultStreamMessageListenerContainer.java:258) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.3.jar:2.6.3]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.lettuce.core.RedisException: Connection closed
at io.lettuce.core.protocol.DefaultEndpoint.notifyDrainQueuedCommands(DefaultEndpoint.java:679) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelInactive(CommandHandler.java:358) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.lettuce.core.protocol.RedisHandshakeHandler.channelInactive(RedisHandshakeHandler.java:89) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.lettuce.core.ChannelGroupListener.channelInactive(ChannelGroupListener.java:69) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
... 1 common frames omitted
2022-06-22 12:55:56.596 INFO 188796 --- [ream-consumer-2] n.t.c.dto.CustomErrorHandler : Error exception occured:
org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:272) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.await(LettuceConnection.java:1063) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.lambda$doInvoke$4(LettuceConnection.java:920) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceInvoker$Synchronizer.invoke(LettuceInvoker.java:673) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceInvoker$DefaultManyInvocationSpec.toList(LettuceInvoker.java:618) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:350) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$null$3(DefaultStreamMessageListenerContainer.java:259) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$4(DefaultStreamMessageListenerContainer.java:258) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.3.jar:2.6.3]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.lettuce.core.RedisException: Connection closed
at io.lettuce.core.protocol.DefaultEndpoint.notifyDrainQueuedCommands(DefaultEndpoint.java:679) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelInactive(CommandHandler.java:358) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.lettuce.core.protocol.RedisHandshakeHandler.channelInactive(RedisHandshakeHandler.java:89) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.lettuce.core.ChannelGroupListener.channelInactive(ChannelGroupListener.java:69) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
... 1 common frames omitted
2022-06-22 12:55:56.596 INFO 188796 --- [ream-consumer-4] n.t.c.dto.CustomErrorHandler : Error exception occured:
org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:272) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.await(LettuceConnection.java:1063) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.lambda$doInvoke$4(LettuceConnection.java:920) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceInvoker$Synchronizer.invoke(LettuceInvoker.java:673) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceInvoker$DefaultManyInvocationSpec.toList(LettuceInvoker.java:618) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:350) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$null$3(DefaultStreamMessageListenerContainer.java:259) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$4(DefaultStreamMessageListenerContainer.java:258) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.3.jar:2.6.3]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.lettuce.core.RedisException: Connection closed
at io.lettuce.core.protocol.DefaultEndpoint.notifyDrainQueuedCommands(DefaultEndpoint.java:679) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelInactive(CommandHandler.java:358) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.lettuce.core.protocol.RedisHandshakeHandler.channelInactive(RedisHandshakeHandler.java:89) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.lettuce.core.ChannelGroupListener.channelInactive(ChannelGroupListener.java:69) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
... 1 common frames omitted
2022-06-22 12:55:56.705 INFO 188796 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2022-06-22 12:55:56.709 INFO 188796 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2022-06-22 12:55:56.712 INFO 188796 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
[INFO] ------------------------------------------------------------------------
According to this, said that we need to shut down the StreamMessageListenerContainer
prior to shutting down the application. But I dont know how to shutdown the listener first, because I am new in spring boot.
First: In Redis del stream-001
Second: create group In Redis XGROUP CREATE stream-001 group-a $ mkstream
create group In Redis XGROUP CREATE stream-001 group-b $ mkstream
Third: check StreamMessageListenerContainer
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
}
destroyMethod = "stop"
needs to be configured
Four: Check whether the serialization method is consistent in various places
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(RedisSerializer.string());
return redisTemplate;
}
RedisStreamConfiguration
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.keySerializer(RedisSerializer.string())
.hashKeySerializer(RedisSerializer.string())
.hashValueSerializer(RedisSerializer.string())
....
.build();
.....
return streamMessageListenerContainer;
}
Five: run log
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.5.6)
2022-06-22 15:10:40.552 INFO 67847 --- [ main] c.h.study.redis.RedisStreamApplication : Starting RedisStreamApplication using Java 1.8.0_322 on huandeMacBook-Pro.local with PID 67847 (/Users/huan/code/IdeaProjects/me/spring-cloud-parent/redis/redis-stream/target/classes started by huan in /Users/huan/code/IdeaProjects/me/spring-cloud-parent)
2022-06-22 15:10:40.554 INFO 67847 --- [ main] c.h.study.redis.RedisStreamApplication : No active profile set, falling back to default profiles: default
2022-06-22 15:10:41.117 INFO 67847 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Multiple Spring Data modules found, entering strict repository configuration mode!
2022-06-22 15:10:41.118 INFO 67847 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Redis repositories in DEFAULT mode.
2022-06-22 15:10:41.136 INFO 67847 --- [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 9 ms. Found 0 Redis repository interfaces.
2022-06-22 15:10:41.493 INFO 67847 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2022-06-22 15:10:41.499 INFO 67847 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2022-06-22 15:10:41.499 INFO 67847 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.54]
2022-06-22 15:10:41.552 INFO 67847 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2022-06-22 15:10:41.552 INFO 67847 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 955 ms
2022-06-22 15:10:42.243 INFO 67847 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2022-06-22 15:10:42.261 INFO 67847 --- [ main] c.h.study.redis.RedisStreamApplication : Started RedisStreamApplication in 1.992 seconds (JVM running for 2.382)
2022-06-22 15:10:42.537 INFO 67847 --- [pool-1-thread-1] c.h.s.r.stream.producer.StreamProducer : generate book info:[Book(title=The Monkey's Raincoat, author=Micki Wisozk)]
2022-06-22 15:10:42.566 INFO 67847 --- [pool-1-thread-1] c.h.s.r.stream.producer.StreamProducer : return record-id:[1655881842812-0]
2022-06-22 15:10:42.571 INFO 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : Independentconsumption receive data id:[1655881842812-0] book:[Book(title=The Monkey's Raincoat, author=Micki Wisozk)]
2022-06-22 15:10:42.571 INFO 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : Independentconsumption receive data id:[1655881842812-0] book:[Book(title=The Monkey's Raincoat, author=Micki Wisozk)]
2022-06-22 15:10:42.573 INFO 67847 --- [ream-consumer-1] c.h.s.r.s.c.g.AsyncConsumeStreamListener : [Independentconsumption]: receive data stream:[stream-001],id:[1655881842812-0],value:[Book(title=The Monkey's Raincoat, author=Micki Wisozk)]
2022-06-22 15:10:42.573 INFO 67847 --- [ream-consumer-4] c.h.s.r.s.c.g.AsyncConsumeStreamListener : [group b] group:[group-b] consumerName:[consumer-bb] receive data stream:[stream-001],id:[1655881842812-0],value:[Book(title=The Monkey's Raincoat, author=Micki Wisozk)]
2022-06-22 15:10:42.573 INFO 67847 --- [ream-consumer-3] c.h.s.r.s.c.g.AsyncConsumeStreamListener : [group a] group:[group-a] consumerName:[consumer-b] receive data stream:[stream-001],id:[1655881842812-0],value:[Book(title=The Monkey's Raincoat, author=Micki Wisozk)]
2022-06-22 15:10:43.614 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:43.614 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:44.576 INFO 67847 --- [ead-nonblock-03] c.h.s.r.s.c.x.XreadNonBlockConsumer03 : Independentconsumption receive data id:[1655881842812-0] book:[Book(title=The Monkey's Raincoat, author=Micki Wisozk)]
2022-06-22 15:10:44.623 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:44.623 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:45.640 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:45.647 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:46.654 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:46.655 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:46.655 WARN 67847 --- [ead-nonblock-03] c.h.s.r.s.c.x.XreadNonBlockConsumer03 : no data
2022-06-22 15:10:47.517 INFO 67847 --- [pool-1-thread-1] c.h.s.r.stream.producer.StreamProducer : generate book info:[Book(title=Rosemary Sutcliff, author=Elenora Christiansen)]
2022-06-22 15:10:47.519 INFO 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : Independentconsumption receive data id:[1655881847765-0] book:[Book(title=Rosemary Sutcliff, author=Elenora Christiansen)]
2022-06-22 15:10:47.519 INFO 67847 --- [ream-consumer-4] c.h.s.r.s.c.g.AsyncConsumeStreamListener : [group b] group:[group-b] consumerName:[consumer-bb] receive data stream:[stream-001],id:[1655881847765-0],value:[Book(title=Rosemary Sutcliff, author=Elenora Christiansen)]
2022-06-22 15:10:47.519 INFO 67847 --- [pool-1-thread-1] c.h.s.r.stream.producer.StreamProducer : return record-id:[1655881847765-0]
2022-06-22 15:10:47.519 INFO 67847 --- [ream-consumer-1] c.h.s.r.s.c.g.AsyncConsumeStreamListener : [Independentconsumption]: receive data stream:[stream-001],id:[1655881847765-0],value:[Book(title=Rosemary Sutcliff, author=Elenora Christiansen)]
2022-06-22 15:10:47.521 INFO 67847 --- [ream-consumer-3] c.h.s.r.s.c.g.AsyncConsumeStreamListener : [group a] group:[group-a] consumerName:[consumer-b] receive data stream:[stream-001],id:[1655881847765-0],value:[Book(title=Rosemary Sutcliff, author=Elenora Christiansen)]
2022-06-22 15:10:47.521 INFO 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : Independentconsumption receive data id:[1655881847765-0] book:[Book(title=Rosemary Sutcliff, author=Elenora Christiansen)]
2022-06-22 15:10:48.582 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:48.582 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:49.524 INFO 67847 --- [ead-nonblock-03] c.h.s.r.s.c.x.XreadNonBlockConsumer03 : Independentconsumption receive data id:[1655881847765-0] book:[Book(title=Rosemary Sutcliff, author=Elenora Christiansen)]
2022-06-22 15:10:49.600 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:49.600 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:50.616 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:50.620 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:51.626 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:51.626 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:51.626 WARN 67847 --- [ead-nonblock-03] c.h.s.r.s.c.x.XreadNonBlockConsumer03 : no data
2022-06-22 15:10:52.505 INFO 67847 --- [pool-1-thread-1] c.h.s.r.stream.producer.StreamProducer : generate book info:[Book(title=All Passion Spent, author=Mr. Ludivina Rogahn)]
2022-06-22 15:10:52.508 INFO 67847 --- [pool-1-thread-1] c.h.s.r.stream.producer.StreamProducer : return record-id:[1655881852755-0]
2022-06-22 15:10:52.509 INFO 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : Independentconsumption receive data id:[1655881852755-0] book:[Book(title=All Passion Spent, author=Mr. Ludivina Rogahn)]
2022-06-22 15:10:52.509 INFO 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : Independentconsumption receive data id:[1655881852755-0] book:[Book(title=All Passion Spent, author=Mr. Ludivina Rogahn)]
2022-06-22 15:10:53.554 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:53.554 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:54.512 INFO 67847 --- [ead-nonblock-03] c.h.s.r.s.c.x.XreadNonBlockConsumer03 : Independentconsumption receive data id:[1655881852755-0] book:[Book(title=All Passion Spent, author=Mr. Ludivina Rogahn)]
2022-06-22 15:10:54.566 WARN 67847 --- [ead-nonblock-02] c.h.s.r.s.c.x.XreadNonBlockConsumer02 : no data
2022-06-22 15:10:54.566 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
2022-06-22 15:10:55.577 WARN 67847 --- [ead-nonblock-01] c.h.s.r.s.c.x.XreadNonBlockConsumer01 : no data
Disconnected from the target VM, address: '127.0.0.1:52421', transport: 'socket'
The log output above may be different from your local log, because the article you see was translated by someone else
Both consumer groups and independent consumers can consume
destroyMethod = "stop"
on the StreamMessageListenerContainer configuration
CycleGeneratorStreamMessageRunner
classFirst: In Redis
del stream-001
Second: create group In RedisXGROUP CREATE stream-001 group-a $ mkstream
create group In RedisXGROUP CREATE stream-001 group-b $ mkstream
Third: checkStreamMessageListenerContainer
@Bean(initMethod = "start", destroyMethod = "stop") public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() { }
destroyMethod = "stop"
needs to be configuredFour: Check whether the serialization method is consistent in various places
Wow, I dont know how to figure it out when I reproduce first three steps from you, it's work without error in running the application! Thanks!
But when I try to reproduce this:
org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
- I don't have this exception locally. Check if there is
destroyMethod = "stop"
on theStreamMessageListenerContainer configuration
- When the program is closed, need close thread pool on the
CycleGeneratorStreamMessageRunner
class
Here is my log:
2022-06-22 19:19:40.176 INFO 488351 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9090 (http) with context path ''
2022-06-22 19:19:40.184 INFO 488351 --- [ main] n.t.c.ClassroomServiceApplication : Started ClassroomServiceApplication in 2.538 seconds (JVM running for 2.757)
2022-06-22 19:19:40.186 INFO 488351 --- [ main] n.t.c.CycleGeneratorStreamMessageRunner : stream-001
2022-06-22 19:19:40.187 INFO 488351 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Information generating a classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80)])]
2022-06-22 19:19:40.214 INFO 488351 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Returned record ID: [1655900380213-0]
2022-06-22 19:19:40.223 INFO 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : Obtained data information ID: [1655900380213-0] classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80)])]
2022-06-22 19:19:40.226 INFO 488351 --- [ream-consumer-3] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group A] group : [group-a], consumerName :[consumer-b] received message stream: [stream-001] ID: [1655900380213-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80)])]
2022-06-22 19:19:40.226 INFO 488351 --- [ream-consumer-1] n.t.c.c.AsyncConsumeStreamListener : [Independent consumption]: received message stream: [stream-001] ID: [1655900380213-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80)])]
2022-06-22 19:19:40.226 INFO 488351 --- [ream-consumer-4] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group B] group : [group-b], consumerName :[consumer-a] received message stream: [stream-001] ID: [1655900380213-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80)])]
2022-06-22 19:19:41.297 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:42.402 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:43.506 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:44.510 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:45.186 INFO 488351 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Information generating a classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:45.191 INFO 488351 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Returned record ID: [1655900385190-0]
2022-06-22 19:19:45.195 INFO 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : Obtained data information ID: [1655900385190-0] classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:45.195 INFO 488351 --- [ream-consumer-3] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group A] group : [group-a], consumerName :[consumer-b] received message stream: [stream-001] ID: [1655900385190-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:45.196 INFO 488351 --- [ream-consumer-4] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group B] group : [group-b], consumerName :[consumer-a] received message stream: [stream-001] ID: [1655900385190-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:45.196 INFO 488351 --- [ream-consumer-1] n.t.c.c.AsyncConsumeStreamListener : [Independent consumption]: received message stream: [stream-001] ID: [1655900385190-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:46.219 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:47.229 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:48.330 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:49.335 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:50.186 INFO 488351 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Information generating a classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:50.190 INFO 488351 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Returned record ID: [1655900390189-0]
2022-06-22 19:19:50.192 INFO 488351 --- [ream-consumer-3] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group A] group : [group-a], consumerName :[consumer-b] received message stream: [stream-001] ID: [1655900390189-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:50.192 INFO 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : Obtained data information ID: [1655900390189-0] classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:50.192 INFO 488351 --- [ream-consumer-1] n.t.c.c.AsyncConsumeStreamListener : [Independent consumption]: received message stream: [stream-001] ID: [1655900390189-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:50.192 INFO 488351 --- [ream-consumer-4] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group B] group : [group-b], consumerName :[consumer-a] received message stream: [stream-001] ID: [1655900390189-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:51.243 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:52.248 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:53.253 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:19:54.259 WARN 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
^C2022-06-22 19:19:55.186 INFO 488351 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Information generating a classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:55.188 INFO 488351 --- [pool-1-thread-1] n.t.c.producer.StreamProducer : Returned record ID: [1655900395187-0]
2022-06-22 19:19:55.189 INFO 488351 --- [ream-consumer-1] n.t.c.c.AsyncConsumeStreamListener : [Independent consumption]: received message stream: [stream-001] ID: [1655900395187-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:55.189 INFO 488351 --- [ream-consumer-3] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group A] group : [group-a], consumerName :[consumer-b] received message stream: [stream-001] ID: [1655900395187-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:55.189 INFO 488351 --- [ream-consumer-4] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group B] group : [group-b], consumerName :[consumer-a] received message stream: [stream-001] ID: [1655900395187-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:55.189 INFO 488351 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : Obtained data information ID: [1655900395187-0] classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:19:55.191 WARN 488351 --- [ream-consumer-2] io.lettuce.core.RedisChannelHandler : Connection is already closed
2022-06-22 19:19:55.195 INFO 488351 --- [ream-consumer-2] n.t.c.dto.CustomErrorHandler : Error exception occured:
org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:272) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.await(LettuceConnection.java:1063) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceConnection.lambda$doInvoke$4(LettuceConnection.java:920) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceInvoker$Synchronizer.invoke(LettuceInvoker.java:673) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceInvoker$DefaultManyInvocationSpec.toList(LettuceInvoker.java:618) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:350) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$null$3(DefaultStreamMessageListenerContainer.java:259) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$4(DefaultStreamMessageListenerContainer.java:258) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.3.jar:2.6.3]
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.3.jar:2.6.3]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.lettuce.core.RedisException: Connection closed
at io.lettuce.core.protocol.DefaultEndpoint.notifyDrainQueuedCommands(DefaultEndpoint.java:679) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelInactive(CommandHandler.java:358) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.lettuce.core.protocol.RedisHandshakeHandler.channelInactive(RedisHandshakeHandler.java:89) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.lettuce.core.ChannelGroupListener.channelInactive(ChannelGroupListener.java:69) ~[lettuce-core-6.1.8.RELEASE.jar:6.1.8.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) ~[netty-transport-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.75.Final.jar:4.1.75.Final]
... 1 common frames omitted
2022-06-22 19:19:55.299 INFO 488351 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2022-06-22 19:19:55.302 INFO 488351 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2022-06-22 19:19:55.305 INFO 488351 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
- When the program is closed, need close thread pool on the
CycleGeneratorStreamMessageRunner
class
Is it because I try to shut down the application when the cycle is running?
Because, when I try to reproduce to shutdown the application, there is no other errors:
2022-06-22 19:21:31.159 INFO 488833 --- [ream-consumer-1] n.t.c.c.AsyncConsumeStreamListener : [Independent consumption]: received message stream: [stream-001] ID: [1655900491156-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:21:31.159 INFO 488833 --- [ream-consumer-4] n.t.c.c.AsyncConsumeStreamListener : [Consumption Group B] group : [group-b], consumerName :[consumer-a] received message stream: [stream-001] ID: [1655900491156-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:21:31.159 INFO 488833 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : Obtained data information ID: [1655900491156-0] classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:21:31.270 INFO 488833 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2022-06-22 19:21:31.274 INFO 488833 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2022-06-22 19:21:31.277 INFO 488833 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
[INFO] ------------------------------------------------------------------------
2022-06-22 19:23:39.925 INFO 489480 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : Obtained data information ID: [1655900619923-0] classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:23:39.925 INFO 489480 --- [ream-consumer-1] n.t.c.c.AsyncConsumeStreamListener : [Independent consumption]: received message stream: [stream-001] ID: [1655900619923-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
^C2022-06-22 19:23:40.932 WARN 489480 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:23:41.049 INFO 489480 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2022-06-22 19:23:41.052 INFO 489480 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2022-06-22 19:23:41.055 INFO 489480 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
2022-06-22 19:22:47.739 INFO 489217 --- [ream-consumer-1] n.t.c.c.AsyncConsumeStreamListener : [Independent consumption]: received message stream: [stream-001] ID: [1655900567736-0], value: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
2022-06-22 19:22:47.739 INFO 489217 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : Obtained data information ID: [1655900567736-0] classroom: [Classroom(id=1, name=Test, classroomId=1, teacherId=1, students=[Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80), Students(studentId=1, grade=80)])]
^C2022-06-22 19:22:48.791 WARN 489217 --- [ead-nonblock-01] n.t.c.consumer.XReadNonBlockConsumer01 : No data obtained
2022-06-22 19:22:48.905 INFO 489217 --- [ionShutdownHook] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2022-06-22 19:22:48.909 INFO 489217 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2022-06-22 19:22:48.913 INFO 489217 --- [ionShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
In just any case here is my files:
package ngodingkuy.tech.classroomservice.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
RedisTemplate<String, Object> redisTemplate (RedisConnectionFactory connectionFactory){
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(connectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(RedisSerializer.string());
return redisTemplate;
}
}
package ngodingkuy.tech.classroomservice.config;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.hash.ObjectHashMapper;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import ngodingkuy.tech.classroomservice.consumer.AsyncConsumeStreamListener;
import ngodingkuy.tech.classroomservice.dto.Constants;
import ngodingkuy.tech.classroomservice.dto.CustomErrorHandler;
import ngodingkuy.tech.classroomservice.model.Classroom;
@Configuration
public class RedisStreamConfiguration {
@Resource
private RedisConnectionFactory redisConnectionFactory;
@Bean(initMethod="start", destroyMethod="stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Classroom>> streamMessageListenerContainer(){
AtomicInteger index = new AtomicInteger(1);
int processors = Runtime.getRuntime().availableProcessors();
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r -> {
Thread thread = new Thread(r);
thread.setName("async-stream-consumer-" + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Classroom>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
.batchSize(10)
.executor(executor)
.keySerializer(RedisSerializer.string())
.hashKeySerializer(RedisSerializer.string())
.hashValueSerializer(RedisSerializer.string())
.pollTimeout(Duration.ofSeconds(1))
.objectMapper(new ObjectHashMapper())
.errorHandler(new CustomErrorHandler())
.targetType(Classroom.class)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, Classroom>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
String streamKey = Constants.STREAM_KEY_001;
streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
new AsyncConsumeStreamListener("Independent consumption", null, null)
);
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumption Group A", "group-a", "consumer-a"));
streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumption Group A", "group-a", "consumer-b"));
streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumption Group B", "group-b", "consumer-a"));
return streamMessageListenerContainer;
}
}
package ngodingkuy.tech.classroomservice;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import ngodingkuy.tech.classroomservice.producer.StreamProducer;
import static ngodingkuy.tech.classroomservice.dto.Constants.STREAM_KEY_001;
@Component
@AllArgsConstructor
@Slf4j
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner{
private final StreamProducer streamProducer;
@Override
public void run(ApplicationArguments args){
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(()->streamProducer
.sendRecord(STREAM_KEY_001),
0,
5,
TimeUnit.SECONDS);
log.info(STREAM_KEY_001);
}
}
package ngodingkuy.tech.classroomservice.consumer;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import ngodingkuy.tech.classroomservice.model.Classroom;
@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Classroom>>{
public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
this.consumerType = consumerType;
this.group = group;
this.consumerName = consumerName;
}
private String consumerType;
private String group;
private String consumerName;
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(ObjectRecord<String, Classroom> message) {
String stream = message.getStream();
RecordId id = message.getId();
Classroom value = message.getValue();
if (StringUtils.isBlank(group)) {
log.info("[{}]: received message stream: [{}] ID: [{}], value: [{}]", consumerType, stream, id, value);
}else{
log.info("[{}] group : [{}], consumerName :[{}] received message stream: [{}] ID: [{}], value: [{}]", consumerType, group, consumerName, stream, id, value);
}
//redisTemplate.opsForStream()
//.acknowledge("key", "group", "recordId");
}
}
package ngodingkuy.tech.classroomservice.consumer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import lombok.extern.slf4j.Slf4j;
import ngodingkuy.tech.classroomservice.dto.Constants;
import ngodingkuy.tech.classroomservice.model.Classroom;
@Component
@Slf4j
public class XReadNonBlockConsumer01 implements InitializingBean, DisposableBean{
private ThreadPoolExecutor threadPoolExecutor;
@Resource
private RedisTemplate<String, Object> redisTemplate;
private volatile boolean stop = false;
@Override
public void destroy() throws Exception {
stop= true;
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
}
@Override
public void afterPropertiesSet() throws Exception {
threadPoolExecutor = new ThreadPoolExecutor(1,1,0, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(), r->{
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("xread-nonblock-01");
return thread;
}
);
StreamReadOptions streamReadOptions = StreamReadOptions.empty()
.block(Duration.ofMillis(1000))
.count(10);
StringBuilder readOffset = new StringBuilder("0-0");
threadPoolExecutor.execute(()->{
while(!stop){
List<ObjectRecord<String, Classroom>> objectRecords = redisTemplate.opsForStream()
.read(Classroom.class, streamReadOptions, StreamOffset.create(Constants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
if (CollectionUtils.isEmpty(objectRecords)) {
log.warn("No data obtained");
continue;
}
for (ObjectRecord<String, Classroom> objectRecord: objectRecords) {
log.info("Obtained data information ID: [{}] classroom: [{}]", objectRecord.getId(), objectRecord.getValue());
readOffset.setLength(0);
readOffset.append(objectRecord.getId());
}
}
});
}
}
First: In Redis
del stream-001
Second: create group In RedisXGROUP CREATE stream-001 group-a $ mkstream
create group In RedisXGROUP CREATE stream-001 group-b $ mkstream
Is it because MKSTREAM
?
According to the docs,
By default, the XGROUP CREATE command insists that the target stream exists and returns an error when it doesn't. However, you can use the optional MKSTREAM subcommand as the last argument after the
to automatically create the stream (with length of 0) if it doesn't exist
That is to say, there is only org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
. This exception problem has not been solved.
Based on your exception stack information, I make my guess. 2022-06-22 19:19:55.191 WARN 488351 --- [ream-consumer-2] io.lettuce.core.RedisChannelHandler : Connection is already closed 2022-06-22 19:19:55.195 INFO 488351 --- [ream-consumer-2] n.t.c.dto.CustomErrorHandler : Error exception occured: org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed ....... at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.3.jar:2.6.3] at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.3.jar:2.6.3] at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.3.jar:2.6.3]
2022-06-22 19:19:55.191 WARN 488351 --- [ream-consumer-2] io.lettuce.core.RedisChannelHandler : Connection is already closed
Also need to pay attention, it seems that the Redis connection has been closed many times
First: Check if the org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer#stop()
method is executed.
Second: Check whether the org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#destroy()
method is executed.
In theory, the stop
method is executed first
, followed by the destroy
method.
If you can't solve it, you can provide a mini demo
with the problem, and give the code address of the repository on github
, and tell you how to close the project, use kill pid
or other methods.
First: Check if the
org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer#stop()
method is executed. Second: Check whether theorg.springframework.data.redis.connection.lettuce.LettuceConnectionFactory#destroy()
method is executed.In theory, the
stop
method is executedfirst
, followed by thedestroy
method.
I can not figure it out how to check it.
Here is my example repo for implementation of spring redis data based on ur repo, https://github.com/dhymasriyanto/classroom-spring-data-redis How to reproduce :
./mvnw clean spring-boot:run
Ctrl-C
when the cycle is running. (sometimes it gives an error, sometimes no error occurs)
I came here after read your article on here, when I follow all of your code, I get an error:
And and error from CustomErrorHandler :
But I think it's just happen on first time the application running, because after that the producer, and consumer run normally, except on independent consumer.