前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(翻译)理解并发的核心概念二

(翻译)理解并发的核心概念二

作者头像
日薪月亿
发布2020-06-22 15:23:32
4190
发布2020-06-22 15:23:32
举报
文章被收录于专栏:技术探索

Liveness

死锁

当多个线程在等待彼此释放持有的资源,从而形成了资源占有和等待的循环时,就产生了死锁。可能产生死锁的例子:

代码语言:javascript
复制
class Account {
  private long amount;
  void plus(long amount) { this.amount += amount; }
  void minus(long amount) {
    if (this.amount < amount)
      throw new IllegalArgumentException();
    else
      this.amount -= amount;
  }
  static void transferWithDeadlock(long amount, Account first, Account second){
    synchronized (first) {
      synchronized (second) {
        first.minus(amount);
        second.plus(amount);
      }
    }
  }
}

如果在同一时间发生死锁:

  • 一个线程正在尝试从第一个帐户转移到第二个帐户,并且已经获得了第一个帐户的锁定。
  • 与此同时,另一个线程正在尝试从第二个帐户转移到第一个帐户,并且已经获得了第二个帐户的锁定。

避免死锁的方法有:

  • 顺序加锁 - 总是按相同的顺序获得锁
代码语言:javascript
复制
class Account {
  private long id;
  private long amount;
  // Some methods are omitted
  static void transferWithLockOrdering(long amount, Account first, Account second){
    boolean lockOnFirstAccountFirst = first.id < second.id;
    Account firstLock = lockOnFirstAccountFirst  ? first  : second;
    Account secondLock = lockOnFirstAccountFirst ? second : first;
    synchronized (firstLock) {
      synchronized (secondLock) {
        first.minus(amount);
        second.plus(amount);
      }
    }
  }
}
  • 带超时的锁 - 不要在获得锁定后无限期阻塞,而是释放所有锁定并重试。
代码语言:javascript
复制
class Account {
  private long amount;
  // Some methods are omitted
  static void transferWithTimeout(
      long amount, Account first, Account second, int retries, long timeoutMillis
  ) throws InterruptedException {
    for (int attempt = 0; attempt < retries; attempt++) {
      if (first.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
      {
        try {
          if (second.lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS))
          {
            try {
              first.minus(amount);
              second.plus(amount);
            }
            finally {
              second.lock.unlock();
            }
          }
        }
        finally {
          first.lock.unlock();
        }
      }
    }
  }
}

JVM可以检测到监视器死锁,并将在线程转储中打印死锁信息。

活锁和线程饥饿

当线程花费所有时间协商对资源的访问或检测并避免死锁,从而没有线程真正取得进展时,就会发生活锁。 当线程长时间保持锁定而使某些线程“饥饿”而没有取得进展时,就会发生饥饿。

java.util.concurrent

线程池(Thread pools)

线程池的核心接口是ExecutorServicejava.util.concurrent还提供了一个静态工厂Executors,它包含创建具有最常见配置的线程池的工厂方法。

工厂方法如下:

方法

说明

newSingleThreadExecutor

返回一个只有一个线程的ExecutorService

newFixedThreadPool

返回一个具有固定数目线程的ExecutorService

newCachedThreadPool

返回一个可变大小的线程池ExecutorService

newSingleThreadScheduledExecutor

返回只有一个线程的ScheduledExecutorService

newScheduledThreadPool

返回包含一组线程的ScheduledExecutorService

newWorkStealingPool

返回一个带有并行级别的ExecutorService

表6 静态工厂方法

当调整线程池大小时,最好基于机器运行该应用时分配的逻辑内核数。可以通过调用Runtime.getRuntime().availableProcessors()来获得该值。

实现类

说明

ThreadPoolExecutor

默认实现,带有可选的线程大小调整池,单个工作队列和可配置的策略(用于拒绝任务)(通过RejectedExecutionHandler)和线程创建(通过ThreadFactory)。

ScheduledThreadPoolExecutor

