场景描述
简单模拟一个高并发库存扣减场景,商品库存加载到Redis缓存,如:
127.0.0.1:6379> set product:stock:101 200
无锁状态操作
从缓存中获取对应商品的库存,让后进行库存扣减,完成后修改商品缓存中的库存值,操作如下:
@RequestMapping("deduct/stock/{productId}")
public String deductStock(@PathVariable("productId") long productId) {
String lockKey = PRODUCT_LOCK_PREFIX + productId;
int productStock = Integer.parseInt(stringRedisTemplate.opsForValue().get(PRODUCT_STOCK_PREFIX + productId));
if (productStock > 0) {
productStock = productStock - 1;
stringRedisTemplate.opsForValue().set(PRODUCT_STOCK_PREFIX + productId, productStock + "");
System.out.println("扣减成功,剩余库存:" + productStock);
}
return "end";
}
在高并发场景下,上述代码会存在商品超卖的情况,多个线程进来,当一个线程进行库存扣减还没刷新到缓存中,另一个线程从缓存中获取到该商品的库存值进行扣减操作,这样就出现了商品超卖。
SETNX 、SETEX 创建分布式锁
通过Redis SETNX获取分布式锁,SETEX设置锁过期时间,具体操作如下:
@RequestMapping("deduct/stock/{productId}")
public String deductStock(@PathVariable("productId") long productId) {
String lockKey = PRODUCT_LOCK_PREFIX + productId;
/**
* 每个请求进来生成一个唯一的Id,在锁释放操作进行判断处理改锁是不是改请求的锁,避免释放其他请求操作的锁
*/
String clientId = UUID.randomUUID().toString();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId);
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
if (!success) {
return "errorCode";
}
try {
int productStock = Integer.parseInt(stringRedisTemplate.opsForValue().get(PRODUCT_STOCK_PREFIX + productId));
if (productStock > 0) {
productStock = productStock - 1;
stringRedisTemplate.opsForValue().set(PRODUCT_STOCK_PREFIX + productId, productStock + "");
System.out.println("扣减成功,剩余库存:" + productStock);
}
} finally {
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete(lockKey);
}
}
return "end";
}
1、每个线程进入库存扣减方法,设置一个线程标识ID,在释放该线程锁的时候判断是否是该线程的锁,避免释放其他线程的锁,防止在高并发情况下出现超卖情况。
2、这种方式创建分布式锁,可能会出现死锁的情况,SETNX、SETEX两个命令在Redis服务端操作并非原子操作,当SETNX获取锁成功,执行SETEX不成功的情况,就出现了死锁状态。
SETNX EXP 方式创建分布式锁
SETNX、SETEX两个命令合并一个进行原子操作获取分布式锁,如下:
@RequestMapping("deduct/stock/{productId}")
public String deductStock(@PathVariable("productId") long productId) {
String lockKey = PRODUCT_LOCK_PREFIX + productId;
/**
* 每个请求进来生成一个唯一的Id,在锁释放操作进行判断处理改锁是不是改请求的锁,避免释放其他请求操作的锁
*/
String clientId = UUID.randomUUID().toString();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if (!success) {
return "errorCode";
}
try {
int productStock = Integer.parseInt(stringRedisTemplate.opsForValue().get(PRODUCT_STOCK_PREFIX + productId));
if (productStock > 0) {
productStock = productStock - 1;
stringRedisTemplate.opsForValue().set(PRODUCT_STOCK_PREFIX + productId, productStock + "");
System.out.println("扣减成功,剩余库存:" + productStock);
}
} finally {
if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete(lockKey);
}
}
return "end";
}
这种方式在高并发场景下,释放锁的过程中,如果因为网络情况出现卡顿,这时正好锁的生命周期时间到了,自动释放锁,其他线程获取锁进行库存扣减操作,网络情况恢复后,上一个线程执行释放锁操作,此时会把当前线程的锁给释放掉,这时候其他的线程可以立马获取锁,就会出现库存超卖的情况,解决方案:锁续命,Redisson已帮忙实现了。
Redisson实现分布式锁
引入Redisson依赖,使用Redisson进行分布式锁创建操作。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
@RequestMapping("deduct/stock/{productId}")
public String deductStock(@PathVariable("productId") long productId) {
String lockKey = PRODUCT_LOCK_PREFIX + productId;
RLock redissonLock = redisson.getLock(lockKey);
redissonLock.lock();
try {
int productStock = Integer.parseInt(stringRedisTemplate.opsForValue().get(PRODUCT_STOCK_PREFIX + productId));
if (productStock > 0) {
productStock = productStock - 1;
stringRedisTemplate.opsForValue().set(PRODUCT_STOCK_PREFIX + productId, productStock + "");
System.out.println("扣减成功,剩余库存:" + productStock);
}
} finally {
//释放锁
redissonLock.unlock();
}
return "end";
}
Redisson通过Lua脚本来实现分布式锁、锁续命、锁释放操作,详情可查看Redisson源代码,实现原理如下图: