Java给我们提供了很多通用且好用的并发工具类,现在我就总结一下
Atomic包原子更新基本类型的工具类:
AtomicBoolean:以原子更新的方式更新boolean; AtomicInteger:以原子更新的方式更新Integer;
AtomicLong:以原子更新的方式更新Long;
这几个类的用法基本一致,这里以AtomicInteger为例总结常用的方法
addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果; incrementAndGet() :以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果; getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;
getAndIncrement():以原子的方式将实例中的原值加1,返回的是自增前的旧值;
其他它的底层是运用了CAS的原理,具体还各位可以去看源码,比如我就举一个例子
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
atomic包下提供能原子更新数组中元素的类有:
AtomicIntegerArray:原子更新整型数组中的元素; AtomicLongArray:原子更新长整型数组中的元素; AtomicReferenceArray:原子更新引用类型数组中的元素
这几个类的用法一致,就以AtomicIntegerArray来总结下常用的方法:
addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加; getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1; compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新
如果需要原子更新引用类型变量的话,为了保证线程安全,atomic也提供了相关的类:
AtomicReference:原子更新引用类型; AtomicReferenceFieldUpdater:原子更新引用类型里的字段;
AtomicMarkableReference:原子更新带有标记位的引用类型;
这几个类的使用方法也是基本一样的,以AtomicReference为例,来说明这些类的基本用法。
如果需要更新对象的某个字段,并在多线程的情况下,能够保证线程安全,atomic同样也提供了相应的原子操作类:
AtomicIntegeFieldUpdater:原子更新整型字段类; AtomicLongFieldUpdater:原子更新长整型字段类; AtomicStampedReference:原子更新引用类型,这种更新方式会带有版本号。 而为什么在更新的时候会带有版本号,是为了解决CAS的ABA问题;
要想使用原子更新字段需要两步操作:
原子更新字段类都是抽象类,只能通过静态方法newUpdater来创建一个更新器,并且需要设置想要更新的类和属性; 更新类的属性必须使用public volatile进行修饰;
CountDownLatch是一个非常实用的多线程控制工具类。常用下面几个方法:
CountDownLatch(int count) //实例化一个倒计数器,count指定计数个数
countDown() // 计数减一
await() //等待,当计数减到0时,所有线程并行执行
CountDownLatch在我工作的多个场景被使用,算是用的很频繁的了,比如我们的API接口响应时间被要求在200ms以内,但是如果一个接口内部依赖多个三方外部服务,那串行调用接口的RT必然很久,所以个人用的最多的是接口RT优化场景,内部服务并行调用。
对于倒计数器,一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检测。只有等到所有的检查完毕后,引擎才能点火。那么在检测环节当然是多个检测项可以同时进行的。代码实现:
public class CountDownLatchDemo implements Runnable{
static final CountDownLatch latch = new CountDownLatch(10);
static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run() {
// 模拟检查任务
try {
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println("check complete");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//计数减一
//放在finally避免任务执行过程出现异常,导致countDown()不能被执行
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i=0; i<10; i++){
exec.submit(demo);
}
// 等待检查
latch.await();
// 发射火箭
System.out.println("Fire!");
// 关闭线程池
exec.shutdown();
}
}
上述代码中我们先生成了一个CountDownLatch实例。计数数量为10,这表示需要有10个线程来完成任务,等待在CountDownLatch上的线程才能继续执行。latch.countDown();
方法作用是通知CountDownLatch有一个线程已经准备完毕,倒计数器可以减一了。latch.await()方法要求主线程等待所有10个检查任务全部准备好才一起并行执行。
其实,有点类似人满发车这个词来理解CyclicBarrier的作用:
长途汽车站提供长途客运服务。当等待坐车的乘客到达20人时,汽车站就会发出一辆长途汽车,让这20个乘客上车走人。等到下次等待的乘客又到达20人是,汽车站就会又发出一辆长途汽车。
CyclicBarrier常用于多线程分组计算。
下面来看下CyclicBarrier的主要方法:
//等到所有的线程都到达指定的临界点
await() throws InterruptedException, BrokenBarrierException
//与上面的await方法功能基本一致,只不过这里有超时限制,阻塞等待直至到达超时时间为止
await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException, TimeoutException
//获取当前有多少个线程阻塞等待在临界点上
int getNumberWaiting()
//用于查询阻塞等待的线程是否被中断
boolean isBroken()
//将屏障重置为初始状态。如果当前有线程正在临界点等待的话,将抛出BrokenBarrierException。
void reset()
另外需要注意的是,CyclicBarrier提供了这样的构造方法:
public CyclicBarrier(int parties, Runnable barrierAction)
下面用一个简单的例子来看下CyclicBarrier的用法,模拟下上面的坐车的例子。
public class CyclicBarrierDemo {
//指定必须有20个乘客到达才行
private static CyclicBarrier barrier = new CyclicBarrier(20, () -> {
System.out.println("所有乘客都上车了,司机开车!!!!!");
});
public static void main(String[] args) {
System.out.println("乘客准备上车...........");
ExecutorService service = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
service.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 乘客,上车");
barrier.await();
System.out.println(Thread.currentThread().getName() + " 乘客,上车");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
}
这两者还是各有不同侧重点的:
1、CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成,再携手共进。
2、调用CountDownLatch的countDown方法后,当前线程并不会阻塞,会继续往下执行;而调用CyclicBarrier的await方法,会阻塞当前线程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行;
3、CountDownLatch方法比较少,操作比较简单,而CyclicBarrier提供的方法更多,比如能够通过getNumberWaiting(),isBroken()这些方法获取当前多个线程的状态,并且CyclicBarrier的构造方法可以传入barrierAction,指定当所有线程都到达时执行的业务功能;
4、CountDownLatch是不能复用的,而CyclicLatch是可以复用的。
Semaphore叫信号量,Semaphore有两个目的,第一个是多个共享资源互斥使用,第二个是并发线程数的控制。
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。假如有多个线程读取数据后,需要将数据保存在数据库中,而可用的最大数据库连接只有10个,这时候就需要使用Semaphore来控制能够并发访问到数据库连接资源的线程个数最多只有10个。
//获取许可,如果无法获取到,则阻塞等待直至能够获取为止
void acquire() throws InterruptedException
//同acquire方法功能基本一样,只不过该方法可以一次获取多个许可
void acquire(int permits) throws InterruptedException
//释放许可
void release()
//释放指定个数的许可
void release(int permits)
//尝试获取许可,如果能够获取成功则立即返回true,否则,则返回false
boolean tryAcquire()
//与tryAcquire方法一致,只不过这里可以指定获取多个许可
boolean tryAcquire(int permits)
//尝试获取许可,如果能够立即获取到或者在指定时间内能够获取到,则返回true,否则返回false
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
//与上一个方法一致,只不过这里能够获取多个许可
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
//返回当前可用的许可证个数
int availablePermits()
//返回正在等待获取许可证的线程数
int getQueueLength()
//是否有线程正在等待获取许可证
boolean hasQueuedThreads()
//获取所有正在等待许可的线程集合
Collection<Thread> getQueuedThreads()
/**
* Semaphore叫信号量 or 信号灯
* Semaphore有两个目的,第一个目的是多个共享资源互斥使用,第二目的是并发线程数的控制
*/
public class SemaphoreDemo {
public static void main(String[] args) {
// 模拟厕所10个茅坑
Semaphore semaphore = new Semaphore(5);
for (int i = 1; i <= 10; i++) {
new Thread(() -> {
try {
// 获取锁资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "\t上厕所");
// 模拟人上厕所10秒,然后让出坑位
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t上完厕所,让出坑位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放锁资源
semaphore.release();
}
}, "" + i + "号帅哥").start();
}
}
}
Exchanger是一个用于线程间协作的工具类,用于两个线程间能够交换。它提供了一个交换的同步点,在这个同步点两个线程能够交换数据。
具体交换数据是通过exchange方法来实现的,如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。
Exchanger除了一个无参的构造方法外,主要方法也很简单:
//当一个线程执行该方法的时候,会等待另一个线程也执行该方法,因此两个线程就都达到了同步点
//将数据交换给另一个线程,同时返回获取的数据
V exchange(V x) throws InterruptedException
//同上一个方法功能基本一样,只不过这个方法同步等待的时候,增加了超时时间
V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException
模拟这样一个情景,在青春洋溢的中学时代,下课期间,男生经常会给走廊里为自己喜欢的女孩子送情书,相信大家都做过这样的事情吧 :)。男孩会先到女孩教室门口,然后等女孩出来,教室那里就是一个同步点,然后彼此交换信物,也就是彼此交换了数据。现在,就来模拟这个情景。
public class ExchangerDemo {
private static Exchanger<String> exchanger = new Exchanger();
public static void main(String[] args) {
//代表男生和女生
ExecutorService service = Executors.newFixedThreadPool(2);
service.execute(() -> {
try {
//男生对女生说的话
String girl = exchanger.exchange("我其实暗恋你很久了......");
System.out.println("女孩儿说:" + girl);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.execute(() -> {
try {
System.out.println("女生慢慢的从教室你走出来......");
TimeUnit.SECONDS.sleep(3);
//男生对女生说的话
String boy = exchanger.exchange("我也很喜欢你......");
System.out.println("男孩儿说:" + boy);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
输出结果:
女生慢慢的从教室你走出来......
男孩儿说:我其实暗恋你很久了......
女孩儿说:我也很喜欢你......
暂时总结到这里,如果还有什么遗漏的,可以留言给我哈。
https://www.relaxheart.cn/to/blog/streamline?uuid=83 https://juejin.im/post/5aeec3ebf265da0ba76fa327