ThreadPoolExecutor的扩展,可以创建定期任务

ForkJoinPool

作业存储池:池中的所有线程都尝试查找并运行提交的任务或其他活动任务创建的任务。

表7 线城市的实现

任务通过ExecutorService#submitExecutorService#invokeAllExecutorService#invokeAny提交,它们对不同类型的任务有多种重载。

任务的功能性接口:

接口

说明

Runnable

一个没有返回值的任务

Callable

一个包含返回值的计算。它还声明可以抛出原始异常,所以不需要对检查异常进行包装

表8 任务功能接口

Future

Future是对所有的异步计算的抽象。它表示这些计算的结果,在某些时候可用。大多数的ExecutorService方法都是用Future作为返回值。它包含检查当前future的状态以及阻塞当前读取操作直至结果可以被读取等方法。

代码语言:javascript
复制
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "result");
try {
  String result = future.get(1L, TimeUnit.SECONDS);
  System.out.println("Result is '" + result + "'.");
} 
catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  throw new RuntimeException(e);
} 
catch (ExecutionException e) {
  throw new RuntimeException(e.getCause());
} 
catch (TimeoutException e) {
  throw new RuntimeException(e);
}
assert future.isDone();

Locks

Lock java.util.concurrent.locks包中有一个标准的Lock接口,ReentrantLock实现复制了synchronized关键字的功能,同时提供了一些额外的功能,比如获取当前锁状态的信息,非阻塞的tryBlock()方法,以及可中断的锁。下面是使用具体的ReentrantLock实例的例子:

代码语言:javascript
复制
class Counter {
  private final Lock lock = new ReentrantLock();
  private int value;
  int increment() {
    lock.lock();
    try {
      return ++value;
    } finally {
      lock.unlock();
    }
  }
}

ReadWriteLock(读写锁)

java.util.concurrent.locks包还包含了ReadWriteLock接口(以及ReentrantReadWriteLock实现),它被定义为一组读写锁,支持多个同步读者和单一写。

代码语言:javascript
复制
class Statistic {
  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  private int value;
  void increment() {
    lock.writeLock().lock();
    try {
      value++;
    } finally {
      lock.writeLock().unlock();
    }
  }
  int current() {
    lock.readLock().lock();
    try {
      return value;
    } finally {
      lock.readLock().unlock();
    }
  }
}

CountDownLatch CountDownLatch通过一个数值初始化。线程会调用await()方法阻塞自己,等待计数值为0后再继续运行。其它的线程(或是同一个线程)调用countDown()来减少计数。一旦计数为0后,该倒计时器便不可以重复使用。用来在达到某个条件后,启动一组未知数量的线程

CompletableFuture CompletableFuture是异步计算的一个抽象。不同于Future,只能通过阻塞获取结果,该类支持注册回调以创建在结果或异常可用时要执行的任务管道。 在创建过程中(通过CompletableFuture#supplyAsync / runAsync)或在添加回调过程(*异步家族的方法)期间,都可以指定执行程序的执行者(如果未指定标准全局ForkJoinPool#commonPool)。

注意,如果CompletableFuture已完成,则通过非*async方法注册的回调将在调用者的线程中执行。

如果有多个futures,则可以使用CompletableFuture#allOf来获取所有futures都完成的future,或者使用CompletableFuture#anyOf来完成任何future就可以完成。

代码语言:javascript
复制
ExecutorService executor0 = Executors.newWorkStealingPool();
ExecutorService executor1 = Executors.newWorkStealingPool();
//Completed when both of the futures are completed
CompletableFuture<String> waitingForAll = CompletableFuture
    .allOf(
        CompletableFuture.supplyAsync(() -> "first"),
        CompletableFuture.supplyAsync(() -> "second", executor1)
    )
    .thenApply(ignored -> " is completed.");
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Concurrency Refcard", executor0)
    //Using same executor
    .thenApply(result -> "Java " + result)
    //Using different executor
    .thenApplyAsync(result -> "Dzone " + result, executor1)
    //Completed when this and other future are completed
    .thenCombine(waitingForAll, (first, second) -> first + second)
    //Implicitly using ForkJoinPool#commonPool as the executor
    .thenAcceptAsync(result -> {
      System.out.println("Result is '" + result + "'.");
    })
    //Generic handler
    .whenComplete((ignored, exception) -> {
      if (exception != null)
        exception.printStackTrace();
    });
//First blocking call - blocks until it is not finished.
future.join();
future
    //Executes in the current thread (which is main).
    .thenRun(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))
    //Implicitly using ForkJoinPool#commonPool as the executor
    .thenRunAsync(() -> System.out.println("Current thread is '" + Thread.currentThread().getName() + "'."))

