ReentrantLock 解析
参考地址: https://blog.csdn.net/persistence_PSH/article/details/114504207
1. ReentrantLock
- ReentrantLock是基于AQS实现,AQS的基础又是CAS
- ReentrantLock 中有三个内部类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class ReentrantLock implements Lock, java.io.Serializable {
abstract static class Sync extends AbstractQueuedSynchronizer{...}
// 非公平锁
static final class NonfairSync extends Sync {...}
// 公平锁
static final class FairSync extends Sync {...}
//默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//可以通过构建对象时传入的boolean来设定锁是否公平
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//获取锁
public void lock() {
sync.lock();
}
//尝试获取锁
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
//尝试获取锁,参数为尝试的时间和时间单位
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//释放锁
public void unlock() {
sync.release(1);
}
}
以上是 ReentrantLock 类中的一些主要结构
看到 Sync 类其实是继承 AbstractQueuedSynchronizer
2. AbstractQueuedSynchronizer
定义
1
2
3public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {}构造器
1 | protected AbstractQueuedSynchronizer() { } |
2.1 静态内部类Node
- ReentrantLock实现的前提就是AbstractQueuedSynchronizer,简称AQS,是java.util.concurrent的核心,CountDownLatch、FutureTask、Semaphore、ReentrantLock等都有一个内部类是这个抽象类的子类
AQS内部有一个内部类Node,每个node都是一个节点
1 | static final class Node{ |
可以看得出来,Node 本身具备一种数据结构 双向链表
- AQS中有的变量
1 | //FIFO队列中的头Node |
- AQS是典型的模板模式的应用,FIFO队列的各种操作在AQS中已经实现,AQS的子类一般只需要重写tryAcquire(int arg)和tryRelease(int arg)两个方法即可。
3. ReentrantLock的实现
- ReentrantLock根据传入构造方法的布尔型参数实例化出Sync的实现类FairSync和NonfairSync,分别表示公平的Sync和非公平的Sync。
- ReentrantLock使用较多的为是非公平锁,因为非公平锁吞吐量大
下面都是以非公平锁举例:
lock
- 假设线程1调用了ReentrantLock的lock()方法,那么线程1将会独占锁:
1 | final void lock() { |
第一个获取锁的线程就做了两件事情:
- 1、设置AbstractQueuedSynchronizer的state为1
- 2、设置AbstractOwnableSynchronizer的thread为当前线程
这两步做完之后就表示线程1独占了锁。然后线程2也要尝试获取同一个锁,在线程1没有释放锁的情况下,线程2会阻塞。
因为锁已被线程1占有,此时 status = 1, 所以在 lock 函数中,会走到 else 中
acquire 函数是在父类 AbstractQueuedSynchronizer 中实现的。
1 | public final void acquire(int arg) { |
当调用 acquire 时,会调用 addWaiter 函数 & acquireQueued 函数
addWaiter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private Node addWaiter(Node mode) {
//创建一个当前线程的Node,模式为独占模式(因为传入的mode是一个NULL)
Node node = new Node(mode);
//死循环
for (;;) {
Node oldTail = tail;
//尾部的node部位不为空,则等待队列不为空,线程2为第一个需要添加到等待队列的,因为多线程并发,所以等待队列有可能不为空
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
// oldTail 没有被其他线程修改,此时将 传入的 node 节点放置在 链表的尾部
oldTail.next = node;
return node;
}
} else {
// 初始化链表
initializeSyncQueue();
}
}
}acquireQueued
1 | final boolean acquireQueued(final Node node, int arg) { |
当线程2仍无法获取到锁时,会调用 shouldParkAfterFailedAcquire && parkAndCheckInterrupt 进行判断,是否进行阻塞
shouldParkAfterFailedAcquire
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//第一次这个waitStatus是h的waitStatus,很明显是0,第二次为-1,会返回true
int s = pred.waitStatus;
if (s < 0)
return true;
if (s > 0) {
do {
node.prev = pred = pred.prev;
}
while (pred.waitStatus > 0);
pred.next = node;
}
else
//把h的waitStatus设置为Noed.SIGNAL即-1并返回false
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
return false;
}parkAndCheckInterrupt
1 | private final boolean parkAndCheckInterrupt() { |
tryLock
- 在上面的代码中可以看到,tryLock 会执行 tryAcquireNanos 函数
1 | final boolean nonfairTryAcquire(int acquires) { |
- 上面部分是加锁部分,接下来看看解锁部分
unlock
1 | public void unlock() { |
release 函数会调用到 AbstractQueuedSynchronizer 中的 release
会调用 tryRelease 进行判断
1 |
|
- 但是 tryRelease 会直接抛出异常,也就是说,会调用子类中的 tryRelease 函数
tryRelease 函数为何不写成抽象函数?
tryRelease
1 | protected final boolean tryRelease(int releases) { |
- 当返回 free 为 true 时,release 函数中会执行 unparkSuccessor() 函数
1 |
|
锁被解了怎样保证整个FIFO队列减少一个Node,回到了AQS的acquireQueued方法了
回到上面讲过的未获取到锁的线程被阻塞的 acquireQueued 函数中:
1 | final boolean acquireQueued(final Node node, int arg) { |
- 被阻塞的线程2是被阻塞了,但是此处并没有return语句,所以,阻塞完成线程2依然会进行for循环。
- 然后,阻塞完成了,线程2所在的Node的前驱Node是p,线程2尝试tryAcquire,成功,然后线程2就成为了head节点了,把p的next设置为null,这样原头Node里面的所有对象都不指向任何块内存空间,h属于栈内存的内容,方法结束被自动回收,这样随着方法的调用完毕,原头Node也没有任何的引用指向它了,这样它就被GC自动回收了。此时,遇到一个return语句,acquireQueued方法结束
- setHead
1 | private void setHead(Node node) { |
setHead方法里面的前驱Node是Null,也没有线程,那么为什么不用一个在等待的线程作为Head Node呢?
因为一个线程随时有可能因为中断而取消,而取消的话,Node自然就要被GC了,那GC前必然要把头Node的后继Node变为一个新的头而且要应对多种情况,这样就很麻烦。
用一个没有thread的Node作为头,相当于起了一个引导作用,因为head没有线程,自然也不会被取消。从尾到头遍历,找出离head最近的一个node,对这个node进行unPark操作。
个人解读: 因为不知道之前被阻塞的线程会处于何种状态,直接让链表中下一个节点获取锁,可能会发生一些异常。
所以直接只释放锁,让各个线程再次抢占,这也是非公平锁的原理。
锁释放时,不是直接在阻塞链表中取下一个节点的线程,而且所有节点再次抢占,甚至可能让最新进来的线程获取到锁
ReentrantLock 中的其他函数
getHoldCount 获取state值
- 获取到state值,也就知道了锁的状态
1
2
3final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
- 获取到state值,也就知道了锁的状态
getOwner 获取占有锁的线程
1
2
3
4//获取当前占有锁的线程,就是AbstractOwnableSynchronizer中exclusiveOwnerThread的值
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}getQueuedThreads 获取所有阻塞的线程
1
2
3
4
5
6
7
8
9
10//从尾到头遍历一下,添加进ArrayList(等待队列)中
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
死锁问题
如何出现死锁
多个线程获取多个锁,形成了循环依赖锁,导致死锁
- 线程A获取到锁1,并尝试获取锁2。
- 线程B获取到锁2,并尝试获取锁1。
- 线程A获取到锁1时,锁2已经被线程B获取。
- 此时就形成了死锁
测试的线程类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25class TestDeadLock(var flag: Boolean, var any1: Any, var any2: Any) : Runnable {
override fun run() {
if(flag) {
synchronized(any1) {
println("${flag} 线程: 获取到any1的锁")
Thread.sleep(1000)
synchronized(any2) {
println("${flag} 线程: 获取到any2的锁")
}
}
} else {
synchronized(any2) {
println("${flag} 线程: 获取到any2的锁")
Thread.sleep(1000)
synchronized(any1) {
println("${flag} 线程: 获取到any1的锁")
}
}
}
println("${flag} 线程: 未出现死锁!!!")
}
}多个线程调用锁的地方
1
2
3
4
5
6
7
8
9binding.deadLock.setOnClickListener {
val any1 = Any()
val any2 = Any()
val thread1: Thread = Thread(TestDeadLock(true, any1, any2))
val thread2: Thread = Thread(TestDeadLock(false, any1, any2))
thread1.start()
thread2.start()
}
最后可以发现, “未出现死锁!!!” 这一句输出永远不会出现,
因为此时已经死锁
如何避免
- synchronized 锁的对象保持顺序一致,
- 如上面示例,将 else 中 any2 与 any1 顺序对调以下即可避免死锁
- lock 锁的顺序保持一致