Redis - 事务与锁
笔记
优秀的 SQL 数据库都拥有事务,而 Redis 作为 NoSQL 数据库的佼佼者,也有「事务」。
2021-12-26 @Young Kbt
# 事务
# 介绍
Redis 事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
Redis 事务的本质是一组命令的集合。事务支持一次执行多个命令,一个事务中所有命令都会被序列化。在事务执行过程,会按照顺序串行化执行队列中的命令,其他客户端提交的命令请求不会插入到事务执行命令序列中。
Redis 事务的主要作用就是 串联多个命令 防止别的命令插队。
- Redis 事务没有隔离级别的概念
- Redis 不保证原子性
- Redis 事务的三个阶段
- 开始事务
- 命令入队
- 执行事务
# 三大特性
单独的隔离操作
事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断
没有隔离级别的概念
队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行
不保证原子性
执行的过程中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚
# 三大指令
开启事务指令
multi
执行事务指定
exec
在执行事务前(exec),结束事务指令(理解为手动回滚)
discard
- 从输入 Multi 命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入完成
- Exec 后,Redis 会将之前的命令队列中的命令依次执行。
- 组队的过程中可以通过 discard 来放弃组队。
# 案例代码
multi
set k1 v1
set k2 v2
get k1
get k2
exec
# 输出 v1、v2
2
3
4
5
6
7
8
9
10
在没有 exec
之前,set 和 get 并没有立即执行,它们仅仅进入了命令队列,等待 exec
命令后再全部执行。
set k1 v1
multi
set k1 v2
discard
get k1
# 输出 v1
2
3
4
5
6
7
8
9
使用了 discard
,代表取消事务,则事务里的 set k1 v2
没有被执行,所以 k1 值依然是 1。
# 错误处理
组队中某个命令出现了报告错误(multi 中),执行时整个的所有队列都会被取消
如果执行阶段(exec)某个命令报出了错误,则只有报错的命令不会被执行,而其他的命令都会执行,不会回滚
# 案例图
放弃事务
若在事务队列中存在命令性错误(类似于 Java 编译性错误),则执行 EXEC 命令时,所有命令都不会执行
若在事务队列中存在语法性错误(类似于 Java 的 1/0 的运行时异常),则执行 EXEC 命令时,其他正确命令会被执行,错误命令抛出异常。
# 事务冲突的问题
想想一个场景:有很多人有你的账户,同时去参加双十一抢购
- 一个请求想给金额减 8000
- 一个请求想给金额减 5000
- 一个请求想给金额减 1000
结果如图:
那么如何解决呢?我们需要利用 Redis 的锁机制。
# 锁
# 悲观锁
悲观锁(Pessimistic Lock),顾名思义,就是很悲观,认为这个世界是黑暗的,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会 block 直到它拿到锁。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。
# 乐观锁
乐观锁(Optimistic Lock),顾名思义,就是很乐观,认为这个世界是光明的,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制。乐观锁适用于多读的应用类型,这样可以提高吞吐量。Redis 就是利用这种 check-and-set
机制实现事务的。
Redis 使用的是乐观锁。
通过指令(可指定多个),开启乐观锁
watch key [key] ...
1一旦 watch 某个 key,则会一直监视这个 key,如果 key 发生了变化,就返回提示。
作用:在执行 multi 之前,先执行
watch key1 [key2]
,可以监视一个(或多个) key ,如果在事务exec
执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。使用场景:很多人同时对一个值进行操作,一旦这个值被修改,且被其他人监听,则其他人无法修改这个值
取消 WATCH 命令对所有 key 的监视
unwatch key [key] ...
1缺点:如果单纯使用 watch,可能导致 key 的值无法完全被修改。
场景:假设库存有 500 个商品,2000 个人进行秒杀购买(2000 个程序监听商品的 key),假设 1999 人同时购买,其内部程序监听的商品数量为 500,最后一个人却已经购买成功,商品数量变为 499,则前面的事务被打断(监听的 500 数量),导致 1999 人会购买失败,库存还有 499 个商品。
测试
初始化信用卡可用余额和欠额
127.0.0.1:6379> set balance 100 OK 127.0.0.1:6379> set debt 0 OK
1
2
3
4使用 watch 检测 balance,事务期间 balance 数据未变动,事务执行成功
127.0.0.1:6379> watch balance OK 127.0.0.1:6379> multi # 开启事务 OK 127.0.0.1:6379> decrby balance 20 # 可用余额 -20 QUEUED 127.0.0.1:6379> incrby debt 20 # 欠款 +20 QUEUED 127.0.0.1:6379> exec # 执行事务 1) (integer) 80 2) (integer) 20
1
2
3
4
5
6
7
8
9
10
11使用 watch 检测 balance,若事务期间 balance 数据变动,则事务执行失败
窗口一
# 窗口一 127.0.0.1:6379> watch balance # 监视 balance OK 127.0.0.1:6379> MULTI # 执行完毕后,执行窗口二代码测试 OK 127.0.0.1:6379> decrby balance 20 QUEUED 127.0.0.1:6379> incrby debt 20 QUEUED 127.0.0.1:6379> exec # 修改失败!因为被监视的 balance 值改变 (nil)
1
2
3
4
5
6
7
8
9
10
11窗口二
# 窗口二 127.0.0.1:6379> get balance "80" 127.0.0.1:6379> set balance 200 OK
1
2
3
4
5窗口一:出现问题后放弃监视,然后重来
127.0.0.1:6379> UNWATCH # 放弃监视,这是取消所有的监视 OK 127.0.0.1:6379> watch balance # 监视 OK 127.0.0.1:6379> MULTI # 事务 OK 127.0.0.1:6379> decrby balance 20 QUEUED 127.0.0.1:6379> incrby debt 20 QUEUED 127.0.0.1:6379> exec # 成功 1) (integer) 180 2) (integer) 40
1
2
3
4
5
6
7
8
9
10
11
12
13说明:
一但执行 exec 指令或 descard 指令,无论事务是否执行成功,watch 指令对变量的监控都将被取消。
故当事务执行失败后,需重新执行 watch 命令对变量进行监控,并开启新的事务进行操作。
# 指令总结
Redis 事务相关指令:
序号 | 命令及描述 | 描述 |
---|---|---|
1 | DISCARD | 取消事务,放弃执行事务块内的所有命令 |
2 | EXEC | 执行所有事务块内的命令 |
3 | MULTI | 标记一个事务块的开始 |
4 | UNWATCH | 取消 WATCH 命令对所有 key 的监视 |
5 | WATCH key [key ...] | 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。类似乐观锁 |
# 秒杀案例
# 环境准备
首先创建一个 Spring Boot 项目,然后添加依赖:
<dependencies>
<!-- redis普通依赖包 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
<!-- spring boot + redis 整合 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 配置类
配置类我在 Redis - Java 整合 也提供了,如下:
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
//key 序列化方式
template.setKeySerializer(redisSerializer);
//value 序列化
template.setValueSerializer(jackson2JsonRedisSerializer);
//value hashmap 序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间 600 秒
RedisCacheConfiguration config =
RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.
fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
为什么要写配置类呢,因为自带的兼容性不好,我使用自带了报错,所以自己重写配置,覆盖官方自带的。
# 连接池
如果还是用 Jedis
自带的连接方式,那么容易出现超时问题,自带的连接方式:Jedis jedis = new Jedis("192.168.199.27",6379);
。
我们需要使用连接池来连接 Redis,防止出现超时问题
public class JedisPoolUtils {
private static volatile JedisPool jedisPool = null;
private JedisPoolUtils() {
}
public static JedisPool getJedisPoolInstance(){
if(null == jedisPool){
synchronized (JedisPoolUtils.class){
if(null == jedisPool){
JedisPoolConfig poolConfig = new JedisPoolConfig();
// 一个 pool 可分配多少个 jedis 实例
poolConfig.setMaxTotal(200);
// 一个 pool 最多有多少个状态为 idle(空闲)的 jedis 实例
poolConfig.setMaxIdle(32);
// 表示当 borrow 一个 jedis 实例时,最大的等待毫秒数
poolConfig.setMaxWaitMillis(100 * 1000);
poolConfig.setBlockWhenExhausted(true);
poolConfig.setTestOnBorrow(true);
jedisPool = new JedisPool(poolConfig, "192.168.199.27", 6379, 60000);
}
}
}
return jedisPool;
}
public static void release(JedisPool jedisPool, Jedis jedis){
if(null != jedis){
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
连接池参数:
MaxTotal:控制一个 pool 可分配多少个 jedis 实例,通过
pool.getResource()
来 获取;如果赋值为 -1,则表示不限制;如果 pool 已经分配了 MaxTotal 个 jedis 实例,则此时 pool 的状态为 exhaustedmaxIdle:控制一个 pool 最多有多少个状态为 idle(空闲)的 jedis 实例
MaxWaitMillis:表示当 borrow 一个 jedis 实例时,最大的等待毫秒数,如果超过等待时间,则直接抛 JedisConnectionException
# 秒杀版本一
这只是一个简单的 demo。重点:
31 行的 watch,进行监听
54 - 32 行的 multi 和 exce 的事务操作
@RestController
public class SecKill {
@RequestMapping("/secKill")
public boolean secKill(){
Random random = new Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 6; i++) {
int code = random.nextInt(10);
sb.append(code);
}
return doSecKill(sb.toString(), "123");
}
public boolean doSecKill(String uid,String productId){
if(uid == null || productId == null){
return false;
}
// Jedis jedis = new Jedis("192.168.199.27",6379);
// 通过连接池解决连接超时问题
Jedis jedis = JedisPoolUtils.getJedisPoolInstance().getResource();
// 秒杀的商品 Key
String kcKey = "sk:" + productId + ":qt";
// 秒杀的用户 Key
String userKey = "sk:" + productId + ":user";
jedis.watch(kcKey);
String kc = jedis.get(kcKey);
// 库存为 null
if(kc == null){
System.out.println("秒杀没有开始");
jedis.close();
return false;
}
// 库存卖完
if(Integer.parseInt(kc) <= 0){
System.out.println("秒杀已经结束");
jedis.close();
return false;
}
// 秒杀过程(出现库存为负情况)
// jedis.decr(kcKey);
// 添加秒杀的用户
// jedis.sadd(userKey, uid);
// 秒杀过程(通过乐观锁解决库存可能为负情况),但出现库存遗留问题
Transaction multi = jedis.multi();
multi.decr(kcKey);
multi.sadd(userKey, uid);
List<Object> exec = multi.exec();
if(exec == null || exec.size() == 0){
System.out.println("秒杀失败");
jedis.close();
return false;
}
System.out.println("秒杀成功");
jedis.close();
return true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
这个版本有很大的问题,上面的程序无法解决库存遗留问题,使用 LUA 脚本可解决(具体还有其他)。
原因:31 行代码开启了 watch。假设库存有 500 个商品,2000 个人进行秒杀购买(2000 个程序监听商品的 key),假设 1999 人同时购买,其内部程序监听的商品数量为 500,最后一个人却抢先购买成功,商品数量变为 499,则前面的事务被打断(监听的 500 数量),即 1999 人会购买失败,导致库存还有 499 个商品。
# 秒杀版本二
该版本利用 lua 语言,解决了库存遗留问题。
**什么是库存遗留问题?**即系统告诉用户已经秒光,可是还有库存。原因:就是乐观锁导致很多请求都失败,先点的没秒到,后点的可能秒到了。
通过 lua 脚本解决争抢问题,实际上是 Redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。
@RestController
public class SecKillByScript {
static String secKillScript = "local userid=KEYS[1]; \n" +
"local prodid=KEYS[2];\n" +
"local qtkey=\"sk:\"..prodid..\":qt\";\n" +
"local usersKey=\"sk:\"..prodid..\":user\";\n" +
"local userExists=redis.call(\"sismember\",usersKey,userid);\n" +
"if tonumber(userExists)==1 then \n" +
" return 2;\n" +
"end\n" +
"local num= redis.call(\"get\" ,qtkey);\n" +
"if tonumber(num)<=0 then \n" +
" return 0; \n" +
"else \n" +
" redis.call(\"decr\",qtkey);\n" +
"redis.call(\"sadd\",usersKey,userid);\n" +
"end\n" +
"return 1";
@RequestMapping("/secKillByScript")
public boolean secKill(){
Random random = new Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 6; i++) {
int code = random.nextInt(10);
sb.append(code);
}
return doSecKill(sb.toString(), "123");
}
@RequestMapping("/doSecKillByScript")
public boolean doSecKill(String userid,String productId){
Jedis jedis = JedisPoolUtils.getJedisPoolInstance().getResource();
String shal = jedis.scriptLoad(secKillScript);
Object evalsha = jedis.evalsha(shal, 2, userid, productId);
String value = String.valueOf(evalsha);
if("0".equals(value)){
System.out.println("已抢空");
}else if ("1".equals(value)){
System.out.println("抢购成功");
}else if("2".equals(value)){
System.out.println("该用户已经抢过了");
}else {
System.out.println("抢购异常");
}
jedis.close();
return true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 测试
利用能并发的工具访问 /secKillByScript
即可。
这里使用 ab 工具,首先安装它
yum install -y httpd-tools
安装完后,在某个目录创建一个文件,模拟表单提交参数
vim postfile
添加内容:(以 & 符号结尾)
prodid=0101&
启动测试:
ab -n 2000 -c 200 -k -p ~/postfile -T application/x-www-form-urlencoded http://192.168.199.27:8080/secKillByScript
192.168.199.27
是你本地的 IP 地址,因为是从 Linux 系统访问本机的项目。