并发编程系列之什么是并发协同?
多个线程并发,协作来完成一件任务的过程。因为任务处理的需要,需控制某些线程等待另外一些线程执行完成任务的某些部分,然后继续执行。
jdk的juc包中除提供了用于专门处理1并发协同的工具类,主要有CountDownLatch、CyclicBarrier、Phaser、Semaphore
CountDownLatch注意事项:只可使用一次,不能重复使用,计数变为0之后,就不可再用
countDownLatch(N)
这个多个条件可以是:等待N个线程、等待N个操作、等待某操作的N次执行例子:等待n个线程执行完成,再一起执行
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
final CountDownLatch cdl = new CountDownLatch(1);
int concurrency = 100;
final Random random = new Random();
for (int i = 0; i < concurrency; i++) {
new Thread(()->{
try {
Thread.sleep(random.nextInt(10_000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "准备就绪");
// 让并发线程都等待发出信号
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "开始工作");
}).start();
}
System.out.println("******************** 发出开始信号***********");
cdl.countDown();
}
}
执行,发现结果不符合我们的要求,虽然也是多个线程等待,再一起无序执行:
******************** 发出开始信号***********
Thread-22准备就绪
Thread-22开始工作
Thread-45准备就绪
Thread-45开始工作
...
因为CountDownLatch不能重用,所以再新加一个CountDownLatch协同N个线程:
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class StartTogerCountdownLatchExample {
public static void main(String[] args) throws InterruptedException {
final CountDownLatch cdl = new CountDownLatch(1);
int concurrency = 100;
final CountDownLatch cdln = new CountDownLatch(concurrency);
final Random random = new Random();
for (int i = 0;i < concurrency; i++) {
new Thread(()->{
try {
Thread.sleep(random.nextInt(10_000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(
Thread.currentThread().getName() + " 准备就绪");
// 调用countDown()报告完成任务
cdln.countDown();
// 让所有线程都等待发出信号
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(
Thread.currentThread().getName() + " 开始工作");
}).start();
}
//等待准备完成
cdln.await();
System.out.println("******************** 发出开始信号***********");
cdl.countDown();
}
}
等待N个线程准备就绪,然后一个总的CountDownLatch发出信号量,所有线程一起执行
...
Thread-11 准备就绪
Thread-14 准备就绪
Thread-53 准备就绪
Thread-91 准备就绪
******************** 发出开始信号***********
Thread-97 开始工作
Thread-57 开始工作
...
CyclicBarrier(int parties)
:parties指定有多少个部分(线程)参与,称之为参与数。CyclicBarrier(int parties,Runnable barrierAction)
:barrierAction,所有参与者都到达屏障时执行一次的命令。在一组线程中最后一个线程到达之后(但在释放所有线程之前),在该线程中执行改命令,该命令只在每个屏障点运行一次。若要在继续执行所有线程之前更新共享状态,此屏障操作很有用。int await() throws InterruptedException,BrowkenBarrierException
:线程执行过程会调用await()方法,表明自己已经到达屏障,该线程自己阻塞,等待其它线程也到达屏障;当所有线程都到达屏障,也即线程等待数等于参与数,则释放所有线程,让它们继续执行。返回值int表示到达当前线程的索引号,注意索引号是从parties-1
开始减为0。BrokenBarrierException
,屏障被破坏异常,当调用await时,或等待过程中屏障被破坏,则会抛出BrokenBarrierException
。int await(long timeout,TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException
:等待指定时长,如到了时间还不能释放,则将抛出TimeoutException
int getNumberWaiting()
: 获取当前在屏障处的线程数boolean isBroken()
: 判断屏障是否被破坏void reset()
:重置屏障为初始化状态。如果当前有线程正在等待,则这些线程将被释放并抛出BrokenBarrierException
案例:公司组织周末旅游活动,大家各自从家出发到公司集合,大家都到了之后,出发到公司各自游玩,然后在公园门口集合,再去餐厅就餐,大家都到了就开始用餐。使用并非编程模拟场景。
参与者不变,多次彼此等待。正好可用CyclicBarrier的循环使用特性
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int concurrency = 100;
final CyclicBarrier cyclicBarrier = new CyclicBarrier(concurrency , ()->{
System.out.println("*****************准备完成!************");
});
final Random random = new Random();
for (int i = 0 ; i < concurrency; i++) {
new Thread(() -> {
try {
Thread.sleep(random.nextInt(10_000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "准备就绪");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(
Thread.currentThread().getName() + " 开始工作....");
}).start();
}
}
}
控制台打印:
...
Thread-12准备就绪
Thread-58准备就绪
Thread-75准备就绪
Thread-25准备就绪
*****************准备完成!************
Thread-25 开始工作....
Thread-89 开始工作....
Thread-34 开始工作....
...
jdk7中增加了一个用于多阶段同步控制的工具类,它包含了CyclicBarrier和CountDownLatch的相关功能,比它们更强大灵活。
对Phaser阶段协同器的理解,Phaser适用于多个线程协作的任务,分为多个阶段,每个阶段都可以有任意个参与者,线程可以随时注册并参与某个阶段;当一个阶段中所有任务都成功完成后,Phaser的onAdvance()被调用,然后Phaser释放等待线程,自动进入下个阶段。如此循环,直到Phaser不再包含任何参与者。
Phaser API说明:
Phaser()
:参与任务数0Phaser(int parties)
:指定初始参与任务数Phaser(Phaser parent)
:指定parent阶段器, 子对象作为一个整体加入parent对象,当子对象中没有参与者时,会自动从parent对象解除注册Phaser(Phaser parent , int parties)
:集成上面两个方法的int register()
:增加一个数,返回当前阶段号int bulkRegister(int parties)
:增加指定个数,返回当前阶段号int arriveAndDeregister()
:减少一个任务数,返回当前阶段号int arrive()
:到达,任务完成,返回当前阶段号int arriveAndAwaitAdvance()
:到达后等待其他任务到达,返回到达阶段号int awaitAdvance(int phase)
:在指定阶段等待(必须是当前阶段才有效)int awaitAdvanceInterruptibly(int phase)
int awaitAdvanceInterruptibly(int phase , long timeout, TimeUnit unit)
protected boolean onAdvance(int Phase , int registeredParties)
:类似于CyclicBarrier的触发命令,通过重写该方法来增加阶段到达动作void forceTermination()
:强制结束boolean isTerMinated()
:判断是否结束void getPhase()
:获取当前阶段号import java.util.Random;
import java.util.concurrent.Phaser;
public class MultipleStartTogetherPhserDemo {
Random rd = new Random();
int bound = 5000;
public void step1Task() throws InterruptedException {
// 经过一段时间后,到达公司
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "到达公司!");
}
public void step2Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "出发去公园玩。。。");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "完成公园游玩!");
}
public void step3Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "出发去餐厅。。。。。。");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "到达餐厅!");
}
public void step4Task() throws InterruptedException {
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "开始用餐。。。。。。");
// 玩了一段时间后,到公园门口集合
Thread.sleep(rd.nextInt(bound));
System.out.println(
"员工【" + Thread.currentThread().getName() + "】" + "回家了!");
}
public static void main(String[] args) {
// 创建阶段协同器对象,重写了onAdvance方法,增加阶段到达处理逻辑
final Phaser ph = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
int staffs = registeredParties - 1;
switch (phase) {
case 0:
System.out.println("大家都到公司了,出发去公园!人数:" + staffs);
break;
case 1:
System.out.println("大家都到公园大门,出发去餐厅!人数:" + staffs);
break;
case 2:
System.out.println("大家都到餐厅了,开始用餐!人数:" + staffs);
break;
}
// 判断是否只剩主线程一个参与者,是,则返回true,阶段协同器终止。
return registeredParties == 1;
}
};
// 增加一个任务数,用来让主线程全程参与
ph.register();
final MultipleStartTogetherPhserDemo job = new MultipleStartTogetherPhserDemo();
// 让3个全程参与的线程加入
for (int i = 0; i < 3; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step1Task();
ph.arriveAndAwaitAdvance();
job.step2Task();
System.out.println(
"员工【" + Thread.currentThread().getName() + "】"
+ "到达公园大门集合。");
ph.arriveAndAwaitAdvance();
job.step3Task();
ph.arriveAndAwaitAdvance();
job.step4Task();
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// 让两个不参加聚餐的员工加入
for (int i = 0; i < 2; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step1Task();
ph.arriveAndAwaitAdvance();
job.step2Task();
System.out.println(
"员工【" + Thread.currentThread().getName() + "】"
+ "回家了!");
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
while (!ph.isTerminated()) {
int phaser = ph.arriveAndAwaitAdvance();
if (phaser == 2) { // 到了去餐厅的阶段,让只参加晚上聚餐的人加入
for (int i = 0; i < 4; i++) {
// 增加参与任务数
ph.register();
new Thread(new Runnable() {
@Override
public void run() {
try {
job.step3Task();
ph.arriveAndAwaitAdvance();
job.step4Task();
// 完成了,注销离开
ph.arriveAndDeregister();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
}
}