当多个线程在等待彼此释放持有的资源,从而形成了资源占有和等待的循环时,就产生了死锁。可能产生死锁的例子:
class Account {
private long amount;
void plus(long amount) { this.amount += amount; }
void minus(long amount) {
if (this.amount < amount)
throw new IllegalArgumentException();
else
this.amount -= amount;
}
static void transferWithDeadlock(long amount, Account first, Account second){
synchronized (first) {
synchronized (second) {
first.minus(amount);
second.plus(amount);
}
}
}
}
如果在同一时间发生死锁:
避免死锁的方法有:
class Account {
private long id;
private long amount;
// Some methods are omitted
static void transferWithLockOrdering(long amount, Account first, Account second){
boolean lockOnFirstAccountFirst = first.id < second.id;
Account firstLock = lockOnFirstAccountFirst ? first : second;
Account secondLock = lockOnFirstAccountFirst ? second : first;
synchronized (firstLock) {
synchronized (secondLock) {
first.minus(amount);
second.plus(amount);
}
}
}
}
class Account {
private long amount;
// Some methods are omitted
static void transferWithTimeout(
long amount, Account first, Account second, int retries, long timeoutMillis
) throws InterruptedException {
for (int attempt = 0; attempt < retries; attempt++) {
if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
{
try {
if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
{
try {
first.minus(amount);
second.plus(amount);
}
finally {
second.lock.unlock();
}
}
}
finally {
first.lock.unlock();
}
}
}
}
}
JVM可以检测到监视器死锁,并将在线程转储中打印死锁信息。
当线程花费所有时间协商对资源的访问或检测并避免死锁,从而没有线程真正取得进展时,就会发生活锁。 当线程长时间保持锁定而使某些线程“饥饿”而没有取得进展时,就会发生饥饿。
线程池的核心接口是ExecutorService
,java.util.concurrent
还提供了一个静态工厂Executors
,它包含创建具有最常见配置的线程池的工厂方法。
工厂方法如下:
方法 | 说明 |
---|---|
newSingleThreadExecutor | 返回一个只有一个线程的ExecutorService |
newFixedThreadPool | 返回一个具有固定数目线程的ExecutorService |
newCachedThreadPool | 返回一个可变大小的线程池ExecutorService |
newSingleThreadScheduledExecutor | 返回只有一个线程的ScheduledExecutorService |
newScheduledThreadPool | 返回包含一组线程的ScheduledExecutorService |
newWorkStealingPool | 返回一个带有并行级别的ExecutorService |
表6 静态工厂方法
当调整线程池大小时,最好基于机器运行该应用时分配的逻辑内核数。可以通过调用Runtime.getRuntime().availableProcessors()
来获得该值。
实现类 | 说明 |
---|---|
ThreadPoolExecutor | 默认实现,带有可选的线程大小调整池,单个工作队列和可配置的策略(用于拒绝任务)(通过RejectedExecutionHandler)和线程创建(通过ThreadFactory)。 |
ScheduledThreadPoolExecutor | ThreadPoolExecutor的扩展,可以创建定期任务 |
ForkJoinPool | 作业存储池:池中的所有线程都尝试查找并运行提交的任务或其他活动任务创建的任务。 |
表7 线城市的实现
任务通过ExecutorService#submit
,ExecutorService#invokeAll
或ExecutorService#invokeAny
提交,它们对不同类型的任务有多种重载。
任务的功能性接口:
接口 | 说明 |
---|---|
Runnable | 一个没有返回值的任务 |
Callable | 一个包含返回值的计算。它还声明可以抛出原始异常,所以不需要对检查异常进行包装 |
表8 任务功能接口
Future
是对所有的异步计算的抽象。它表示这些计算的结果,在某些时候可用。大多数的ExecutorService
方法都是用Future
作为返回值。它包含检查当前future的状态以及阻塞当前读取操作直至结果可以被读取等方法。
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "result");
try {
String result = future.get(1L, TimeUnit.SECONDS);
System.out.println("Result is '" + result + "'.");
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
catch (TimeoutException e) {
throw new RuntimeException(e);
}
assert future.isDone();
Lock
java.util.concurrent.locks
包中有一个标准的Lock
接口,ReentrantLock
实现复制了synchronized
关键字的功能,同时提供了一些额外的功能,比如获取当前锁状态的信息,非阻塞的tryBlock()
方法,以及可中断的锁。下面是使用具体的ReentrantLock
实例的例子:
class Counter {
private final Lock lock = new ReentrantLock();
private int value;
int increment() {
lock.lock();
try {
return ++value;
} finally {
lock.unlock();
}
}
}
java.util.concurrent.locks
包还包含了ReadWriteLock
接口(以及ReentrantReadWriteLock
实现),它被定义为一组读写锁,支持多个同步读者和单一写。
class Statistic {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private int value;
void increment() {
lock.writeLock().lock();
try {
value++;
} finally {
lock.writeLock().unlock();
}
}
int current() {
lock.readLock().lock();
try {
return value;
} finally {
lock.readLock().unlock();
}
}
}
CountDownLatch
CountDownLatch
通过一个数值初始化。线程会调用await()
方法阻塞自己,等待计数值为0后再继续运行。其它的线程(或是同一个线程)调用countDown()
来减少计数。一旦计数为0后,该倒计时器便不可以重复使用。用来在达到某个条件后,启动一组未知数量的线程
CompletableFuture
CompletableFuture
是异步计算的一个抽象。不同于Future
,只能通过阻塞获取结果,该类支持注册回调以创建在结果或异常可用时要执行的任务管道。 在创建过程中(通过CompletableFuture#supplyAsync / runAsync
)或在添加回调过程(*异步家族的方法)期间,都可以指定执行程序的执行者(如果未指定标准全局ForkJoinPool#commonPool
)。
注意,如果CompletableFuture
已完成,则通过非*async
方法注册的回调将在调用者的线程中执行。
如果有多个futures,则可以使用CompletableFuture#allOf
来获取所有futures都完成的future,或者使用CompletableFuture#anyOf
来完成任何future就可以完成。
ExecutorService executor0 = Executors.newWorkStealingPool();
ExecutorService executor1 = Executors.newWorkStealingPool();
//Completed when both of the futures are completed
CompletableFuture<String> waitingForAll = CompletableFuture
.allOf(
CompletableFuture.supplyAsync(() -> "first"),
CompletableFuture.supplyAsync(() -> "second", executor1)
)
.thenApply(ignored -> " is completed.");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0)
//Using same executor
.thenApply(result -> "Java " + result)
//Using different executor
.thenApplyAsync(result -> "Dzone " + result, executor1)
//Completed when this and other future are completed
.thenCombine(waitingForAll, (first, second) -> first + second)
//Implicitly using ForkJoinPool#commonPool as the executor
.thenAcceptAsync(result -> {
System.out.println("Result is '" + result + "'.");
})
//Generic handler
.whenComplete((ignored, exception) -> {
if (exception != null)
exception.printStackTrace();
});
//First blocking call - blocks until it is not finished.
future.join();
future
//Executes in the current thread (which is main).
.thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
//Implicitly using ForkJoinPool#commonPool as the executor
.thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
使集合成为线程安全的最简单方法是使用Collections#synchronized*
系列方法。 由于此解决方案在高并发下表现不佳,因此java.util.concurrent提供了针对并发使用进行了优化的各种数据结构。
实现 | 说明 |
---|---|
CopyOnWriteArrayList | 它提供了copy-on-write的语义,其中对数据结构的每次修改都会导致新的数据内部副本(因此,写入代价非常昂贵,而读取却很便宜)。 数据结构上的迭代器始终会在创建迭代器时看到其快照。 |
表9 Lists injava.util.concurrent
实现 | 说明 |
---|---|
ConcurrentHashMap | 它通常充当存储桶的哈希表。 读操作通常不会阻塞并反映最近完成的写操作的结果。只需将其CAS(compare-and-set)到存储区中即可,将第一个节点写入空容器中,而其他写入则需要锁(存储桶的第一个节点用作锁)。 |
ConcurrentSkipListMap | 它提供并发访问以及类似于TreeMap的排序map功能。 性能范围与TreeMap相似,只要它们不修改映射的同一部分,通常多个线程就可以在不争用的情况下从映射读取和写入。 |
Table 10: Maps injava.util.concurrent
实现 | 说明 |
---|---|
CopyOnWriteArraySet | 与CopyOnWriteArrayList相似,它使用写时复制语义来实现Set接口。 |
ConcurrentSkipListSet | 与ConcurrentSkipListMap相似,但是实现Set接口。 |
Table 11: Sets injava.util.concurrent
创建并发集的另一种方法是包装 concurrent map:
Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
队列充当“生产者”和“消费者”之间的管道。 项目以相同的“先进先出”(FIFO)顺序放入管道的另一端并从管道的另一端出来。 BlockingQueue
接口扩展了Queue
,以提供有关如何处理队列可能已满(当生产者添加项目时)或为空(当消费者读取或删除项目时)的情况的其他选择。 在这些情况下,BlockingQueue
提供的方法将永远阻塞或在指定时间段内阻塞,等待条件由于另一个线程的动作而改变。链表支持的无界无阻塞队列
时间 | 说明 |
---|---|
ConcurrentLinkedQueue | 链表支持的无界无阻塞队列 |
LinkedBlockingQueue | 由链表支持的可选有界阻塞队列。 |
PriorityBlockingQueue | 最小堆支持的无限制阻塞队列。 将根据与队列关联的比较器的顺序(而不是FIFO顺序)从队列中删除项目。 |
DelayQueue | 无限制的元素队列,每个元素都有一个延迟值。 元素只有在延迟已过时才能删除,并且按照最旧的过期项目的顺序删除。 |
SynchronousQueue | 一个长度为0的队列,生产者和使用者在其中阻塞直到对方到达为止。 当两个线程都到达时,值直接从生产者转移到消费者。 在线程之间传输数据时很有用。 |
Table 12: Queues injava.util.concurrent