redisson / redisson

Redisson - Valkey and Redis Java client. Complete Real-Time Data Platform. Sync/Async/RxJava/Reactive API. Over 50 Valkey and Redis based Java objects and services: Set, Multimap, SortedSet, Map, List, Queue, Deque, Semaphore, Lock, AtomicLong, Map Reduce, Bloom filter, Spring, Tomcat, Scheduler, JCache API, Hibernate, RPC, local cache..
https://redisson.pro
Apache License 2.0
23.35k stars 5.36k forks source link

The doLoop method in the StreamPollTask class always reports a null pointer exception #4135

Closed fendo8888 closed 6 months ago

fendo8888 commented 2 years ago

The version used is spring-data-redis-2.4.14.jar

20220213001405 20220213001322

ERROR o.s.d.r.s.DefaultStreamMessageListenerContainer$LoggingErrorHandler - [handleError,388] - Unexpected error occurred in scheduled task. java.lang.NullPointerException: null at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:171) at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148) at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) at java.lang.Thread.run(Thread.java:748)

mrniko commented 2 years ago

is there any code to reproduce it?

fendo8888 commented 2 years ago

Without the relevant code, this error occurs from time to time. I think someone has encountered it: https://github.com/redisson/redisson/issues/4006

mrniko commented 2 years ago

Can you set trace logging level for org.redisson package and try to reproduce the issue again? I have checked the code and non of codecs return null. They all return empty list in this case.

haidiiii commented 2 years ago

@mrniko trace 打开后,日志如下

2022-08-08 13:55:38.911 TRACE 72612 --- [sson-netty-2-13] o.r.client.handler.CommandEncoder        : channel: [id: 0x39607155, L:/10.10.10.105:50431 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:38.911 TRACE 72612 --- [sson-netty-2-21] o.r.client.handler.CommandEncoder        : channel: [id: 0x146dd9d6, L:/10.10.10.105:50435 - R:10.10.10.3/10.10.10.3:6379] message: *11
$10
XREADGROUP
$5
GROUP
$16
queue_group_test
$22
test-10-10-10-105-8080
$5
COUNT
$2
10
$5
BLOCK
$5
30000
$7
STREAMS
$15
queue:test:pull
$1
>

2022-08-08 13:55:38.911 TRACE 72612 --- [sson-netty-2-16] o.r.client.handler.CommandEncoder        : channel: [id: 0x2536f256, L:/10.10.10.105:50432 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:38.911 TRACE 72612 --- [sson-netty-2-27] o.r.client.handler.CommandEncoder        : channel: [id: 0xf95281a6, L:/10.10.10.105:50438 - R:10.10.10.3/10.10.10.3:6379] message: *11
$10
XREADGROUP
$5
GROUP
$16
queue_group_test
$22
test-10-10-10-105-8080
$5
COUNT
$2
10
$5
BLOCK
$5
30000
$7
STREAMS
$18
queue:test:abandon
$1
>

2022-08-08 13:55:38.911 ERROR 72612 --- [cTaskExecutor-7] ageListenerContainer$LoggingErrorHandler : Unexpected error occurred in scheduled task.

java.lang.NullPointerException: null
    at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:171)
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148)
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132)
    at java.lang.Thread.run(Thread.java:748)

