Open TENCHIANG opened 4 years ago
Redis Stream消息队列是Redis5.0的新特性,是一个消息链表,每个节点存的都是消息ID,消息ID类似hash的结构,存的是若干field和string。
Redis Stream
Redis5.0
field
string
Redis自动生成的消息ID格式
const timestamp = Date.now() // 毫秒级时间戳 const count = 0 // 当前时间戳下 第几个 const id = `${timestamp}-${count}`
消息ID也可自定义,规则如下
number-number
向Stream追加消息,其中ID为消息ID的格式,*为自动生成
ID
*
xadd key ID field string [field string ...] xadd stream * name yy age 22
向Stream删除消息,不是真删除,只是设置标志位,不影响Stream的长度
xdel key ID [ID ...] xdel stream 1578969133005-0
显示消息,不包括已删除的消息
xrange key start end [COUNT count] xrange stream - + # 显示从最大到最小(所有)消息 xrange stream 1578969133005-0 1578969140931-0 1 ## 显示它们之间的消息 xrange stream - + count 1 # 从头到位读取1个消息(第一个消息) xrevrange stream + - # 倒序
显示个Stream长度,包括已删除的消息
xlen key
删除整个Stream
del key [key ...]
注意:消费者之间是竞争关系,一旦有一个消费者读取了消息,那么last_delivered_id往前挪一个
last_delivered_id
有三个部分
pending_ids[]
可以在不定义消费组的情况下手动消费消息,甚至可以阻塞等待新的消息
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
从头部读取1条消息
xread count 1 streams key 0-0 # 比 0-0 还要大的消息ID 自然就是从头开始啦 xread count 1 streams key 0 # 上述的简化版本 xread streams key1 key2 0 0 # 同时读取两个stream
从尾部读取1条消息,很显然是(nil)
(nil)
xread count 1 streams key $
稍加修改,阻塞读取最新进来的消息
xread block 0 streams key $ # 一直阻塞 xread block 1000 streams key $ # 单位ms 也就是阻塞1s
挑战Kafka:消息队列Redis Stream
Redis Stream
消息队列是Redis5.0
的新特性,是一个消息链表,每个节点存的都是消息ID,消息ID类似hash的结构,存的是若干field
和string
。消息ID格式
Redis自动生成的消息ID格式
消息ID也可自定义,规则如下
number-number
两个整数命令样例
向Stream追加消息,其中
ID
为消息ID的格式,*
为自动生成向Stream删除消息,不是真删除,只是设置标志位,不影响Stream的长度
显示消息,不包括已删除的消息
显示个Stream长度,包括已删除的消息
删除整个Stream
消费组Consumer Group
注意:消费者之间是竞争关系,一旦有一个消费者读取了消息,那么
last_delivered_id
往前挪一个有三个部分
last_delivered_id
当前读取的消息IDpending_ids[]
等待ACK的消息ID列表(保证消息被消费)消息也可以独立消费
可以在不定义消费组的情况下手动消费消息,甚至可以阻塞等待新的消息
从头部读取1条消息
从尾部读取1条消息,很显然是
(nil)
稍加修改,阻塞读取最新进来的消息