Redis实践之消息队列

Redis实践之消息队列

1.消息队列介绍

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息

Redis提供了三种不同的方式来实现消息队列:

  • List结构:基于List结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

image-20240110185004014

2.基于List结构模拟消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。

队列是入口和出口不在一边,因此我们可以利用:LPUSH结合RPOP、或者RPUSH结合LPOP来实现。不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。

image-20240110185143572

image-20240110185234614

Redis通常被用作消息服务器来实现后台作业处理或其他类型的消息任务。一个简单的队列通常通过在生产者端将值推入列表中来获得,然后在消费者端使用RPOP(使用轮询)或BRPOP(如果客户端更适合使用阻塞操作)来等待这些值。然而,在这种情况下,获得的队列是不可靠的,因为消息可能会丢失,例如在存在网络问题或者在消息被接收但仍需处理时消费者崩溃。LMOVE(或其阻塞变体BLMOVE)提供了一种避免这个问题的方法:消费者获取消息的同时将其推送到一个处理列表中,一旦消息已经被成功处理,可以使用LREM命令从处理列表中删除消息(相当于消息被ACK)。一个额外的客户端可以监视处理列表中停留时间过长的项目,并在需要时将这些超时的项目再次推送回队列中。

3.基于PubSub的消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

image-20240110185409393

发布消息的channel的格式推荐加上环境前缀如test,staging,production等,消息格式是字符串类型,特殊字符需要下划线\标识

image-20240814205308754

订阅消息后进入PubSub模式,处于等待消息的状态,只能发送有限的命令如PING、SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE、PUNSUBSCRIBE、QUIT、RESET等,并且消息语义只支持”最多一次”,存在消息丢失的风险,也不存在消息持久化能力。

image-20240814205402289

image-20240814211005967

image-20240814211423659

PubSub机制的消息队列的优点就是支持多生产、多消费者,但是缺点也很明显,不支持数据持久化、无法避免消息丢失、且消息堆积客户端有上限。

4.基于Stream的消息队列

Stream是Redis 5.0引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

image-20240110185542277

image-20240110185627737

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

image-20240110185732901

当我们指定起始ID为$时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

image-20240110185821020

5.基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

image-20240110185853834

创建消费者组:

1
XGROUP CREATE  key groupName ID [MKSTREAM]
  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建队列

从消费者组读取消息:

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认(不建议使用,可能会导致消息丢失)
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:
    • “>”:从下一个未消费的消息开始
    • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

Stream类型消息队列利用XREADGROUP命令可以加快消息的消费速度,减少消息堆积,Stream作为新数据结构也支持数据持久化,同时消费者组内有标识当前消费的进度,因此可以从下一条未消费的消息开始,没有消息漏读的风险,最后消息被消费后并不会立即删除而是转移到消费者组的pendding-list中等待消费者手动ACK,保证消息至少被消费一次。

image-20240110190407870

6.消息队列实现异步秒杀下单

image-20240110190511875

Lua脚本的改写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
-- 秒杀优惠劵ID
local voucherId = ARGV[1]
-- 用户ID
local userId = ARGV[2]
-- 订单ID
local orderId = ARGV[3]
-- 秒杀券库存Key
local stockKey = 'seckill:stock:' .. voucherId
-- 秒杀券订单Key
local orderKey = 'seckill:order:' .. voucherId
-- 判断库存是否充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 库存不足,返回1
return 1
end
-- 判断一人一单
if(redis.call('sismember', orderKey, userId) == 1) then
-- 用户已下单,返回2
return 2
end
-- 扣减库存
redis.call('incrby', stockKey, -1)
-- 保存用户
redis.call('sadd', orderKey, userId)
-- 下单信息保存到消息队列
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId);
-- 秒杀成功,返回0
return 0

异步下单的业务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
private static final ExecutorService SECKILL_ORDER_HANDLER = Executors.newSingleThreadExecutor();
// 消息队列名称
private String queueName = "stream.orders";

@PostConstruct
public void initial() {
SECKILL_ORDER_HANDLER.submit(() -> {
while (true) {
try {
// 消息队列中取出订单消息,每次读取一条消息,最长等待2秒
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 判断消息获取是否成功
if (list == null || list.isEmpty()) {
// 没有消息,继续循环
continue;
}
// 解析订单消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 扣减库存,使用乐观锁(stock > 0)
boolean updated = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();
if (!updated) {
log.error("库存不足!");
return;
}
// 下单
save(voucherOrder);
// ACK确认消息消费完毕
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理异常", e);
handlePendingList();
}
}
});
}

private void handlePendingList() {
while (true) {
try {
// 消费者pending-list中取出订单消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 判断消息获取是否成功
if (list == null || list.isEmpty()) {
// 没有消息,说明消费者组的pending-list中没有遗漏未处理的订单,结束循环
break;
}
// 解析订单消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 扣减库存,使用乐观锁(stock > 0)
boolean updated = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();
if (!updated) {
log.error("库存不足!");
return;
}
// 下单
save(voucherOrder);
// ACK确认消息消费完毕
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理异常", e);
}
}
}

public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.getNextId("inc:order:");
// 执行Lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId));
// 判断秒杀资格
if (result.intValue() == 1) return Result.fail("库存不足");
else if (result.intValue() == 2) return Result.fail("用户不能重复下单");
// 返回订单ID
return Result.ok(orderId);
}