juc中的并发容器都是基于volatile变量和CAS指令实现,ReentrantLock也不例外,其类图如下所示:
//FairSync加锁流程,
ReentrantLock:lock()
--> Sync:lock()
--> FairSync:initialLock()//首次尝试加锁及可重入逻辑
--> AbstractQueuedSynchronizer:acquire(int arg)
--> FairSync:tryAcquire(int arg)//如果CLH队列没有线程等待且CAS修改status成功,加锁
--> AbstractQueuedSynchronizer:acquire(...) //线程没拿到锁,进入CLH队列
第一步,ReentrantLock调用lock方法,首先调用initialTryLock尝试第一次加锁。
//ReentrantLock
public void lock() {
sync.lock();
}
//Sync
final void lock() {
if (!initialTryLock())
acquire(1);
}
//FairSync
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//如果没有已等待的线程且cas成功
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {//可重入逻辑
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
第二步,如果既不是第一个线程,又不是可重入线程,则继续往下调用AbstractQueuedSynchronizer.acquire(int arg),该方法会调用子类重写的tryAcquire方法尝试加锁。FairSync类是公平锁,只有当同步队列中没有等待的线程或者本线程是队首线程才会尝试cas status。如果tryAcquire返回false,则调用AbstractQueuedSynchronizer.acquire(Node node,int arg,boolean shared ...)方法将线程入队等待。
//AbstractQueuedSynchronizer
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
// FairSync
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//AbstractQueuedSynchronizer
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
//核心代码
if (node == null) { // allocate; retry before enqueue
//第一次for循环,创建节点
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) //第二次for循环,初始化同步队列
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;//第三次for,将new node入队
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {//第四次for循环,修改new node状态
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)//第五次for循环,阻塞线程
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
//AbstractQueuedSynchronizer
private void tryInitializeHead() {
Node h = new ExclusiveNode();
if (U.compareAndSetReference(this, HEAD, null, h))
tail = h;
}
AbstractQueuedSynchronizer.acquire会执行五次循环才能将节点入队并阻塞线程,每次for循环都会尝试tryAcquire。
//FairSync和NonFairSync释放锁的流程是一样的
ReentrantLock:unlock()
--> AbstractQueuedSynchronizer:release(int arg)
--> Sync:tryRelease(int arg)//如果本线程是持有锁的线程,那么修改state
--> AbstractQueuedSynchronizer:signalNext(Node h)//唤醒第一个线程
--> LockSupport:unpark() //调用Unsafe.unpark
release流程比较简单,不分公平锁与非公平锁,直接由Sync实现。释放锁会先减少status和去除exclusiveOwnerThread,然后再唤醒同步队列上的线程。
//ReentrantLock
public void unlock() {
sync.release(1);
}
//AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
//Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
//AbstractQueuedSynchronizer
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}
//LockSupport
public static void unpark(Thread thread) {
if (thread != null) {
if (thread.isVirtual()) {
VirtualThreads.unpark(thread);
} else {
U.unpark(thread); //Unsafe
}
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。