Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令

key: 消息队列的名称

例如:

向队列名s1的队列发送消息,并使用redis自动生成id,发送的消息为:{key1=value1,key2=value2}

XADD s1 * key1 value1 key2 value2

读取消息的方式之一

COUNT 不填写时,默认是读取全部消息

BLOCK 不填写时,默认不阻塞读取

XREAD COUNT 1 STREAMS s1 0

XREAD阻塞方式,读取最新的消息:

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下

注意:当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯

消息被读取后,没有被删除,依旧放在消息队列中,随时可以回溯读取

  • 一个消息可以被多个消费者读取

  • 可以阻塞读取

  • 有消息漏读的风险

只有在使用阻塞读取的时候,才能读取到最新的消息,因为只有在阻塞的时候才会监听。

如果在消费者在不使用阻塞读取的时候,读取完了一条消息,然后我们又往生产者中加入一条消息,这时候我们又在消费者中使用不阻塞读取,这时候是读取不到消息的。

又或者,我们的消费者在阻塞读取的过程中(已经开始读取了,但是没有读取完)

然后我们又往生产者中加入一条消息,这时候这条新加入的消息也是不会被监听到的

文章作者: 落叶知秋
本文链接:
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 落叶知秋
喜欢就支持一下吧