Redis实践之消息队列
1.消息队列介绍
消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
Redis提供了三种不同的方式来实现消息队列:
- List结构:基于List结构模拟消息队列
- PubSub:基本的点对点消息模型
- Stream:比较完善的消息队列模型
2.基于List结构模拟消息队列
消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
Redis通常被用作消息服务器来实现后台作业处理或其他类型的消息任务。一个简单的队列通常通过在生产者端将值推入列表中来获得,然后在消费者端使用RPOP(使用轮询)或BRPOP(如果客户端更适合使用阻塞操作)来等待这些值。然而,在这种情况下,获得的队列是不可靠的,因为
消息可能会丢失
,例如在存在网络问题或者在消息被接收但仍需处理时消费者崩溃。LMOVE(或其阻塞变体BLMOVE)提供了一种避免这个问题的方法:消费者获取消息的同时将其推送到一个处理列表中
,一旦消息已经被成功处理,可以使用LREM
命令从处理列表中删除消息(相当于消息被ACK)。一个额外的客户端可以监视处理列表中停留时间过长的项目
,并在需要时将这些超时的项目再次推送回队列中。
3.基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
发布消息的channel的格式推荐加上环境前缀如test,staging,production等,消息格式是字符串类型,特殊字符需要下划线\标识
订阅消息后进入PubSub模式,处于等待消息的状态,只能发送有限的命令如PING、SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE、PUNSUBSCRIBE、QUIT、RESET等,并且消息语义只支持”最多一次”,存在消息丢失的风险,也不存在消息持久化能力。
PubSub机制的消息队列的优点就是支持多生产、多消费者,但是缺点也很明显,不支持数据持久化、无法避免消息丢失、且消息堆积客户端有上限。
4.基于Stream的消息队列
Stream是Redis 5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:
当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。
5.基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
创建消费者组:
1 | XGROUP CREATE key groupName ID [MKSTREAM] |
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建队列
从消费者组读取消息:
1 | XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] |
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认(不建议使用,可能会导致消息丢失)
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
- “>”:从下一个未消费的消息开始
- 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
Stream类型消息队列利用XREADGROUP命令可以加快消息的消费速度,减少消息堆积,Stream作为新数据结构也支持数据持久化,同时消费者组内有标识当前消费的进度,因此可以从下一条未消费的消息开始,没有消息漏读的风险,最后消息被消费后并不会立即删除而是转移到消费者组的pendding-list中等待消费者手动ACK,保证消息至少被消费一次。
6.消息队列实现异步秒杀下单
Lua脚本的改写:
1 | -- 秒杀优惠劵ID |
异步下单的业务代码:
1 | private static final ExecutorService SECKILL_ORDER_HANDLER = Executors.newSingleThreadExecutor(); |