JUC - 共享模式之锁进阶
# Park & Unpark
# 基本使用
它们是 LockSupport 类中的方法。
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)
2
3
4
先 park 再 unpark
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);
}
2
3
4
5
6
7
8
9
10
11
12
13
输出:
18:42:52.585 c.TestParkUnpark [t1] - start...
18:42:53.589 c.TestParkUnpark [t1] - park...
18:42:54.583 c.TestParkUnpark [main] - unpark...
18:42:54.583 c.TestParkUnpark [t1] - resume...
2
3
4
park 会让线程暂停,unpark 取消 park 的效果,让线程继续运行。
先 unpark 再 park
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(2);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);
}
2
3
4
5
6
7
8
9
10
11
12
13
输出:
18:43:50.765 c.TestParkUnpark [t1] - start...
18:43:51.764 c.TestParkUnpark [main] - unpark...
18:43:52.769 c.TestParkUnpark [t1] - park...
18:43:52.769 c.TestParkUnpark [t1] - resume...
2
3
4
所以先 unpark,再 park,那么不会暂停线程。因为 unpark 内部事先已经准备了一个「东西」让后面的 park 失效(只是后面的第一个 park,如果再调用一次 park,则线程会暂停)。
# 特点
与 Object 的 wait & notify 相比:
- wait & notify 和 notifyAll 必须配合
Object Monitor
一起使用,而 park,unpark 不必 - park & unpark 是以线程为单位来「阻塞」和「唤醒」线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么「精确」
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify
# park 和 unpark 原理
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond 和 _mutex,
_counter 用来判断是否暂停线程(0 暂停,1 不暂停), _cond 是存放暂停线程的空间, _mutex 是互斥锁,打个比喻:
- 线程就像一个旅人,Parker 就像他随身携带的背包,条件变量就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
- 调用 park 就是要看需不需要停下来歇息
- 如果备用干粮耗尽,那么钻进帐篷歇息
- 如果备用干粮充足,那么不需停留,继续前进
- 调用 unpark,就好比令干粮充足
- 如果这时线程还在帐篷,就唤醒让他继续前进
- 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
- 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮
调用 park
- 当前线程调用
Unsafe.park()
方法 - 检查 _counter,本情况为 0,这时获得 _mutex 互斥锁
- 线程进入 _cond 条件变量,变成阻塞状态
- 设置 _counter = 0,代表暂停线程
调用 unpark
- 调用
Unsafe.unpark(Thread_0)
方法,设置 _counter 为 1,代表恢复线程 - 唤醒 _cond 条件变量中的 Thread_0
- Thread_0 恢复运行
- 设置 _counter 为 0,代表下一次调用 park 的线程暂停
先调用 unpark,再调用 park
- 调用
Unsafe.unpark(Thread_0)
方法,设置 _counter 为 1,代表恢复线程,这里指的是下一个调用 park 的线程直接恢复运行 - 当前线程调用
Unsafe.park()
方法 - 检查 _counter,此时为 1,这时线程无需阻塞,继续运行
- 设置 _counter 为 0,代表下一次调用 park 的线程暂停
# 重新理解线程状态转换
# 多种情况
假设有线程 Thread t。
情况 1 NEW --> RUNNABLE
当调用 t.start()
方法时,由 NEW --> RUNNABLE
情况 2 RUNNABLE <--> WAITING
t 线程用 synchronized(obj)
获取了对象锁后
- 调用
obj.wait()
方法时,t 线程从 RUNNABLE --> WAITING - 调用
obj.notify()
,obj.notifyAll()
,t.interrupt()
时- 竞争锁成功,t 线程从 WAITING --> RUNNABLE
- 竞争锁失败,t 线程从 WAITING --> BLOCKED
public class TestWaitNotify {
final static Object obj = new Object();
public static void main(String[] args) {
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码...."); // 断点
}
},"t1").start();
new Thread(() -> {
synchronized (obj) {
log.debug("执行....");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其它代码...."); // 断点
}
},"t2").start();
sleep(0.5);
log.debug("唤醒 obj 上其它线程");
synchronized (obj) {
obj.notifyAll(); // 唤醒obj上所有等待线程 断点
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
情况 3 RUNNABLE <--> WAITING
- 当前线程调用
t.join()
方法时,当前线程从 RUNNABLE --> WAITING- 注意是当前线程在 t 线程对象的监视器上等待
- t 线程运行结束,或调用了当前线程的
interrupt()
时,当前线程从 WAITING --> RUNNABLE
情况 4 RUNNABLE <--> WAITING
- 当前线程调用
LockSupport.park()
方法会让当前线程从 RUNNABLE --> WAITING - 调用
LockSupport.unpark(目标线程)
或调用了线程的interrupt()
,会让目标线程从 WAITING --> RUNNABLE
情况 5 RUNNABLE <--> TIMED_WAITING
t 线程用 synchronized(obj)
获取了对象锁后
- 调用
obj.wait(long n)
方法时,t 线程从 RUNNABLE --> TIMED_WAITING - t 线程等待时间超过了 n 毫秒,或调用
obj.notify()
,obj.notifyAll()
,t.interrupt()
时- 竞争锁成功,t 线程从 TIMED_WAITING --> RUNNABLE
- 竞争锁失败,t 线程从 TIMED_WAITING --> BLOCKED
情况 6 RUNNABLE <--> TIMED_WAITING
- 当前线程调用
t.join(long n)
方法时,当前线程从 RUNNABLE --> TIMED_WAITING- 注意是当前线程在 t 线程对象的监视器上等待
- 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的
interrupt()
时,当前线程从 TIMED_WAITING --> RUNNABLE
情况 7 RUNNABLE <--> TIMED_WAITING
- 当前线程调用
Thread.sleep(long n)
,当前线程从 RUNNABLE --> TIMED_WAITING - 当前线程等待时间超过了 n 毫秒,当前线程从 TIMED_WAITING --> RUNNABLE
情况 8 RUNNABLE <--> TIMED_WAITING
- 当前线程调用
LockSupport.parkNanos(long nanos)
或LockSupport.parkUntil(long millis)
时,当前线程从 RUNNABLE --> TIMED_WAITING - 调用
LockSupport.unpark(目标线程)
或调用了线程的interrupt()
,或是等待超时,会让目标线程从 TIMED_WAITING--> RUNNABLE
情况 9 RUNNABLE <--> BLOCKED
- t 线程用
synchronized(obj)
获取了对象锁时如果竞争失败,从 RUNNABLE --> BLOCKED - 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然 BLOCKED
情况 10 RUNNABLE <--> TERMINATED
当前线程所有代码运行完毕,进入 TERMINATED。
# 总结
- 任意线程调用 wait、join、park 等多线程方法都会进入 WAITING 状态
- 任意线程竞争抢锁失败,进入 BLOCKED 状态,竞争抢锁成功,则进入 RUNNABLE 状态
- 任意线程调用带有时间参数的多线程方法,如
wait(long n)
、join(long n)
、sleep(long n)
、parkUntil(long millis)
等,都进入 TIMED_WAITING 状态 - 任意线程抢到锁后,都直接进入 RUNNABLE 状态
# 多把锁
# 多把不相干的锁
一间大屋子有两个功能:睡觉、学习,互不相干。
现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低。
例如:
class BigRoom {
public void sleep() {
synchronized (this) {
log.debug("sleeping 2 小时");
// 睡 2s
Sleeper.sleep(2);
}
}
public void study() {
synchronized (this) {
log.debug("study 1 小时");
// 睡 1s
Sleeper.sleep(1);
}
}
}
public class Test {
public static void main(String[] args) {
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
bigRoom.compute();
},"小南").start();
new Thread(() -> {
bigRoom.sleep();
},"小女").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
某次执行结果:
12:13:54.471 [小南] c.BigRoom - study 1 小时
12:13:55.476 [小女] c.BigRoom - sleeping 2 小时
2
解决方法:准备多个房间(多个对象锁)。
改进代码:
class BigRoom {
// 多了两把锁
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}
public class Test {
public static void main(String[] args) {
BigRoom bigRoom = new BigRoom();
new Thread(() -> {
bigRoom.compute();
},"小南").start();
new Thread(() -> {
bigRoom.sleep();
},"小女").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
某次执行结果:
12:15:35.069 [小南] c.BigRoom - study 1 小时
12:15:35.069 [小女] c.BigRoom - sleeping 2 小时
2
# 锁粒度细分优缺点
- 优点:是可以增强并发度
- 缺点:如果一个线程需要同时获得多把锁,就容易发生死锁
# 活跃性
# 死锁
有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁。
- t1 线程获得 A 对象锁,接下来想获取 B 对象的锁
- t2 线程获得 B 对象锁,接下来想获取 A 对象的锁
例:
public static void main(String[] args) {
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
synchronized (A) {
log.debug("lock A");
sleep(1);
// 试图获取 B 的锁
synchronized (B) {
log.debug("lock B");
log.debug("操作...");
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (B) {
log.debug("lock B");
sleep(0.5);
// 试图获取 A 的锁
synchronized (A) {
log.debug("lock A");
}
log.debug("操作...");
}
}, "t2");
t1.start();
t2.start();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
输出:
12:22:06.962 [t2] c.TestDeadLock - lock B
12:22:06.962 [t1] c.TestDeadLock - lock A
// 卡在这里, 不向下运行
2
3
# 定位死锁
检测死锁可以使用 jconsole 工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁:
cmd > jps
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
12320 Jps
22816 KotlinCompileDaemon
33200 TestDeadLock // JVM 进程
11508 Main
28468 Launcher
2
3
4
5
6
7
输出(截取部分):
Found one Java-level deadlock: // 发现一个 Java 级别的死锁
=============================
"Thread-1":
waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0, a java.lang.Object),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0, a java.lang.Object),
which is held by "Thread-1"
2
3
4
5
6
7
8
- 避免死锁要注意加锁顺序
- 另外如果由于某个线程进入了死循环,导致其它线程一直等待,对于这种情况 linux 下可以通过 top 先定位到 CPU 占用高的 Java 进程,再利用
top -Hp
进程 id 来定位是哪个线程,最后再用 jstack 排查
# 哲学家就餐问题
有五位哲学家,围坐在圆桌旁。
- 他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考
- 吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子
- 如果筷子被身边的人拿着,自己就得等待
假设五位哲学家分别拿起一个筷子,且不放下,则五位哲学家就一直等待其他人放下筷子,但是其他人不放下,则僵持产生死锁。
筷子类
class Chopstick {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
2
3
4
5
6
7
8
9
10
哲学家类
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
private void eat() {
log.debug("eating...");
Sleeper.sleep(1);
}
@Override
public void run() {
while (true) {
// 获得左手筷子
synchronized (left) {
// 获得右手筷子
synchronized (right) {
// 吃饭
eat();
}
// 放下右手筷子
}
// 放下左手筷子
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
就餐测试
public class Test {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
输出结果(截取死锁部分):
12:33:15.575 [苏格拉底] c.Philosopher - eating...
12:33:15.575 [亚里士多德] c.Philosopher - eating...
12:33:16.580 [阿基米德] c.Philosopher - eating...
12:33:17.580 [阿基米德] c.Philosopher - eating...
// 卡在这里, 不向下运行
2
3
4
5
使用 jconsole 检测死锁,发现:
-------------------------------------------------------------------------
名称: 阿基米德
状态: cn.itcast.Chopstick@1540e19d (筷子1) 上的BLOCKED, 拥有者: 苏格拉底
总阻止数: 2, 总等待数: 1
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@6d6f6e28 (筷子5)
-------------------------------------------------------------------------
名称: 苏格拉底
状态: cn.itcast.Chopstick@677327b6 (筷子2) 上的BLOCKED, 拥有者: 柏拉图
总阻止数: 2, 总等待数: 1
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@1540e19d (筷子1)
-------------------------------------------------------------------------
名称: 柏拉图
状态: cn.itcast.Chopstick@14ae5a5 (筷子3) 上的BLOCKED, 拥有者: 亚里士多德
总阻止数: 2, 总等待数: 0
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@677327b6 (筷子2)
-------------------------------------------------------------------------
名称: 亚里士多德
状态: cn.itcast.Chopstick@7f31245a (筷子4) 上的BLOCKED, 拥有者: 赫拉克利特
总阻止数: 1, 总等待数: 1
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@14ae5a5 (筷子3)
-------------------------------------------------------------------------
名称: 赫拉克利特
状态: cn.itcast.Chopstick@6d6f6e28 (筷子5) 上的BLOCKED, 拥有者: 阿基米德
总阻止数: 2, 总等待数: 0
堆栈跟踪:
cn.itcast.Philosopher.run(TestDinner.java:48)
- 已锁定 cn.itcast.Chopstick@7f31245a (筷子4)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
这种线程没有按预期结束,执行不下去的情况,归类为「活跃性」问题,除了死锁以外,还有活锁和饥饿者两种情况。
# 活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如:
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
线程 t1 对 count++,线程 t2 对 count--,导致 count 加完就减,减完就加,所以 一直维持在一定范围内,导致无法结束运行。
解决:让线程进行 sleep 睡眠,其睡眠时间不能一致,可以利用 Random 产生随机睡眠时间。
# 饥饿
饥饿定义:一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不易演示,讲读写锁时会涉及饥饿问题。
如下图,线程 2 打算获取对象 A 的锁,却被线程 1 获取了,当线程 2 放弃对象 A 的锁,去获取对象 B 的锁,又被线程 1 抢先一步获取,导致线程 2 产生饥饿问题,即在整个运行过程,都无法获取任何锁。
可以在 哲学家就餐问题 将就餐测试代码改为如下,就可以看到饥饿问题,即某个哲学家永远无法获取筷子,并且又不会产生死锁或活锁:
public class Test {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c1, c5).start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
顺序加锁产生饥饿问题的解决方案:在线程 2 尝试获取锁失败后,进入阻塞状态,等待线程 1 释放锁,再去获取。
死锁、活锁和饥饿可以用 java.util.concurrent
包下的 ReentrantLock 类解决,看下面介绍。
# 公平锁非公平锁
公平锁:是指多个线程按照申请锁的顺序来获取锁,类似排队打饭,先来后到。
非公平锁:是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比现申请的线程优先获取锁,在高并发的情况下,有可能会造成优先级反转或者饥饿现象。
// 无参
public ReentrantLock() {
sync = new NonfairSync();
}
// 有参
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2
3
4
5
6
7
8
并发包中的 ReentrantLock 的创建可以指定构造函数的 boolean 类型来得到公平锁或者非公平锁,默认是非公平锁。
公平锁:就是很公平,在并发环境中,每个线程在获取到锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,以后会按照 FIFO(先进先出)的规则从队列中取到自己。
非公平锁:非公平锁比较粗鲁,上来就直接尝试占有锁,如果尝试失败,就会采用类似公平锁那种方式。
就 ReentrantLock 而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点 在于吞吐量比公平锁大。
对于 Synchronized 而言,也是一种非公平锁。
# 可重入锁
可重入锁(也叫递归锁),指的是同一线程外层函数获得锁之后,内层递归函数仍然能获取该锁的代码,在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁,也就是说,线程可以进入任何一个它已经拥有的锁,所同步着的代码块。好比家里进入大门之后,就可以进入里面的房间了。
ReentrantLock、Synchronized 就是一个典型的可重入锁。
可重入锁最大的作用就是避免死锁。
测试:Synchronized
public class SynchronizedDemo {
public static void main(String[] args) {
Phone phone = new Phone();
// T1 线程在外层获取锁时,也会自动获取里面的锁
new Thread(() -> {
phone.sendSMS();
}, "T1").start();
new Thread(() -> {
phone.sendSMS();
}, "T2").start();
}
}
class Phone {
public synchronized void sendSMS() {
System.out.println(Thread.currentThread().getName() + " sendSMS");
sendEmail();
}
public synchronized void sendEmail() {
System.out.println(Thread.currentThread().getName() + " sendEmail");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 自旋锁
自旋锁(spinlock),是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗 CPU。
public class SpinLockDemo {
// 原子引用线程, 没写参数,引用类型默认为 null
AtomicReference<Thread> atomicReference = new AtomicReference<>();
// 上锁
public void myLock() {
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName() + "==>mylock");
// 自旋
while (!atomicReference.compareAndSet(null, thread)) {
}
}
//解锁
public void myUnlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(Thread.currentThread().getName() + "==>myUnlock");
}
public static void main(String[] args) throws InterruptedException {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.myUnlock();
}, "T1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.myUnlock();
}, "T2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# ReentrantLock
相对于 synchronized 它具备如下特点:
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
与 synchronized 一样,都支持可重入
基本语法
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
2
3
4
5
6
7
8
# 可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次反复获取这把锁。
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
public class Test {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
输出:
17:59:11.862 [main] c.TestReentrant - execute method1
17:59:11.865 [main] c.TestReentrant - execute method2
17:59:11.865 [main] c.TestReentrant - execute method3
2
3
此时是一个 main 线程,可以看到,第 7 行 main 线程获取了锁,16 行依然获取锁,然后 25 行也可以获取锁,代表可重入。
# 可打断
加锁后能被打断的 API 是 lockInterruptibly()
,lockInterruptibly 方法和 lock 方法一样能获取锁,但又多出来一个特点:能被打断,即:
ReentrantLock lock = new ReentrantLock();
lock.lockInterruptibly();
2
示例:
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
输出:
18:02:40.520 [main] c.TestInterrupt - 获得了锁
18:02:40.524 [t1] c.TestInterrupt - 启动...
18:02:41.530 [main] c.TestInterrupt - 执行打断
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr
onizer.java:898)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron
izer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17)
at java.lang.Thread.run(Thread.java:748)
18:02:41.532 [t1] c.TestInterrupt - 等锁的过程中被打断
2
3
4
5
6
7
8
9
10
11
12
13
14
注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断,如 lock 方法
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
lock.lock();
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
sleep(1);
} finally {
log.debug("释放了锁");
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
输出:
18:06:56.261 [main] c.TestInterrupt - 获得了锁
18:06:56.265 [t1] c.TestInterrupt - 启动...
18:06:57.266 [main] c.TestInterrupt - 执行打断 // 这时 t1 并没有被真正打断, 而是仍继续等待锁
18:06:58.267 [main] c.TestInterrupt - 释放了锁
18:06:58.267 [t1] c.TestInterrupt - 获得了锁
2
3
4
5
# 锁超时
线程尝试获取锁时,超出了一定时间,则放弃获取锁。
锁超时的 API 为 tryLock()
,tryLock 方法会返回一个 boolean 值,如果获取到锁,则返回 true,如果获取不到锁,则返回 false。
即:
ReentrantLock lock = new ReentrantLock();
boolean isGetLock = lock.tryLock();
// 或者
boolean isGetLock = lock.tryLock(long timeout, TimeUnit unit); // TimeUnit 为时间单位,如秒、分、时,timeout 为时间值
2
3
4
锁超时又分为两者情况:
立刻失败
直接使用 lock.tryLock();
,判断返回的 boolean 类型知道是否成功获取锁。
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
输出:
18:15:02.918 [main] c.TestTimeout - 获得了锁
18:15:02.921 [t1] c.TestTimeout - 启动...
18:15:02.921 [t1] c.TestTimeout - 获取立刻失败,返回
2
3
因为 main 线程先获取锁,2s 才释放锁,而 t1 线程直接尝试获取锁失败,返回 false。
超时失败
规定好超时时间,使用 lock.tryLock(long timeout, TimeUnit unit);
,判断返回的 boolean 类型知道是否成功获取锁。
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取等待 1s 后失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
输出:
18:19:40.537 [main] c.TestTimeout - 获得了锁
18:19:40.544 [t1] c.TestTimeout - 启动...
18:19:41.547 [t1] c.TestTimeout - 获取等待 1s 后失败,返回
2
3
main 线程先获取锁,然后睡眠 2s 才释放锁,而 t1 线程等待 1s 后无法获取锁,于是就返回 false。
# 使用 tryLock 解决哲学家就餐问题
因为筷子类是对象锁,所以直接令筷子类继承 ReentrantLock
,这样筷子对象都拥有 ReentrantLock
的 API。
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
2
3
4
5
6
7
8
9
10
哲学家类。当某个哲学家尝试获取左右筷子,如果获取失败,则放下自己手中的筷子给别人使用。
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
@Override
public void run() {
while (true) {
// 尝试获得左手筷子
if (left.tryLock()) {
try {
// 尝试获得右手筷子
if (right.tryLock()) {
try {
eat();
} finally {
right.unlock();
}
}
} finally {
left.unlock();
}
}
}
}
private void eat() {
log.debug("eating...");
Sleeper.sleep(1);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
就餐测试
public class Test {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 公平锁
公平锁是为了解决饥饿问题,即让所有线程都 平均 的获取锁。
如何开启公平锁呢?ReentrantLock 有个构造函数传入 boolean 值,就是是否开启公平锁。
ReentrantLock r = new ReentrantLock(boolean fair);
ReentrantLock 默认是不公平的,即 fair = false
。
例子:
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock(false);
lock.lock();
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}
// 1s 之后去争抢锁
Thread.sleep(1000);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " start...");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "强行插入").start();
lock.unlock();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
强行插入,有机会在中间输出(注意:该实验不一定总能复现)。
t39 running...
t40 running...
t41 running...
t42 running...
t43 running...
强行插入 start...
强行插入 running...
t44 running...
t45 running...
t46 running...
t47 running...
t49 running...
2
3
4
5
6
7
8
9
10
11
12
改为公平锁后:
ReentrantLock lock = new ReentrantLock(true);
强行插入,总是在最后输出:
t465 running...
t464 running...
t477 running...
t442 running...
t468 running...
t493 running...
t482 running...
t485 running...
t481 running...
强行插入 start...
强行插入 running...
2
3
4
5
6
7
8
9
10
11
公平锁一般没有必要,会降低并发度,后面分析原理时会讲解。
# 条件变量
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比:
- synchronized 是那些不满足条件的线程都在一间休息室等消息
- 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤醒
使用要点:
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执行
如何创建条件变量呢?利用 newCondition()
,如下:
ReentrantLock lock = new ReentrantLock();
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
// c1 等待
c1.await();
// c2 等待
c2.await(long n);
// 唤醒 c1 的一个随机线程
c1.signal(); // 等价于 notify()
// 唤醒 c2 所有线程
c2.signalAll(); // 等价于 notifyAll()
2
3
4
5
6
7
8
9
10
11
12
13
在哪个线程里使用 c1.await()
,则该线程暂停等待,如果需要唤醒,则在其他线程调用 c1.signal()
唤醒 c1 里的一个线程。
如果想唤醒 c1 的多个线程,则是 c1.signalAll()
。
注意:c1 是一个空间,类似于休息室,在哪个线程调用
c1.await()
,则该线程进入 c1 休息室,要想出来(唤醒),就等待其他线程调用c1.signal()
或c1.signalAll()
。
例子:
public static void main(String[] args) {
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
// 该线程进入 waitCigaretteQueue 里等待
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
// 该线程进入 waitbreakfastQueue 里等待
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
// 唤醒 waitCigaretteQueue 里的一个随机线程
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
// 唤醒 waitbreakfastQueue 里的一个随机线程
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
输出:
18:52:27.680 [main] c.TestCondition - 送早餐来了
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟
2
3
4
# 区别
ReentrantLock 和 Synchronized 区别:
- 首先 Synchronized 是 Java 内置关键字,在 JVM 层面,ReentrantLock 是个 Java 类
- Synchronized 无法判断是否获取锁的状态,ReentrantLock 可以判断是否获取到锁
- Synchronized 会自动释放锁(a 线程执行完同步代码会释放锁;b 线程执行过程中发生异常会释放锁),ReentrantLock 需在 finally 中手动释放锁(
unlock()
方法释放锁),否则容易造成线程死锁 - 用 Synchronized 关键字的两个线程 1 和线程 2,如果当前线程 1 获得锁,线程 2 线程等待。如果线程 1 阻塞,线程 2 则会一直等待下去,而 ReentrantLock 锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了
- Synchronized 的锁可重入、不可中断、非公平,而 ReentrantLock 锁可重入、可判断、可公平(两者皆可)
- ReentrantLock 锁适合大量同步的代码的同步问题,Synchronized 锁适合代码少量的同步问题
# 同步模式之顺序控制
# 固定运行顺序
比如,必须先打印 2 后再打印 1。
# wait notify 版
创建一个对象锁和一个 boolean 对象,前者是线程锁,后者是判断先打印 1 还是先打印 2。
public static void main(String[] args) {
// 用来同步的对象
static Object obj = new Object();
// t2 运行标记, 代表 t2 是否执行过
static boolean t2runed = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (obj) {
// 如果 t2 没有执行过
while (!t2runed) {
try {
// t1 先等一会
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(1);
});
Thread t2 = new Thread(() -> {
System.out.println(2);
synchronized (obj) {
// 修改运行标记
t2runed = true;
// 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)
obj.notifyAll();
}
});
t1.start();
t2.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# Park Unpark 版
可以看到,wait notify 版实现很麻烦:
- 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该 wait
- 第二,如果有些干扰线程错误的 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决此问题
- 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个
可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
LockSupport.park();
System.out.println("1");
});
Thread t2 = new Thread(() -> {
System.out.println("2");
// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
LockSupport.unpark(t1);
});
t1.start();
t2.start();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
park 和 unpark 方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』,不需要『同步对象』和『运行标记』。
# 交替输出
线程 1 输出 a 共 5 次,线程 2 输出 b 共 5 次,线程 3 输出 c 共 5 次。现在要求输出 abcabcabcabcabc 怎么实现?
# wait notify 版
利用 flag 来说存放当前线程。
写一个 print 方法,有三个参数:str 是打印的信息,curFlag 是当前线程,nextFlag 是下一个执行的线程。
当传过来的当前线程等于 flag,则打印该当前线程携带的信息,接着 curFlag 等于下一个执行的线程 nextFlag,然后唤醒所有线程,只有满足 nextFlag == falg
才打印携带的信息,其他线程虽然被唤醒,但是没有跳出 while 循环,继续 wait 等待。
class SyncWaitNotify {
// 1 代表打印 a,2 代表打印 b,3 代表打印 c
private int flag;
// 循环次数
private int loopNumber;
public SyncWaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
// curFlag:当前线程,nextFlag:下一个执行的线程,str:打印的信息
public void print(String str, int curFlag, int nextFlag) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (this.flag != curFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
}
public class Test {
public static void main(String[] args) {
// 初始化 1 打印,循环 5 次
SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
new Thread(() -> {
syncWaitNotify.print(1, 2, "a");
}).start();
new Thread(() -> {
syncWaitNotify.print(2, 3, "b");
}).start();
new Thread(() -> {
syncWaitNotify.print(3, 1, "c");
}).start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# Lock 条件变量版
利用到了 ReentrantLock 的条件变量 Condition。
写一个 print 方法,有三个参数:str 是打印的信息,current 是当前条件变量,next 是下一个执行条件变量。
先让所有条件变量 current.wait()
等待,然后唤醒一个当前条件变量,该条件变量携带的信息打印后,唤醒下一个条件变量,打印下一个条件变量携带的信息,依次类推。
class AwaitSignal extends ReentrantLock {
public void start(Condition first) {
this.lock();
try {
log.debug("start");
first.signal();
} finally {
this.unlock();
}
}
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
this.lock();
try {
// 当前线程等待
current.await();
// 如果等待结束,则打印 str
System.out.print(str);
// 唤醒下一个线程
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.unlock();
}
}
}
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
}
public class Test {
public static void main(String[] args) {
AwaitSignal as = new AwaitSignal(5);
Condition aWaitSet = as.newCondition();
Condition bWaitSet = as.newCondition();
Condition cWaitSet = as.newCondition();
new Thread(() -> {
as.print("a", aWaitSet, bWaitSet);
}).start();
new Thread(() -> {
as.print("b", bWaitSet, cWaitSet);
}).start();
new Thread(() -> {
as.print("c", cWaitSet, aWaitSet);
}).start();
// 初始化 aWaitSet 条件变量打印
as.start(aWaitSet);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
注意:该实现没有考虑 a,b,c 线程都就绪再开始。
# Park Unpark 版
class SyncPark {
private int loopNumber;
// 线程的集合数组
private Thread[] threads;
public SyncPark(int loopNumber) {
this.loopNumber = loopNumber;
}
public void setThreads(Thread... threads) {
this.threads = threads;
}
public void print(String str) {
for (int i = 0; i < loopNumber; i++) {
// 暂停当前线程
LockSupport.park();
// 暂停结束后打印信息
System.out.print(str);
// 唤醒下一个线程
LockSupport.unpark(nextThread());
}
}
private Thread nextThread() {
// 获取当前线程
Thread current = Thread.currentThread();
int index = 0;
for (int i = 0; i < threads.length; i++) {
// 获取 threads 里的线程位置
if(threads[i] == current) {
index = i;
break;
}
}
// 返回下一个线程(当前线程位置 + 1)
if(index < threads.length - 1) {
return threads[index + 1];
} else {
return threads[0];
}
}
public void start() {
for (Thread thread : threads) {
thread.start();
}
// 初始化d
LockSupport.unpark(threads[0]);
}
}
public class Test {
public static void main(String[] args) {
SyncPark syncPark = new SyncPark(5);
Thread t1 = new Thread(() -> {
syncPark.print("a");
});
Thread t2 = new Thread(() -> {
syncPark.print("b");
});
Thread t3 = new Thread(() -> {
syncPark.print("c\n");
});
syncPark.setThreads(t1, t2, t3);
syncPark.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69