2022-08-08 13:55:38.911 TRACE 72612 --- [sson-netty-2-19] o.r.client.handler.CommandEncoder        : channel: [id: 0x888e5c5d, L:/10.10.10.105:50434 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:38.912 TRACE 72612 --- [sson-netty-2-25] o.r.client.handler.CommandEncoder        : channel: [id: 0x984111b8, L:/10.10.10.105:50437 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:38.912 TRACE 72612 --- [isson-netty-2-8] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x0c81d6e8, L:/10.10.10.105:50428 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@418363[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:38.912 TRACE 72612 --- [sson-netty-2-29] o.r.client.handler.CommandEncoder        : channel: [id: 0xfe28534d, L:/10.10.10.105:50439 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:38.912 TRACE 72612 --- [sson-netty-2-31] o.r.client.handler.CommandEncoder        : channel: [id: 0x258c26e5, L:/10.10.10.105:50440 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:32.735 DEBUG 72612 --- [cTaskExecutor-1] org.redisson.command.RedisExecutor       : acquired connection for command (XREADGROUP) and params [GROUP, test_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [97, 112, 112, 116, 58, 98, 117, 115, 105, 110, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node 10.10.10.3/10.10.10.3:6379... RedisConnection@26933046 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0xb8624a13, L:/10.10.10.105:50441 - R:10.10.10.3/10.10.10.3:6379], currentCommand=null, usage=1]
2022-08-08 13:55:38.911 TRACE 72612 --- [sson-netty-2-17] o.r.client.handler.CommandDecoder        : reply: +OK
, channel: [id: 0x423d0bde, L:/10.10.10.105:50449 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@b43af7[Not completed, 2 dependents], command=(QUIT), params=[], codec=null]
2022-08-08 13:55:38.912 TRACE 72612 --- [isson-netty-2-9] o.r.client.handler.CommandEncoder        : channel: [id: 0x9554ac9a, L:/10.10.10.105:50445 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING
mrniko commented 2 years ago

@haidiiii Can you share some test code to reproduce it? Or update the code https://github.com/redisson/redisson/issues/4006#issuecomment-986523156 to reproduce it?

mrniko commented 2 years ago

@haidiiii Can you share more logs before 13:55:38.911 moment?

haidiiii commented 2 years ago

@mrniko 13:55:38.911之前的日志是这样的

2022-08-08 13:55:00.990 TRACE 72612 --- [redisson-netty-2-3] o.r.client.handler.CommandEncoder        : channel: [id: 0x38c4234f, L:/10.10.10.105:50426 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:00.990 TRACE 72612 --- [redisson-netty-2-4] o.r.client.handler.CommandEncoder        : channel: [id: 0xd8289fe0, L:/10.10.10.105:50427 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:00.990 TRACE 72612 --- [redisson-netty-2-2] o.r.client.handler.CommandEncoder        : channel: [id: 0xfc24292c, L:/10.10.10.105:50425 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:00.991 TRACE 72612 --- [redisson-netty-2-8] o.r.client.handler.CommandEncoder        : channel: [id: 0x0c81d6e8, L:/10.10.10.105:50428 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:00.991 TRACE 72612 --- [redisson-netty-2-9] o.r.client.handler.CommandEncoder        : channel: [id: 0x6a02386c, L:/10.10.10.105:50429 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:00.991 TRACE 72612 --- [redisson-netty-2-3] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x38c4234f, L:/10.10.10.105:50426 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@87dc09[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:00.991 TRACE 72612 --- [redisson-netty-2-4] o.r.client.handler.CommandPubSubDecoder  : reply: *2
$4
pong
$0

, channel: [id: 0xd8289fe0, L:/10.10.10.105:50427 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@26566a[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:00.992 TRACE 72612 --- [redisson-netty-2-2] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0xfc24292c, L:/10.10.10.105:50425 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@89cc8f[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:00.992 TRACE 72612 --- [redisson-netty-2-8] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x0c81d6e8, L:/10.10.10.105:50428 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@45c3af[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:00.992 TRACE 72612 --- [redisson-netty-2-9] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x6a02386c, L:/10.10.10.105:50429 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@13e31bb[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.088 TRACE 72612 --- [redisson-netty-2-21] o.r.client.handler.CommandEncoder        : channel: [id: 0x146dd9d6, L:/10.10.10.105:50435 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.088 TRACE 72612 --- [redisson-netty-2-25] o.r.client.handler.CommandEncoder        : channel: [id: 0x984111b8, L:/10.10.10.105:50437 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.088 TRACE 72612 --- [redisson-netty-2-27] o.r.client.handler.CommandEncoder        : channel: [id: 0xf95281a6, L:/10.10.10.105:50438 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.088 TRACE 72612 --- [redisson-netty-2-29] o.r.client.handler.CommandEncoder        : channel: [id: 0xfe28534d, L:/10.10.10.105:50439 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.088 TRACE 72612 --- [redisson-netty-2-31] o.r.client.handler.CommandEncoder        : channel: [id: 0x258c26e5, L:/10.10.10.105:50440 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-3] o.r.client.handler.CommandEncoder        : channel: [id: 0x922b0aef, L:/10.10.10.105:50442 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-32] o.r.client.handler.CommandEncoder        : channel: [id: 0xb8624a13, L:/10.10.10.105:50441 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-7] o.r.client.handler.CommandEncoder        : channel: [id: 0x4c88fb6a, L:/10.10.10.105:50444 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-21] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x146dd9d6, L:/10.10.10.105:50435 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@1b6e228[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-25] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x984111b8, L:/10.10.10.105:50437 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@e745dd[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-9] o.r.client.handler.CommandEncoder        : channel: [id: 0x9554ac9a, L:/10.10.10.105:50445 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-27] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0xf95281a6, L:/10.10.10.105:50438 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@57973[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-31] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x258c26e5, L:/10.10.10.105:50440 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@5294e5[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-11] o.r.client.handler.CommandEncoder        : channel: [id: 0xa2aa9511, L:/10.10.10.105:50446 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.089 TRACE 72612 --- [redisson-netty-2-29] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0xfe28534d, L:/10.10.10.105:50439 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@fa7592[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.090 TRACE 72612 --- [redisson-netty-2-13] o.r.client.handler.CommandEncoder        : channel: [id: 0x85d26820, L:/10.10.10.105:50447 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:01.090 TRACE 72612 --- [redisson-netty-2-3] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x922b0aef, L:/10.10.10.105:50442 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@fc9938[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.090 TRACE 72612 --- [redisson-netty-2-32] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0xb8624a13, L:/10.10.10.105:50441 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@7b7fa2[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.090 TRACE 72612 --- [redisson-netty-2-7] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x4c88fb6a, L:/10.10.10.105:50444 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@a41424[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.090 TRACE 72612 --- [redisson-netty-2-11] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0xa2aa9511, L:/10.10.10.105:50446 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@d8719e[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.091 TRACE 72612 --- [redisson-netty-2-9] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x9554ac9a, L:/10.10.10.105:50445 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@1fcbe61[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:01.091 TRACE 72612 --- [redisson-netty-2-13] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x85d26820, L:/10.10.10.105:50447 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@b4f0b8[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:14.192 TRACE 72612 --- [redisson-netty-2-21] o.r.client.handler.CommandEncoder        : channel: [id: 0x7cb0ef73, L:/10.10.10.105:62006 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:14.194 TRACE 72612 --- [redisson-netty-2-21] o.r.client.handler.CommandDecoder        : reply: +PONG
, channel: [id: 0x7cb0ef73, L:/10.10.10.105:62006 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@fcb1f9[Not completed, 1 dependents], command=(PING), params=[], codec=org.redisson.client.codec.StringCodec]
2022-08-08 13:55:28.008 TRACE 72612 --- [redisson-netty-2-17] o.r.client.handler.CommandDecoder        : reply: *-1
, channel: [id: 0x25b878ea, L:/10.10.10.105:50433 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@10bfc3f[Not completed, 1 dependents], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec]
2022-08-08 13:55:28.008 TRACE 72612 --- [redisson-netty-2-16] o.r.client.handler.CommandDecoder        : reply: *-1
, channel: [id: 0x2536f256, L:/10.10.10.105:50432 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@8fd538[Not completed, 1 dependents], command=(XREADGROUP), params=[GROUP, test_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [97, 112, 112, 116, 58, 98, 117, 115, 105, 110, ...], >], codec=org.redisson.client.codec.ByteArrayCodec]
2022-08-08 13:55:28.008 TRACE 72612 --- [redisson-netty-2-11] o.r.client.handler.CommandDecoder        : reply: *-1
, channel: [id: 0x8c5cb5bd, L:/10.10.10.105:50430 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@1d1cf93[Not completed, 1 dependents], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec]
2022-08-08 13:55:28.008 TRACE 72612 --- [redisson-netty-2-19] o.r.client.handler.CommandDecoder        : reply: *-1
, channel: [id: 0x888e5c5d, L:/10.10.10.105:50434 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@149e0e3[Not completed, 1 dependents], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec]
2022-08-08 13:55:28.008 TRACE 72612 --- [redisson-netty-2-13] o.r.client.handler.CommandDecoder        : reply: *-1
, channel: [id: 0x39607155, L:/10.10.10.105:50431 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@4a2391[Not completed, 1 dependents], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec]
2022-08-08 13:55:28.008 TRACE 72612 --- [redisson-netty-2-15] o.r.client.handler.CommandDecoder        : reply: *-1
, channel: [id: 0x15e398cf, L:/10.10.10.105:50448 - R:10.10.10.3/10.10.10.3:6379], command: CommandData [promise=java.util.concurrent.CompletableFuture@1edb726[Not completed, 1 dependents], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec]
2022-08-08 13:55:28.009 DEBUG 72612 --- [redisson-netty-2-17] org.redisson.command.RedisExecutor       : connection released for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@16966716 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x25b878ea, L:/10.10.10.105:50433 - R:10.10.10.3/10.10.10.3:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@10bfc3f[Completed normally], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec], usage=0]
2022-08-08 13:55:28.009 DEBUG 72612 --- [redisson-netty-2-13] org.redisson.command.RedisExecutor       : connection released for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@10549807 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x39607155, L:/10.10.10.105:50431 - R:10.10.10.3/10.10.10.3:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@4a2391[Completed normally], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec], usage=0]
2022-08-08 13:55:28.009 DEBUG 72612 --- [redisson-netty-2-15] org.redisson.command.RedisExecutor       : connection released for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@16782336 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x15e398cf, L:/10.10.10.105:50448 - R:10.10.10.3/10.10.10.3:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@1edb726[Completed normally], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec], usage=0]
2022-08-08 13:55:28.009 DEBUG 72612 --- [redisson-netty-2-19] org.redisson.command.RedisExecutor       : connection released for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@18815980 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x888e5c5d, L:/10.10.10.105:50434 - R:10.10.10.3/10.10.10.3:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@149e0e3[Completed normally], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec], usage=0]
2022-08-08 13:55:28.009 DEBUG 72612 --- [redisson-netty-2-11] org.redisson.command.RedisExecutor       : connection released for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@13240211 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x8c5cb5bd, L:/10.10.10.105:50430 - R:10.10.10.3/10.10.10.3:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@1d1cf93[Completed normally], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec], usage=0]
2022-08-08 13:55:29.916 DEBUG 72612 --- [redisson-netty-2-16] org.redisson.command.RedisExecutor       : connection released for command (XREADGROUP) and params [GROUP, test_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [97, 112, 112, 116, 58, 98, 117, 115, 105, 110, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@33062768 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x2536f256, L:/10.10.10.105:50432 - R:10.10.10.3/10.10.10.3:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@8fd538[Completed normally], command=(XREADGROUP), params=[GROUP, test_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [97, 112, 112, 116, 58, 98, 117, 115, 105, 110, ...], >], codec=org.redisson.client.codec.ByteArrayCodec], usage=0]
2022-08-08 13:55:29.916 DEBUG 72612 --- [SimpleAsyncTaskExecutor-5] org.redisson.command.RedisExecutor       : acquired connection for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node 10.10.10.3/10.10.10.3:6379... RedisConnection@9723017 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x6a02386c, L:/10.10.10.105:50429 - R:10.10.10.3/10.10.10.3:6379], currentCommand=null, usage=1]
2022-08-08 13:55:31.400 DEBUG 72612 --- [SimpleAsyncTaskExecutor-3] org.redisson.command.RedisExecutor       : acquired connection for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node 10.10.10.3/10.10.10.3:6379... RedisConnection@20143418 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x146dd9d6, L:/10.10.10.105:50435 - R:10.10.10.3/10.10.10.3:6379], currentCommand=null, usage=1]
2022-08-08 13:55:31.401 DEBUG 72612 --- [redisson-timer-4-1] org.redisson.command.RedisExecutor       : connection released for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using connection RedisConnection@26006407 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x423d0bde, L:/10.10.10.105:50449 - R:10.10.10.3/10.10.10.3:6379], currentCommand=CommandData [promise=java.util.concurrent.CompletableFuture@180322a[Completed normally], command=(XREADGROUP), params=[GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >], codec=org.redisson.client.codec.ByteArrayCodec], usage=0]
2022-08-08 13:55:32.062 DEBUG 72612 --- [SimpleAsyncTaskExecutor-4] org.redisson.command.RedisExecutor       : acquired connection for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node 10.10.10.3/10.10.10.3:6379... RedisConnection@18753636 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0x4c88fb6a, L:/10.10.10.105:50444 - R:10.10.10.3/10.10.10.3:6379], currentCommand=null, usage=1]
2022-08-08 13:55:32.731 TRACE 72612 --- [redisson-netty-2-4] o.r.client.handler.CommandEncoder        : channel: [id: 0xd8289fe0, L:/10.10.10.105:50427 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:32.731 DEBUG 72612 --- [SimpleAsyncTaskExecutor-6] org.redisson.command.RedisExecutor       : acquired connection for command (XREADGROUP) and params [GROUP, queue_group_test, test-10-10-67-185-8086, COUNT, 10, BLOCK, 30000, STREAMS, [113, 117, 101, 117, 101, 58, 105, 116, 101, 109, ...], >] from slot NodeSource [slot=0, addr=null, redisClient=null, redirect=null, entry=null] using node 10.10.10.3/10.10.10.3:6379... RedisConnection@4600923 [redisClient=[addr=redis://10.10.10.3:6379], channel=[id: 0xf95281a6, L:/10.10.10.105:50438 - R:10.10.10.3/10.10.10.3:6379], currentCommand=null, usage=1]
2022-08-08 13:55:32.731 TRACE 72612 --- [redisson-netty-2-3] o.r.client.handler.CommandEncoder        : channel: [id: 0x38c4234f, L:/10.10.10.105:50426 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:32.732 TRACE 72612 --- [redisson-netty-2-7] o.r.client.handler.CommandEncoder        : channel: [id: 0x4c88fb6a, L:/10.10.10.105:50444 - R:10.10.10.3/10.10.10.3:6379] message: *11
$10
XREADGROUP
$5
GROUP
$16
queue_group_test
$22
test-10-10-67-185-8086
$5
COUNT
$2
10
$5
BLOCK
$5
30000
$7
STREAMS
$15
queue:test:done
$1
>

2022-08-08 13:55:32.732 TRACE 72612 --- [redisson-netty-2-8] o.r.client.handler.CommandEncoder        : channel: [id: 0x0c81d6e8, L:/10.10.10.105:50428 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:32.731 TRACE 72612 --- [redisson-netty-2-17] o.r.client.handler.CommandEncoder        : channel: [id: 0x423d0bde, L:/10.10.10.105:50449 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
QUIT

2022-08-08 13:55:32.733 TRACE 72612 --- [redisson-netty-2-2] o.r.client.handler.CommandEncoder        : channel: [id: 0xfc24292c, L:/10.10.10.105:50425 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:32.733 TRACE 72612 --- [redisson-netty-2-11] o.r.client.handler.CommandEncoder        : channel: [id: 0x8c5cb5bd, L:/10.10.10.105:50430 - R:10.10.10.3/10.10.10.3:6379] message: *1
$4
PING

2022-08-08 13:55:32.735 TRACE 72612 --- [redisson-netty-2-9] o.r.client.handler.CommandEncoder        : channel: [id: 0x6a02386c, L:/10.10.10.105:50429 - R:10.10.10.3/10.10.10.3:6379] message: *11
$10
XREADGROUP
$5
GROUP
$16
queue_group_test
$22
test-10-10-67-185-8086
$5
COUNT
$2
10
$5
BLOCK
$5
30000
$7
STREAMS
$15
queue:test:pass
$1
>
haidiiii commented 2 years ago

@mrniko 复现代码比较简单 https://github.com/haidiiii/redisson-stream 可能不能立即复现,需要运行一段时间才能复现

mrniko commented 2 years ago

Here is the output:

recordId: 1667806746522-0
receive: 1667806746522-0,{time=1667806746517-test}
recordId: 1667806751437-0
receive: 1667806751437-0,{time=1667806751436-test}
recordId: 1667806752269-0
receive: 1667806752269-0,{time=1667806752268-test}
recordId: 1667806756997-0
receive: 1667806756997-0,{time=1667806756996-test}
recordId: 1667806757036-0
receive: 1667806757036-0,{time=1667806757036-test}
recordId: 1667806759233-0
receive: 1667806759233-0,{time=1667806759232-test}
recordId: 1667806763141-0
receive: 1667806763141-0,{time=1667806763141-test}
recordId: 1667806767128-0
receive: 1667806767128-0,{time=1667806767127-test}
recordId: 1667806771793-0
receive: 1667806771793-0,{time=1667806771791-test}
recordId: 1667806771939-0
receive: 1667806771939-0,{time=1667806771938-test}
recordId: 1667806776506-0
receive: 1667806776506-0,{time=1667806776505-test}
recordId: 1667806777374-0
receive: 1667806777374-0,{time=1667806777373-test}
recordId: 1667806778421-0
receive: 1667806778421-0,{time=1667806778420-test}
recordId: 1667806779990-0
receive: 1667806779990-0,{time=1667806779989-test}
recordId: 1667806780459-0
receive: 1667806780459-0,{time=1667806780458-test}
recordId: 1667806783907-0
receive: 1667806783907-0,{time=1667806783906-test}
recordId: 1667806787546-0
receive: 1667806787546-0,{time=1667806787546-test}
recordId: 1667806791448-0
receive: 1667806791448-0,{time=1667806791447-test}
recordId: 1667806792842-0
receive: 1667806792842-0,{time=1667806792841-test}
recordId: 1667806795263-0
receive: 1667806795263-0,{time=1667806795263-test}
recordId: 1667806795358-0
receive: 1667806795358-0,{time=1667806795358-test}
recordId: 1667806798725-0
receive: 1667806798725-0,{time=1667806798725-test}
recordId: 1667806799362-0
receive: 1667806799362-0,{time=1667806799361-test}
recordId: 1667806800072-0
receive: 1667806800072-0,{time=1667806800071-test}
survivant commented 1 year ago

I have the same issue but with lettuce. Here the code that I have.

logs

07:52:55.111 [SimpleAsyncTaskExecutor-1] ERROR o.s.d.r.s.DefaultStreamMessageListenerContainer$LoggingErrorHandler - Unexpected error occurred in scheduled task.
java.lang.NullPointerException: null
    at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:177)
    at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148)
    at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132)
    at java.base/java.lang.Thread.run(Thread.java:829)
07:52:55.109 [SimpleAsyncTaskExecutor-2] INFO  c.e.demoredis.controller.StreamTest - Message received in JUNIT for group-consumer [group-a-consumer-a] [DemoMessage(id=0, message=message1, createdBy=user1, modifyBy=null)]
07:52:55.117 [pool-2-thread-2] INFO  c.e.demoredis.controller.StreamTest - Messages received
 @Test
    public void testStreamListener() {
        publishDemoMessage(1);

        var future = CompletableFuture.supplyAsync(() -> waitNotification(countDownLatch), executor)
                .thenAcceptAsync(release -> LOGGER.info("Messages received"), executor);

        // wait for the future complete
        waitForFutureToComplete(future);

        assertEquals(0, countDownLatch.getCount());
}
private void publishDemoMessage(int count) {
        for (var i = 0; i<count; i++) {
            var message = DemoMessage.builder()
                    .id(String.valueOf(movieID++))
                    .message("message1")
                    .createdBy("user1")
                    .build();

            var record = StreamRecords.newRecord()
                    .ofObject(message)
                    .withStreamKey(KEY);

            var recordId = redisTemplate.opsForStream().add(record).block();
        }
    }
@Configuration
@RequiredArgsConstructor
public class StreamRedisConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamRedisConfiguration.class);

    public static String STREAM_CHANNEL;
    public static String STREAM_GROUP;
    public static String STREAM_CONSUMER;

    private StreamListener<String, ObjectRecord<String, DemoMessage>> streamListener;

    @Value("${spring.redis.stream.channel}")
    private void setStreamChannel(String channel){
        STREAM_CHANNEL = channel;
    }

    @Value("${spring.redis.stream.group}")
    private void setStreamGroup(String group){
        STREAM_GROUP = group;
    }

    @Value("${spring.redis.stream.consumer}")
    private void setChannelConsumer(String consumer){
        STREAM_CONSUMER = consumer;
    }

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, DemoMessage>> listenerContainer(RedisConnectionFactory redisConnectionFactory) {
        var options = StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(DemoMessage.class)
                .build();
        var listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        return listenerContainer;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory, ReactiveRedisTemplate<String, DemoMessage> reactiveRedisTemplate) {

        try {
            redisConnectionFactory.getConnection()
                    .xGroupCreate(STREAM_CHANNEL.getBytes(), STREAM_GROUP, ReadOffset.from("0-0"), true);
        } catch (RedisSystemException exception) {
            LOGGER.warn(exception.getCause().getMessage());
        }

        var listenerContainer = listenerContainer(redisConnectionFactory);

        var subscription = listenerContainer
                .receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                StreamOffset.create(STREAM_CHANNEL, ReadOffset.lastConsumed())
                , streamListener);
        listenerContainer.start();
        return subscription;
    }
}
@Service
@RequiredArgsConstructor
public class RedisStreamMessageConsumer implements StreamListener<String, ObjectRecord<String, DemoMessage>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamMessageConsumer.class);

    private final ReactiveRedisTemplate<String, String> redisTemplate;

    @Override
    public void onMessage(ObjectRecord<String, DemoMessage> record) {
        LOGGER.info("Stream Subscriber >> [{}]", record.getValue());
    }

}

my pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.comact.demo</groupId>
        <artifactId>redis-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo-redis-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-watcher</name>
    <description>service-watcher</description>
    <properties>
        <java.version>11</java.version>
        <springdoc.version>1.6.12</springdoc.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>demo-redis-pojo</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-webflux-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.1.8</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.playtika.testcontainers/embedded-redis -->
        <dependency>
            <groupId>com.playtika.testcontainers</groupId>
            <artifactId>embedded-redis</artifactId>
            <version>2.2.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springdoc</groupId>
                <artifactId>springdoc-openapi-maven-plugin</artifactId>
                <version>1.4</version>
            </plugin>
        </plugins>
    </build>

</project>

and my config

spring:
  redis:
    host: ${embedded.redis.host}
    port: ${embedded.redis.port}
    password: ${embedded.redis.password}
    pubSub:
      channel: 'JUNIT-MESSAGES_CHANNEL'
    stream:
      channel: 'JUNIT-STREAM_CHANNEL'
      group: 'group'
      consumer: 'consumer'
survivant commented 1 year ago

I don't know why Spring received a record when there were not listener added.

image

so I think you can close this bug here and forward it to Spring team.

mrniko commented 6 months ago

Fixed in https://github.com/redisson/redisson/issues/4006