Concurrent collections

使集合成为线程安全的最简单方法是使用Collections#synchronized*系列方法。 由于此解决方案在高并发下表现不佳,因此java.util.concurrent提供了针对并发使用进行了优化的各种数据结构。

Lists

实现

说明

CopyOnWriteArrayList

它提供了copy-on-write的语义,其中对数据结构的每次修改都会导致新的数据内部副本(因此,写入代价非常昂贵,而读取却很便宜)。 数据结构上的迭代器始终会在创建迭代器时看到其快照。

表9 Lists injava.util.concurrent

Maps

实现

说明

ConcurrentHashMap

它通常充当存储桶的哈希表。 读操作通常不会阻塞并反映最近完成的写操作的结果。只需将其CAS(compare-and-set)到存储区中即可,将第一个节点写入空容器中,而其他写入则需要锁(存储桶的第一个节点用作锁)。

ConcurrentSkipListMap

它提供并发访问以及类似于TreeMap的排序map功能。 性能范围与TreeMap相似,只要它们不修改映射的同一部分,通常多个线程就可以在不争用的情况下从映射读取和写入。

Table 10: Maps injava.util.concurrent

Sets

实现

说明

CopyOnWriteArraySet

与CopyOnWriteArrayList相似,它使用写时复制语义来实现Set接口。

ConcurrentSkipListSet

与ConcurrentSkipListMap相似,但是实现Set接口。

Table 11: Sets injava.util.concurrent

创建并发集的另一种方法是包装 concurrent map:

代码语言:javascript
复制
Set<T> concurrentSet = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());

Queues

​ 队列充当“生产者”和“消费者”之间的管道。 项目以相同的“先进先出”(FIFO)顺序放入管道的另一端并从管道的另一端出来。 BlockingQueue接口扩展了Queue,以提供有关如何处理队列可能已满(当生产者添加项目时)或为空(当消费者读取或删除项目时)的情况的其他选择。 在这些情况下,BlockingQueue提供的方法将永远阻塞或在指定时间段内阻塞,等待条件由于另一个线程的动作而改变。链表支持的无界无阻塞队列

时间

说明

ConcurrentLinkedQueue

链表支持的无界无阻塞队列

LinkedBlockingQueue

由链表支持的可选有界阻塞队列。

PriorityBlockingQueue

最小堆支持的无限制阻塞队列。 将根据与队列关联的比较器的顺序(而不是FIFO顺序)从队列中删除项目。

DelayQueue

无限制的元素队列,每个元素都有一个延迟值。 元素只有在延迟已过时才能删除,并且按照最旧的过期项目的顺序删除。

SynchronousQueue

一个长度为0的队列,生产者和使用者在其中阻塞直到对方到达为止。 当两个线程都到达时,值直接从生产者转移到消费者。 在线程之间传输数据时很有用。

Table 12: Queues injava.util.concurrent

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Liveness
  • 死锁
  • 活锁和线程饥饿
  • java.util.concurrent
    • 线程池(Thread pools)
      • ReadWriteLock(读写锁)
      • Lists
      • Maps
      • Sets
      • Queues
  • Future
  • Locks
  • Concurrent collections
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档