Redis实践之优惠劵秒杀 1.项目背景 老样子,我们还是要先理解优惠劵相关的表结构,总共有三张涉及的表,分别是优惠劵表、秒杀优惠劵表、优惠劵订单表
这三张表就是我们本次秒杀场景下经常使用的表,为了更好理解,下面是三张表的关系图:
2.全局唯一ID 为什么订单表不使用数据库自带的主键自增功能呢,因为当用户抢购时,就会生成订单并保存到tb_voucher_order
这张表中,而订单表如果使用数据库自增ID就存在一些问题:
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class RedisIdWorker { private final StringRedisTemplate stringRedisTemplate; public RedisIdWorker (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } private static final long START = LocalDateTime.of(2023 , 1 , 1 , 0 , 0 ).toEpochSecond(ZoneOffset.UTC); private static final int SERIAL_BITS = 32 ; public long getNextId (String keyPrefix) { long timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) - START; String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); long serialNumber = stringRedisTemplate.opsForValue().increment(keyPrefix + date); return timestamp << SERIAL_BITS | serialNumber; } }
根据全局ID的结构可知,全局ID是递增
的,因为时间戳部分是递增的,而时间戳部分相同时序列号部分也是递增的,序列号的递增是通过Redis值自增实现的,其中键采用yyyy:MM:dd
的方式(每天重置序列号),因为如果Redis键可能随着时间积累序列号自增溢出,而使用上述日期格式的键既解决了序列号的溢出问题,同时还方便统计数据的生成,如统一2023年7月份的订单量。
当然依赖于Redis进行统计数据生成还是不可靠,因为订单全局ID生成后订单不一定能成功,可能发生订单创建失败、用户退单等各种情况。
3.优惠劵秒杀下单
4.超卖问题
我们采用版本号的乐观锁方法解决超卖问题,当然我们没有加额外的字段version
,因为我们发现订单量充足的时候,如果大量用户请求下单,同一时刻只有一个用户能够下单成功,其他用户只能重试或失败,因此针对这种情况,我们使用stock
库存字段自身发挥版本号的作用,类似于set stock = stock - 1 where id = 10 and stock > 0
,这样既解决了超卖问题
,还缓解了乐观锁在高并发场景下由于频繁失败导致性能较差的问题
,可谓是一举两得啊。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public Result seckillVoucher (Long voucherId) { SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId); if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("抢购活动尚未开始!" ); } if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("抢购活动已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } boolean updated = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update(); if (!updated) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long id = redisIdWorker.getNextId("inc:order:" ); voucherOrder.setId(id); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(id); }
5.一人一单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public Result seckillVoucher (Long voucherId) { SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId); if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("抢购活动尚未开始!" ); } if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("抢购活动已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } IVoucherOrderService service = (IVoucherOrderService) AopContext.currentProxy(); synchronized (userId.toString().intern()) { return service.createOrder(voucherId); } } @Transactional public Result createOrder (Long voucherId) { Long userId = UserHolder.getUser().getId(); Integer count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count(); if (count > 0 ) { return Result.fail("用户不能重复下单!" ); } boolean updated = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherId) .gt("stock" , 0 ) .update(); if (!updated) { return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long id = redisIdWorker.getNextId("inc:order:" ); voucherOrder.setId(id); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(id); }
使用synchronized
锁的方法在单机上可以完成一人一单功能,因为单机JVM的字符串常量池就一份,锁的对象监视器是同一个,不会发生并发安全问题,但是我们的应用后续可能会发展为应用集群即多个Tomcat集群,此时JVM也有多个,我们要使用其他的分布式锁的方案了。
6.分布式锁
分布式锁就是满足分布式系统或集群模式下多进程可见并且互斥的锁。分布式锁的核心是实现多进程之间互斥,而满足这一点的方式有很多,常见的有三种:
上面基于Redis实现的分布式锁有个严重的问题,就是锁过期时间的设置,如果太短可能业务还没有结束锁已经过期释放,如果太长一旦发生故障发生死锁,其他请求只能慢慢等待锁过期释放,因此我们在获取锁时会加入线程标识符,防止误删其他请求线程的加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class SimpleRedisLock { private StringRedisTemplate stringRedisTemplate; private String key; private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; private static final String ID_PREFIX = UUID.randomUUID().toString(true ) + "-" ; static { UNLOCK_SCRIPT = new DefaultRedisScript <>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource ("unlock.lua" )); UNLOCK_SCRIPT.setResultType(Long.class); } public SimpleRedisLock (StringRedisTemplate stringRedisTemplate, String key) { this .stringRedisTemplate = stringRedisTemplate; this .key = key; } public boolean tryLock (long timeoutSeconds) { String id = ID_PREFIX + Thread.currentThread().getId(); Boolean isLock = stringRedisTemplate.opsForValue().setIfAbsent(key, id, timeoutSeconds, TimeUnit.SECONDS); return BooleanUtil.isTrue(isLock); } public void unlock () { stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(key), ID_PREFIX + Thread.currentThread().getId()); } }
也许你会疑惑为什么释放锁要是有Lua脚本,其实是因为释放锁包括获取锁线程标识符、检查线程标识符是否为自己的、释放锁这三步,如果它们不是原子性的就会出现线程并发安全问题,可能会误删其他请求线程的加锁,具体演示如下图:
Lua脚本的内容:
1 2 3 4 5 if (redis.call('get' ,KEYS[1 ]) == ARGV[1 ]) then return redis.call('del' ,KEYS[1 ]) end return 0
优惠劵秒杀下单的功能使用Redis分布式锁实现一人一单改写优化后:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public Result seckillVoucher (Long voucherId) { SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId); if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("抢购活动尚未开始!" ); } if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("抢购活动已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } IVoucherOrderService service = (IVoucherOrderService) AopContext.currentProxy(); Long userId = UserHolder.getUser().getId(); SimpleRedisLock lock = new SimpleRedisLock (stringRedisTemplate, "lock:user" + userId); try { if (lock.tryLock(10 )) { return service.createOrder(voucherId); } return Result.fail("用户不能重复下单!" ); } catch (InterruptedException e) { log.error(e.toString()); return Result.fail("用户不能重复下单!" ); } finally { lock.unlock(); } }
分布式锁常见实现方案总结 | JavaGuide
7.Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
要想使用Redisson,首先需要引入依赖:
1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.13.6</version > </dependency >
其次,配置Redisson客户端:
1 2 3 4 5 6 7 8 9 @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://127.0.0.1:6379" ).setPassword("123456" ); return Redisson.create(config); } }
最后,使用Redisson的分布式锁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public Result seckillVoucher (Long voucherId) { SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId); if (seckillVoucher.getBeginTime().isAfter(LocalDateTime.now())) { return Result.fail("抢购活动尚未开始!" ); } if (seckillVoucher.getEndTime().isBefore(LocalDateTime.now())) { return Result.fail("抢购活动已经结束!" ); } if (seckillVoucher.getStock() < 1 ) { return Result.fail("库存不足!" ); } IVoucherOrderService service = (IVoucherOrderService) AopContext.currentProxy(); Long userId = UserHolder.getUser().getId(); RLock lock = redissonClient.getLock("lock:user" + userId); try { if (lock.tryLock(1 , 10 , TimeUnit.SECONDS)) { return service.createOrder(voucherId); } return Result.fail("用户不能重复下单!" ); } catch (InterruptedException e) { log.error(e.toString()); return Result.fail("用户不能重复下单!" ); } finally { lock.unlock(); } }
7.1解决不可重入 Redisson完美解决了我们上面列出的四个问题,首先看看Redisson如何解决分布式锁不可重入的问题的:
我们跟踪RedissonLock的tryLock方法源代码,绘制下面的调用流程图,这是加锁的整个过程:
leaseTime为-1时表示使用WatchDog的超时时间,并开启超时续期机制; waitTime为-1表示获取锁失败后直接返回,不会重试加锁; RedissionLock加锁步骤:1.判断锁是否已存在,如果不存在则创建锁,设置线程标识符和初始重入次数;2.如果已存在,则检查锁是否是当前线程加上去的,如果是的话则锁重入次数递增并刷新过期时间;3.如果锁已存在,且是其他线程加的锁,则加锁失败返回锁剩余TTL
这个是解锁的整个过程:
RedissionLock解锁步骤:1.判断锁的线程标识符是否是当前线程,如果不是则结束;2.锁的可重入次数递减,判断可重入次数是否为0,如果不为0则刷新锁过期时间;3.如果锁可重入次数为0,表示完全释放,可以删除锁并发布锁释放消息。 注意:重试加锁为了保证性能,没有使用自旋等待的方式,而是利用PubSub信号量机制,其中发布的channelname格式是redisson_lock__channel:{keyName},消息value就是UNLOCK_MESSAGE = 0L
测试Redisson的分布式锁可重入的能力:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Test public void testRedisson () { RLock lock = redissonClient.getLock("test:order:1" ); boolean isLock = lock.tryLock(); if (isLock) { try { System.out.println("处理业务逻辑1" ); method(); } finally { lock.unlock(); } } } @Test public void method () { RLock lock = redissonClient.getLock("test:order:1" ); boolean isLock = lock.tryLock(); if (isLock) { try { System.out.println("处理业务逻辑2" ); } finally { lock.unlock(); } } }
第二次加锁成功,可重入锁:
7.2解决不可重试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 public boolean tryLock (long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } current = System.currentTimeMillis(); RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false )) { subscribeFuture.onComplete((res, e) -> { if (e == null ) { unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(waitTime, unit, threadId); return false ; } try { time -= System.currentTimeMillis() - current; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } while (true ) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(waitTime, leaseTime, unit, threadId); if (ttl == null ) { return true ; } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= System.currentTimeMillis() - currentTime; if (time <= 0 ) { acquireFailed(waitTime, unit, threadId); return false ; } } } finally { unsubscribe(subscribeFuture, threadId); } }
上面的这段就是解决不可重试的核心代码,其中利用到了Redis的发布订阅,还记得之前释放锁流程中谈到的Lua脚本的内容吗:
7.3解决超时释放
WatchDog超时续期可以保证锁不会提前过期,每隔10s(看门狗默认超时时间30/3=10s)就会重新刷新一下锁的过期时间;释放锁的时候会取消锁的超时续期,本质就是通过EXPIRATION_RENEWAL_MAP(静态ConcurrentMap,锁名称->ExpirationEntry的映射)拿到当前锁的ExpirationEntry中的定时任务,将这个取消续期的任务取消即可。
总结Redission分布式锁的原理: 1.可重入:利用锁的Hash结构记录线程标识和重入次数; 2.可重试:利用信号量和PubSub功能实现锁的重试等待、唤醒; 3.超时续约:利用WatchDog,每隔一段时间(releaseTime/3)重置超时时间。
7.4解决主从一致性 由于Redis主从集群存在同步延迟,可能存在主节点加锁成功还没有来得及同步锁数据到从节点,主节点就宕机了,此时从节点被选举成主节点后锁数据就丢失了,因此可以重复加锁。为了解决这个问题,引出了MultiLock联锁的概念,只有每个Redis节点都加锁成功才算成功,否则就失败(失败会把已经获取的锁归还),这样即使某个Redis主节点宕机没来得及同步锁数据,下次请求加联锁时由于其他Redis主节点存在锁数据,因此整个联锁加锁失败。
8.秒杀优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 private BlockingQueue<VoucherOrder> queue = new ArrayBlockingQueue <>(1000 );private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static { SECKILL_SCRIPT = new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } @PostConstruct public void init () { SECKILL_ORDER_EXECUTOR.submit(() -> { while (true ) { VoucherOrder voucherOrder = null ; try { voucherOrder = queue.take(); } catch (InterruptedException e) { log.error(e.toString()); } boolean updated = seckillVoucherService.update() .setSql("stock = stock - 1" ) .eq("voucher_id" , voucherOrder.getVoucherId()) .gt("stock" , 0 ) .update(); if (!updated) { log.error("库存不足!" ); return ; } save(voucherOrder); } }); } public Result seckillVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString()); if (result.intValue() == 1 ) return Result.fail("库存不足" ); else if (result.intValue() == 2 ) return Result.fail("用户不能重复下单" ); long orderId = redisIdWorker.getNextId("inc:order:" ); VoucherOrder voucherOrder = new VoucherOrder (); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); queue.add(voucherOrder); return Result.ok(orderId); }
这里实现的异步下单比较粗糙,其中的核心逻辑就是MQ异步削峰处理,我们上面使用JDK中的阻塞队列简单模拟,真实情景下肯定不会这么做的,上面的代码存在着订单可能丢失的风险。
Lua脚本seckill.lua
的内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId) return 0