c3b2a7 / c3b2a7.github.io

https://lolico.me
4 stars 0 forks source link

Stream消息队列在SpringBoot中的实践与踩坑 | Lolico's Blog #52

Open c3b2a7 opened 4 years ago

c3b2a7 commented 4 years ago

https://lolico.me/2020/06/28/Using-stream-to-implement-message-queue-in-springboot/

前言Redis5新增了一个Stream的数据类型,这个类型作为消息队列来使用时弥补了List和Pub/Sub的不足并且提供了更强大的功能,比如ack机制以及“逻辑”上的分组功能,在有轻量消息队列使用需求时,使用这个新类型那是再好不过了。对于这个类型,在这里就不赘述了,想了解的话可以看一下这篇文章,在这里,我们就具体来讲一下在SpringBoot中的实践与踩坑。注意,SpringBoot版本需要大于

YiuTerran commented 4 years ago

StringRecord,不是StringMapRecord

c3b2a7 commented 4 years ago

@YiuTerran StringRecord,不是StringMapRecord

感谢指出,已修改。

YiuTerran commented 4 years ago

@LOLICOL1

@YiuTerran StringRecord,不是StringMapRecord

感谢指出,已修改。

今天写了一下相关代码,发现最大的坑是add不支持最大长度,然后trim不支持近似值…这没法用啊,内存要炸了

c3b2a7 commented 4 years ago

@YiuTerran

@LOLICOL1

@YiuTerran StringRecord,不是StringMapRecord

感谢指出,已修改。

今天写了一下相关代码,发现最大的坑是add不支持最大长度,然后trim不支持近似值…这没法用啊,内存要炸了

hh,add不支持最大长度是指消息内容的长度吗,这个我还真没去试,至于trim不支持近似值没太理解你说的。redis本身也不太适合做消息队列,目前spring-data-redis也只提供了基础的支持,网上的资料也不多,坑还要自己一个一个踩😂,有专业的消息队列需求那肯定也不用redis了

YiuTerran commented 4 years ago

add那个是指限制整个队列的最大长度。trim有个语法,用~可以近似裁剪,效率比较高。 redis毕竟是内存型消息队列,不限制队列长度内存肯定要炸。

c3b2a7 commented 4 years ago

@YiuTerran add那个是指限制整个队列的最大长度。trim有个语法,用~可以近似裁剪,效率比较高。 redis毕竟是内存型消息队列,不限制队列长度内存肯定要炸。

哦哦,add时限制长度确实没有提供api,但是提供了一个trim方法,需要手动去调用😂

c3b2a7 commented 4 years ago

@YiuTerran add那个是指限制整个队列的最大长度。trim有个语法,用~可以近似裁剪,效率比较高。 redis毕竟是内存型消息队列,不限制队列长度内存肯定要炸。

刚去翻了下api,发现RedisStreamCommands提供了XAddOptions用于设置xadd命令中的MAXLEN,只不过StreamOperations的add没有提供相应接口,可以用execute这种原始的方法来设置😂

c3b2a7 commented 4 years ago

文章中有一处错误一直没修改过来:构造StreamMessageListenerContainerOptionspollTimeout(Duration.ZERO)并非是阻塞式轮询,反而是非阻塞的!默认情况也是非阻塞。原因可查看DefaultStreamMessageListenerContainer#getStreamReadOptions方法,这个值不能超过RedisCommand执行的超时时间,lettuce默认是一分钟,可通过spring.redis.timeout设置。轮询线程在执行过程遇到异常默认直接退出,可以使用register方法注册listener,构造StreamReadRequest时设置cancelSubscriptionOnError来阻止异常时轮询线程的退出,核心代码可查看StreamPollTask#doLoop方法。

yanshenxian-pub commented 4 years ago

如果消费失败,重试如何实现?ReadOffset.lastConsumed() 在应用重启后会获取到 PEL 里面的消息吗?

c3b2a7 commented 4 years ago

@yanshenxian-pub 如果消费失败,重试如何实现?ReadOffset.lastConsumed() 在应用重启后会获取到 PEL 里面的消息吗?

失败重试的话要看失败时具体怎么处理,如果消费时可能出现异常,一般在消费时在外层catch异常,失败后根据业务决定是立即重试还是直接提交ack,原则上,一个消息被读取后就会被放到pending list中,只要没有提交ack,那么这个消息就会一直在pending list中(这里不开启自动ack,那么消费端是可以保证不丢消息的,类似kafka提交offset)。ReadOffset.lastConsumed()是指从最后一次消费的记录开始,所以应用重启后并不会获取pending list中的消息再次消费。如果要处理pending list中的消息,需要另外获取PEL然后进行处理,StreamOperations也提供了相应的方法。

vichbb commented 3 years ago

大佬,您好,请问如果消息数量超过10000是不是就会有可能丢消息了?

3d-fey commented 3 years ago

多个 Stream key 的话,是需要启动多个 StreamMessageListenerContainer 去监听吗?还是有啥好的实践呢?这块感觉资源很少

c3b2a7 commented 3 years ago

@3d-nx 多个 Stream key 的话,是需要启动多个 StreamMessageListenerContainer 去监听吗?还是有啥好的实践呢?这块感觉资源很少

不需要,前提是没有指定targetType,如果指定了targetType,那么监听的消息类型就必须是指定的。由于这一块泛型设计的原因,在指定了targetType后,注册StreamListener时泛型参数就被限制了,实际上只要反序列化不会失败,都是可以的。比较脏一点的解决办法就是指定targetType为Object,然后接受到消息后强制类型转换(大前提是反序列化时不会失败)

111zlj1234 commented 2 years ago

你好,注册监听器时,一定要指定消费者组吗?我注册时没有指定消费者组,结果就监听不到消息了,上面内容也提到非消费者组模式下,可以直接通过read方法进行阻塞或非阻塞消费。所以是StreamMessageListener不支持直接消费吗?