private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
public static class ReadLock
public static class WriteLock
内部的Sync中有如下定义。
c指的AbstractQueuedSynchronizer的state。SHARED_UNIT为65536。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
读锁调用的AbstractQueuedSynchronizer中acquireShared,方法的实现在读锁内部,方法内部一段代码注释帮助大家理解
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
我们将焦点聚集在tryAcquireShared上。
第一次加锁时state为0,exclusiveCount和sharedCount(c)也是0,经过compareAndSetState(c, c + SHARED_UNIT),c变为65536
如果相同线程再次加读锁,则c的值再次加65536,firstReaderHoldCount变为2
假设此刻c为65536,不同线程尝试加读锁时,sharedCount(c)计算如下为例,将65536表示为32位二进制
00000000000000010000000000000000
无符号右移16位
00000000000000000000000000000001
对应int值1,表示当前读锁加锁数量。此时代码进入
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
创建HoldCounter,并将count设置为1,线程号为当前线程的Id。HoldCounter可以理解为某线程加锁的次数。
如果线程因为并发原因导致无法进入if块,比如在CAS c时失败,就进入fullTryAcquireShared,代码和上述大同小异,主要是进入 for (;;){}循环尝试不停加锁。
如果因为别的线程加了写锁,则因为如下代码,加锁失败。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
纵观try的过程,两个返回值1代表加锁成功,-1代表加锁失败。加锁失败时就会执行doAcquireShared,将当前线程作为一个节点加入双向链表。
方法的具体代码和之前的很相似。区别主要有两点:1.Node的类型为SHARED,2.setHeadAndPropagate替换了setHead,重点在于此。
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
满足一系列的判断会调用doReleaseShared。原因是现在加的共享锁,所以需要唤醒后继需要加共享锁的节点去加锁。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
doReleaseShared做的事情就是如果当前head节点是SIGNAL,就唤醒后继节点,否则将状态设为PROPAGATE,如果head节点变化了,就结束。
首先如果所有的加锁都是读锁,是不会有一个链表的,必然要么是有线程加了读锁后又有线程尝试加写锁,或者相反。
如下图所示,N0为head,类型为共享锁,所以首先唤醒N0,unpark后循环继续,在判断h==head前,类型为独占的N1加入进来了,但还未更改状态,_所以N1的状态随时有可能变化,从0变成了SIGNAL
或者还没变化,但如果节点一直处于链表中没有唤醒,它的状态会变为SIGNAL_。
unpark前一瞬间的列表
+------+ +------+
| N0(S)| <---- | N1(E)|
|SIGNAL| ----> |DEFAULT|
+------+ +------+
unpark后一瞬间的列表,此后循环会尝试将DEFAULT值改为PROPAGATE
+------+
| N1(E) |
|DEFAULT|
+------+
此后又有类型为共享的N2加入进来,此刻N1的值可能为PROPAGATE或者SIGNAL
+---------+ +------+
| N1(E) | <---- | N2(S)|
|PROPAGATE| ----> |DEFAULT|
+---------+ +------+
独占锁在释放时判断只要状态不为0,就可以唤醒后继者,所以N1的状态为PROPAGATE也没有关系。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
直接看下已经有线程对加锁的情况下获取写锁的流程。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
如果已经有线程持有读/写锁, c为65536或者其倍数,假设为65536,那么经过exclusiveCount后,即65536&65535,结果为0。
此时getExclusiveOwnerThread()返回null/持有写锁的线程实例,所以加锁失败,那么剩下逻辑就是上篇对AbstractQueuedSynchronizer acquire方法的分析。
如果是同一线程写锁重入,则进行运算1&65535 结果为1,加上已经重入的次数只要不超过最大值就对c+1
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
如果加写锁失败,那么流程将进入AbstractQueuedSynchronizer acquireQueued中。相关内容及释放过程已经在前文中描述过了。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。