之前学习了如何使用synchronized关键字来实现同步访问,Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁。
虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。
不同于synchronized是Java语言的关键字,是内置特性,Lock不是Java语言内置的,Lock是一个类,通过这个类可以实现同步访问。而且synchronized同步块执行完成或者遇到异常是锁会自动释放,而lock必须调用unlock()方法释放锁,因此在finally块中释放锁。
先看看lock接口定义了哪些方法:
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
这里面lock()、tryLock()、tryLock(long time, TimeUnit unit)和lockInterruptibly()是用来获取锁的。这四个方法都是用来获取锁的,那有什么区别呢?
lock()方法是平常使用得最多的一个方法,就是用来获取锁。如果锁已被其他线程获取,则进行等待。
tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
tryLock(long time, TimeUnit unit)方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
lockInterruptibly()方法,当通过这个方法去获取锁时,如果线程正在等待获取锁,则这个线程能够响应中断,即中断线程的等待状态。也就使说,当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程。
unLock()方法是用来释放锁的,这没什么特别需要讲的。
Condition newCondition() 是用于获取与lock绑定的等待通知组件,当前线程必须获得了锁才能进行等待,进行等待时会先释放锁,当再次获取锁时才能从等待中返回。
Lock接口里面的方法我们已经知道,接下来实现Lock的类ReentrantLock开始学起,发现ReentrantLock并没有多少代码,另外有一个很明显的特点是:基本上所有的方法的实现实际上都是调用了其静态内存类Sync中的方法,而Sync类继承了 AbstractQueuedSynchronizer(AQS)。
AQS(以下简称同步器)是用来构建锁和其他同步组件的基础框架,它的实现主要依赖一个int成员变量来表示同步状态,通过内置的FIFO队列来完成排队工作。
子类通过继承并实现它的抽象方法来管理同步状态,通过使用getState,setState以及compareAndSetState这三个方法对同步状态进行更改。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态的获取和释放方法来供自定义同步组件的使用,同步器既支持独占式获取同步状态,也可以支持共享式获取同步状态,这样就可以方便的实现不同类型的同步组件。
同步器是实现锁的关键,要实现锁功能,子类继承Lock,它定义了使用者与锁交互的接口,就像上面那几个接口,但是实现却是通过同步器,同步器简化了锁的实现方式,实现了底层操作,如同步状态管理,线程的排队,等待和唤醒,而外面使用者去不用关心这些细节。
同步器的设计模式是基于模板方法,也就是说,使用者要继承同步器并重写指定的方法,随后将同步器组合在自定义同步器组合定义在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。总结就是同步器将一些方法开放给子类进行重写,而同步器给同步组件所提供模板方法又会重新调用被子类所重写的方法
如在AQS中有此方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
而ReentrantLock中重写了方法:
那在AQS中的acquire调用了这个方法,这就相当于在父类定义了一套模板,这些模板会调用一些可重写的方法,这些可重写的方法具体的实现放在了子类。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这就是模板方法方法的设计思路,如还有疑惑,可以去学习这种设计模式。
下面就是一些可以被重写的方法:
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
protected int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败 |
protected boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程独占 |
实现自定义同步组件时,将会调用同步器提供的模板方法,这些(部分)模板方法与描述
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回 |
boolean tryAcquireNanos(int arg, long nanosTimeout) | 在void acquireInterruptibly(int arg)的基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true |
void acquireShared(int arg) | 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) | 与acquireShared(int arg)相同,该方法响应中断 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 在acquireSharedInterruptibly(int arg)基础上增加了超时限制 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection<Thread> getQueuedThreads() | 获取等待在同步队列上的线程集合 |
同步器提供的模板方法基本上分为3类:
下面看一个例子:
public class Mutex implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
这个例子中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int acquires)方法中,如果经过CAS设置成功(同步状态设置为1),则代表获取了同步状态,而在tryRelease(int releases)方法中只是将同步状态重置为0。
用户使用Mutex时并不会直接和内部同步器的实现打交道,而是调用Mutex提供的方法,在Mutex的实现中,以获取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛。
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。
volatile int waitStatus //节点状态
volatile Node prev //当前节点/线程的前驱节点
volatile Node next; //当前节点/线程的后继节点
volatile Thread thread;//加入同步队列的线程引用
Node nextWaiter;//等待队列中的下一个节点
看到节点的数据结构,知道这是一个双向队列,而在AQS中还存在两个成员变量:
private transient volatile Node head;
private transient volatile Node tail;
AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队列中的线程进行通知等核心方法。其示意图如下:
通过对源码的理解以及做实验的方式,现在我们可以清楚的知道这样几点:
重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。如果一个锁不支持可重入,那当一个线程调用它的lock()方法获取锁之后,如果再次调用lock()方法,则该线程将会被自己所阻塞。
synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁。ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。
ReentrantLock是通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实现为例
核心方法为nonfairTryAcquire:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//1. 如果该锁未被任何线程占有,该锁能被当前线程获取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//2.若被占有,检查占有线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 3. 再次获取,计数加一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值。
protected final boolean tryRelease(int releases) {
//1. 同步状态减1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//2. 只有当同步状态为0时,锁成功被释放,返回true
free = true;
setExclusiveOwnerThread(null);
}
// 3. 锁未被完全释放,返回false
setState(c);
return free;
}
如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
公平锁和非公平锁。何谓公平性,是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求上的绝对时间顺序,满足FIFO,ReentrantLock的构造方法无参时是构造非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
另外还提供了另外一种方式,可传入一个boolean值,true时为公平锁,false时为非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
在上面非公平锁获取时(nonfairTryAcquire方法)只是简单的获取了一下当前状态做了一些逻辑处理,并没有考虑到当前同步队列中线程等待的情况。我们来看看公平锁的处理逻辑是怎样的,核心方法为:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
这段代码的逻辑与nonfairTryAcquire基本上一直,唯一的不同在于增加了hasQueuedPredecessors的逻辑判断,方法名就可知道该方法用来判断当前节点在同步队列中是否有前驱节点的判断,如果有前驱节点说明有线程比当前线程更早的请求资源,根据公平性,当前线程请求资源失败。如果当前节点没有前驱节点的话,再才有做后面的逻辑判断的必要性。公平锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁则不一定,有可能刚释放锁的线程能再次获取到锁。
公平锁 VS 非公平锁
之前学到的锁都是独占锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁能够简化读写交互场景的编程方式。假设在程序中定义一个共享的用作缓存数据结构,它大部分时间提供读服务(例如查询和搜索),而写操作占有的时间很少,但是写操作完成之后的更新需要对后续的读服务可见。
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是ReentrantReadWriteLock。
读写锁主要有以下三个特性:
ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方法,而其实现——ReentrantReadWriteLock,除了接口方法之外,还提供了一些便于外界监控其内部工作状态的方法,主要有:
int getReadLockCount()//返回当前读锁被获取的次数。该次数不等于获取读锁的线程数,如果一个线程连续获取n次,那么返回的就是n
int getReadHoldCount()//返回当前线程获取读锁的次数
boolean isWriteLocked()//判断写锁是否被获取
int getWriteHoldCount()//返回当前写锁被获取的次数
读写锁使用:
public class Cache {
static Map<String, Object> map = new HashMap<>();
static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
static Lock r = reentrantReadWriteLock.readLock();
static Lock w = reentrantReadWriteLock.writeLock();
// 获取一个key对应的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 设置key对应的value,并返回旧的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的内容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}
Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的。在读操作get(String key)方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞。
写操作put(String key,Object value)方法和clear()方法,在更新HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续。Cache使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式。
再分析下读写锁的实现原理,主要的内容包括:读写状态的设计,写锁的获取与释放,读锁的获取与释放以及锁降级。
读写状态的设计
读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写,如图:
写锁的获取与释放
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 1. 获取写锁当前的同步状态
int c = getState();
// 2. 获取写锁获取的次数
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 3.1 当读锁已被读线程获取或者当前线程不是已经获取写锁的线程的话
// 当前线程获取写锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 3.2 当前线程获取写锁,支持可重复加锁
setState(c + acquires);
return true;
}
// 3.3 写锁未被任何线程获取,当前线程可获取写锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//1. 同步状态减去写状态
int nextc = getState() - releases;
//2. 当前写状态是否为0,为0则释放写锁
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
//3. 不为0则更新同步状态
setState(nextc);
return free;
}
读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。
如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。另外由于要增加一些外部功能,比如getReadHoldCount()方法,作用是返回当前线程获取读锁的次数。
读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal中,由线程自身维护,这使获取读锁的实现变得复杂。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//1. 如果写锁已经被获取并且获取写锁的线程不是当前线程的话,当前
// 线程获取读锁失败返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
//2. 当前线程获取读锁
compareAndSetState(c, c + SHARED_UNIT)) {
//3. 下面的代码主要是新增的一些功能,比如getReadHoldCount()方法
//返回当前获取读锁的次数
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//4. 处理在第二步中CAS操作失败的自旋已经实现重入性
return fullTryAcquireShared(current);
}
读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的 值是(1<<16)。
锁降级
锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程。
接下来看一个锁降级的示例。因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作:
public void processData() {
readLock.lock();
if (!update) {
// 必须先释放读锁
readLock.unlock();
// 锁降级从写锁获取到开始
writeLock.lock();
try {
if (!update) {
// 准备数据的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 锁降级完成,写锁降级为读锁
}
try {
// 使用数据的流程(略)
} finally {
readLock.unlock();
}
}
当数据发生变更后,update变量(布尔类型且volatile修饰)被设置为false,此时所有访问processData()方法的线程都能够感知到变化,但只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的lock()方法上。当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。