本文源码基于: JDK13
JUC是Java提供的一个并发工具包,提供了很多并发工具.
本文主要将AQS.
java.util.concurrent.locks.AbstractQueuedSynchronizer.
是一个基类,也可以理解为一个框架.
它提供了对于同步状态的控制,以前线程等待时的FIFO队列.
AQS的属性.
/**
* The synchronization state.
*/
private volatile int state;
核心属性,同步状态. 使用volatile
修饰.
与之对应的三个方法:
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState(){
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState){
state=newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect,int update){
return STATE.compareAndSet(this,expect,update);
}
分别提供了get/set方法及CAS的赋值方法.
等待队列队头.
等待队列队尾.
head 和 tail 是java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
的实例, 构成了一个双向链表.
Node是为了表达一个等待线程而抽象的数据结构,主要有以下几个属性.
// Node节点所在的等待状态
volatile int waitStatus;
//前置节点
volatile Node prev;
// 后置节点
volatile Node next;
// 在这个节点上的线程
volatile Thread thread;
// 下一个等待的节点
Node nextWaiter;
他有两种模式,分别为共享模式及独占模式. 对应不同的操作.
其中waitStatus
为枚举值,有以下几个值.
/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED=1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL=-1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION=-2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate.
*/
static final int PROPAGATE=-3;
AQS的方法可太多了. 先看一下对外提供的API方法.
众所周知,AQS是为了同步(加锁)而设计的. 那么一定是有获取锁,释放锁的方法的.先从这里切入.
独占式应用,典型的就是ReentrantLock
. ReentrantLock源码学习
独占模式的加锁代码.
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
selfInterrupt();
}
独占模式的获取锁, 并且忽略中断. 至少调用一次tryAcquire
.如果成功了就返回.
否则的话将线程加入等待队列,重复的进行tryAcquire
. 直到成功为止.
这个方法在AQS中是抽象的, protected修饰. 由子类具体进行实现.
它定义的:
独占模式的获取锁, 如果可以获取到,返回成功,如果获取失败,线程应该被放入等待队列.
如果线程已经在等待队列中, 应该是被其他线程唤醒了.
总之: 这个方法是非阻塞的,立即返回的,要么成功加锁,返回true. 要么加锁失败,返回flase.
,之后的操作就不归这个方法管了.
private方法,给当前线程创建一个Node并且放入等待队列.
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode){
Node node=new Node(mode);
for(;;){
Node oldTail=tail;
if(oldTail!=null){
node.setPrevRelaxed(oldTail);
if(compareAndSetTail(oldTail,node)){
oldTail.next=node;
return node;
}
}else{
initializeSyncQueue();
}
}
}
一个final方法,子类无法重写.
将等待队列中的所有线程,进行获取锁的行为.
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node,int arg){
boolean interrupted=false;
try{
for(;;){
final Node p=node.predecessor();
if(p==head&&tryAcquire(arg)){
setHead(node);
p.next=null; // help GC
return interrupted;
}
if(shouldParkAfterFailedAcquire(p,node))
interrupted|=parkAndCheckInterrupt();
}
}catch(Throwable t){
cancelAcquire(node);
if(interrupted)
selfInterrupt();
throw t;
}
}
如果当前节点的前置节点是头结点,说明当前节点是优先级最高的那个.尝试获取锁.
如果当前节点不是优先级最高的,或者获取锁失败了. 调用shouldParkAfterFailedAcquire
.
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred,Node node){
int ws=pred.waitStatus;
if(ws==Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if(ws>0){
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do{
node.prev=pred=pred.prev;
}while(pred.waitStatus>0);
pred.next=node;
}else{
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws,Node.SIGNAL);
}
return false;
}
如果前置节点是SIGNAL.说明前置节点优先级更高,当前线程应该park.
如果前置节点被取消了,扔掉中间的取消节点. 不park.
如果前置节点是其他状态,设置为SIGNAL. 优先级最高. 不park.
不park的原因是再来一次. 检测一遍.
如果当前线程需要被park.则park且检查下是否中断了.
/**
* Convenience method to park and then check if interrupted.
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt(){
LockSupport.park(this);
return Thread.interrupted();
}
如果发生异常,则取消掉这次获取锁.
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(Node node){
// Ignore if node doesn't exist
if(node==null)
return;
node.thread=null;
// Skip cancelled predecessors
Node pred=node.prev;
while(pred.waitStatus>0)
node.prev=pred=pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary, although with
// a possibility that a cancelled node may transiently remain
// reachable.
Node predNext=pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus=Node.CANCELLED;
// If we are the tail, remove ourselves.
if(node==tail&&compareAndSetTail(node,pred)){
pred.compareAndSetNext(predNext,null);
}else{
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if(pred!=head&&
((ws=pred.waitStatus)==Node.SIGNAL||
(ws<=0&&pred.compareAndSetWaitStatus(ws,Node.SIGNAL)))&&
pred.thread!=null){
Node next=node.next;
if(next!=null&&next.waitStatus<=0)
pred.compareAndSetNext(predNext,next);
}else{
unparkSuccessor(node);
}
node.next=node; // help GC
}
}
独占式的解锁.
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg){
if(tryRelease(arg)){
Node h=head;
if(h!=null&&h.waitStatus!=0)
unparkSuccessor(h);
return true;
}
return false;
}
调用tryRelease(int arg)
. 如果解锁成功,唤醒头结点的后继节点. 如果解锁失败, 返回false.
解锁操作,由子类负责具体实现,可以后期针对ReentrantLock学习.
这个方法,非阻塞式, 即时返回true/false. 代表是否释放成功.
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node){
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws=node.waitStatus;
if(ws< 0)
node.compareAndSetWaitStatus(ws,0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s=node.next;
if(s==null||s.waitStatus>0){
s=null;
for(Node p=tail;p!=node&&p!=null;p=p.prev)
if(p.waitStatus<=0)
s=p;
}
if(s!=null)
LockSupport.unpark(s.thread);
}
在等待队列中,从后向前找到正序的第一个需要唤醒的Node. 执行unpark操作.
共享锁的相关实现,可以查看CountDownLatch
的相关代码. CountDownLatch源码解析
共享模式的获取锁.忽略中断.
至少调用一次TryAcquireShared
, 如果成功就返回,失败就将线程加入等待队列. 重复调用TryAcquireShared
知道成功.
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg){
if(tryAcquireShared(arg)< 0)
doAcquireShared(arg);
}
抽象方法,由子类负责实现.
如果获取锁成功,直接返回. 如果获取失败,线程加入等待队列,如果线程已经加入,等待被其他人释放锁的动作唤醒.
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg){
final Node node=addWaiter(Node.SHARED);
boolean interrupted=false;
try{
for(;;){
final Node p=node.predecessor();
if(p==head){
int r=tryAcquireShared(arg);
if(r>=0){
setHeadAndPropagate(node,r);
p.next=null; // help GC
return;
}
}
if(shouldParkAfterFailedAcquire(p,node))
interrupted|=parkAndCheckInterrupt();
}
}catch(Throwable t){
cancelAcquire(node);
throw t;
}finally{
if(interrupted)
selfInterrupt();
}
}
shouldParkAfterFailedAcquire
判断是否需要进行park. 如果需要,则park当前线程并检查中断.共享模式的释放锁.
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg){
if(tryReleaseShared(arg)){
doReleaseShared();
return true;
}
return false;
}
非阻塞式的释放锁.调用tryReleaseShared
.
如果释放成功,调用doReleaseShared
.如果失败,返回false.
抽象方法,具体由子类进行实现.
非阻塞式的,返回释放的结果.
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared(){
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for(;;){
Node h=head;
if(h!=null&&h!=tail){
int ws=h.waitStatus;
if(ws==Node.SIGNAL){
if(!h.compareAndSetWaitStatus(Node.SIGNAL,0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if(ws==0&&
!h.compareAndSetWaitStatus(0,Node.PROPAGATE))
continue; // loop on failed CAS
}
if(h==head) // loop if head changed
break;
}
}
共享模式的释放锁操作. 通知后继者并且确保传播.
独占式的解锁,只需要唤醒下一个即可。而共享式的解锁,需要广播解锁消息.
遍历等待队列,将SIGNAL的节点继任者全部唤醒.
完.
完。