前言
上一篇文章给和大家一起分析了ReentrantReadWriteLock的readLock的原理,相信大家对readLock的实现已经有所了解,今天我们继续分析writeLock。
writeLock源码分析
我们直接进入正题,通过源码分析加深对writeLock的理解。我们通过lock()和unlock()两个方法为入口进行分析。
lock()
lock()方法的逻辑都是通过acquire()方法实现的,所以我们分析acquire()方法就可以了。
public void lock() {
sync.acquire(1);
}
acquire()
acquire()是AQS中的方法,我们主要分析tryAcquire(),acquireQueued(),addWaiter()三个方法的逻辑。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()
tryAcquire()方法用户尝试获取写锁,如果获取成功返回true,失败返回fasle。主要的逻辑由以下几步组成。
1.获取state,如果写state不为0(表示已经有线程获取了锁(读锁或者写锁)),如果有线程获取读锁或者获取写锁的线程不是当前线程,直接返回false。所以如果有线程获取了读锁,那么再尝试获取写锁是会失败的。如果获取写锁的数量超过最大值限制,抛出异常。如果获取写锁的线程是当前线程,那么修改state,增加重试次数,返回true。
2.writerShouldBlock()返回true或者cas修改state失败,那么返回false。
3.state为0并且writerShouldBlock()返回false,cas修改state成功,直接设置获取写锁的线程为当前线程,然后返回true。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//执行到这里代表w为0而且获取写锁的线程是当前线程,那么意味着写锁的重入。
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
writerShouldBlock()
writerShouldBlock()方法用于判断线程获取写锁的时候是否应该阻塞,有公平和非公平两种实现。
公平实现:
判断阻塞队列中是否有节点,并且第一个节点不是当前线程。那么作为公平锁,这就代表需要等待。
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
非公平锁:
作为非公平锁是允许插队的,直接返回false。
final boolean writerShouldBlock() {
return false;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
addWaiter()方法我们已经分析过很多次了,逻辑也比较简单,就是将当前线程构建成一个Node,并且加入到阻塞队列中。这里就不再赘述了。接下来我们分析acquireQueued()方法。
acquireQueued()
1.判断head节点的下一个节点是否是当前线程的节点,如果是,那么使用tryAcquire()方法再试尝试获取写锁。如果获取成功就重新设置头节点并且返回。
2.调用shouldParkAfterFailedAcquire()方法将前一个节点的状态设置为SIGNAL状态。
3.调用parkAndCheckInterrupt()方法,parkAndCheckInterrupt()方法利用LockSupport.park()方法阻塞当前线程。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire()
shouldParkAfterFailedAcquire()方法的作用就是将前面一个节点的状态修改为SIGNAL状态,并且将CANCELLED状态的节点去除(waitStatus大于0,只能是CANCELLED状态)。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
利用LockSupport.park()方法阻塞线程,线程被唤醒之后判断线程是否被中断过。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
lock()方法到这里就分析完了,接下来我们分析unlock()方法。
unlock()
unlock()方法通过release()方法来实现逻辑,所以我们分析release()方法。
public void unlock() {
sync.release(1);
}
release()
首先通过tryRelease()方法判断是否需要唤醒阻塞的线程,如果tryRelease()方法返回true,那么调用unparkSuccessor()方法唤醒被阻塞的线程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//如果头结点不为null 并且状态不为0 表示阻塞队列中存在元素
//因为在lock()方法中线程阻塞前调用了shouldParkAfterFailedAcquire()方法,将状态修改成了SIGNAL 也就是-1
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()
1.通过isHeldExclusively()方法判断获取锁的线程是否是当前线程,如果不是,那么抛出异常。
2.获取state,计算新的state值,如果新的state值为0,代表已经完全释放锁,通过setExclusiveOwnerThread()方法将获取锁线程的变量设置为null。
3.修改state的值。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
unparkSuccessor()
unparkSuccessor()方法的作用是唤醒头节点后第一个不为null且状态不为cancelled的节点。通过LockSupport.unpark()方法唤醒阻塞的线程。
private void unparkSuccessor(Node node) {
//获取头结点的状态 将头结点状态设置为0 代表现在正在有线程被唤醒 如果head状态为0 就不会进入这个方法了
int ws = node.waitStatus;
if (ws < 0)
//将头结点状态设置为0
compareAndSetWaitStatus(node, ws, 0);
//唤醒头结点的下一个状态不是cancelled的节点 (因为头结点是不存储阻塞线程的)
Node s = node.next;
//当前节点是null 或者是cancelled状态
if (s == null || s.waitStatus > 0) {
s = null;
//从aqs链表的尾部开始遍历 找到离头结点最近的 不为空的 状态不是cancelled的节点 赋值给s
//这里为什么从尾结点开始遍历而不是头结点 是因为添加结点的时候是先初始化结点的prev的, 从尾结点开始遍历 不会出现prve没有赋值的情况
//如果从头结点进行遍历 next为null 并不能保证链表遍历完了
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//调用LockSupport.unpark()唤醒指定的线程
LockSupport.unpark(s.thread);
}
至此unlock()方法也就分析完了。
小结
1.ReentrantReadWriteLock的writeLock支持重入。
2.ReentrantReadWriteLock的writeLock与读锁是冲突的,有线程获取了读锁,那么当前线程获取写锁是需要阻塞等待的(当前线程获取读锁也需要等待)。
最后
最后,如果有任何疑问,欢迎在下方评论区留言。