程序员控制并发流程
线程协作、控制并发流程,我们开发,咋滴搞呢
在不控制的情况下,并发下的多个线程,受线程调度器控制,不受我们程序员控制
控制并发流程的工具类,让程序员可以通过工具类控制线程合作,完成业务逻辑
比如,让线程A等待线程B执行完,再执行
countDownLatch门闩
控制并发流程
countDown是倒数,Latch是门闩
倒数门闩
比如,拼多多,人满才发货,就是这个思想
流程:开始--进入等待---倒数---继续工作
CountDownLatch(int count)
一个构造函数,count是用来倒数的值
await:调用await方法的线程会被挂起,等待直到count为0才会执行
countDown,就是count--,为0时,线程会被重新唤起
图解认识下,
现在看代码演示
用法一:
一个线程等待多个线程都执行完毕,再继续自己工作
这个场景是一等多的场景,比如去医院看病,检查完一项,盖一个章,盖好了再去主治医生进行下一步,或者拼多多,需要等其他人操作完,比如5个人拼团,你得等其他4个人执行countDown,才能拼单成功
/**
* 工厂中,质检,5个人检查完才算通过
*/
public class CountDownLatchDemo1 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i+1;
Runnable runnable = () -> {
try {
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + no + "完成了检查");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
latch.countDown();
}
};
service.submit(runnable);
}
System.out.println("等待5个人检查完.........");
latch.await();
System.out.println("所有人都检查完了");
}
}
用法2:
多等一的场景,比如多个线程,要同时启动,等待一个人发令,类似于百米赛跑
真实场景:比如并发模拟的时候,压测,需要让线程都等着,同时的执行任务,模拟高并发场景,也会用到countDownLatch
/**
* @Author:Joseph
* 百米赛跑等射枪
*/
public class CounDownLatchDemo2 {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(1);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i+1;
Runnable runnable = ()->{
System.out.println("No,"+no+"准备完毕,等待法令枪");
try {
latch.await();
System.out.println("No,"+no+"开始跑步!");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
service.submit(runnable);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("开枪了!!!!!!");
latch.countDown();
}
}
puls玩法,重点裁判记录最后一个人跑完
/**
* @Author:Joseph
* 百米赛跑等射枪
* 终点等待最后一个结束
*/
public class CounDownLatchDemo3 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(5);
ExecutorService service = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int no = i+1;
Runnable runnable = ()->{
System.out.println("No,"+no+"准备完毕,等待法令枪");
try {
begin.await();
System.out.println("No,"+no+"开始跑步!");
Thread.sleep((long) (Math.random()*10000));
System.out.println("No,"+no+"到终点了");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
service.submit(runnable);
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("开枪了!!!!!!");
begin.countDown();
end.await();
System.out.println("所有人都到了,裁判宣布比赛结束");
}
}
countDownLatch是不能够重用的,结束了就失效了,无法进行复用
只能新new一个,或者用CyclicBarrier
构造方法,count代表倒数几次,
await等待,countDown倒数
用法:一等多,或者多等一,经典用法
注意不能回滚重置
Semaphore不仅再java中有,操作系统os也是有的,很多人肯定学过!
java来模拟了一下这个而已
用途:限制或者管理有限资源的适用情况,
可以类比一个许可证,而且这个许可证是有限的,拿到许可证才能用这个任务
实际应用场景:面向B短的服务,一次查询数据超级 大,很容易拖垮线程,设计信号量为3,那么同时最多三个线程拿到这个资源进行适用。
比如:澡堂,一次只能容纳10个人,那么信号量的许可证设置为10,进去一个人,就-1,出来一个人就+1
代码层面
acquire获取许可证,release归还许可证
1:初始化Semphore并指定许可证数量
2:执行任务之前,调用acquire()方法
3:用完只会,调用release释放许可证
new Semaphore(int permits,boolean fair)
fair,ture是公平,不能插队,只能在等待对列里等待,
false的话,可以在一定情况下插队
acquire()
acquireUninterruptibly()
不响应中断,不会有异常来让这个线程处理
tryAcquire()
尝试获取许可证,不必一直等待,陷入阻塞,过一会儿在去查看许可证的空闲情况
tryAcqure(timeout)
等待几秒,获取不到,就去做别的事儿 ,与tryAcqure的区别就是会等几秒再走
release
归还许可证
public class SemaphoreDemo {
static Semaphore semaphore = new Semaphore(3,true);
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(50);
Runnable runnable = ()->{
try {
//可以获取多个许可证来分配权重,但是注意获取和释放许可证数量一样
semaphore.acquire(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"拿到了许可证");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"释放了许可证");
semaphore.release(3);
};
for (int i = 0; i < 100; i++) {
service.submit(runnable);
}
service.shutdown();
}
}
主要就是await,发起等待,等待其他线程的signal,这个和操作系统的semphore的方法名一样啊哈哈,学过操作系统的肯定直到,只不过这个,没有计数,
signalAll是唤醒全部的线程,
/**
* condition基本用法
* 特点:绑定在锁上面
*/
public class ConditionDemo1 {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
void methon1() throws InterruptedException {
lock.lock();
try {
System.out.println("条件不满足,开始await");
condition.await();
System.out.println("条件满足了,执行后续任务");
}finally {
lock.unlock();
}
}
void method2(){
lock.lock();
try {
System.out.println("准备工作完成,开始唤醒其他的线程");
condition.signal();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionDemo1 conditionDemo1 = new ConditionDemo1();
//注意,在主线程中,不能await再signal,已经阻塞了,不能再通过自己唤醒,所有new一个线程做
new Thread(()->{
try {
Thread.sleep(1000);
conditionDemo1.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//注意,需要主线程启动到子线程这里之后再进行await,不然阻塞在上面,无法执行到创建线程这里
conditionDemo1.methon1();
}
}
**
* 演示用Condition,实现生产者消费者模式
*/
public class ConditionDemo2 {
/**
* 生产消费模型必备,对列
*/
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
while (true){
lock.lock();
try {
while (queue.size()==0){
System.out.println("对列空,等待数据");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("从对列中取走了一个数据,对列剩余"+queue.size()+"个元素");
}finally {
lock.unlock();
}
}
}
}class Producer extends Thread{
@Override
public void run() {
producer();
}
private void producer() {
while (true){
lock.lock();
try {
while (queue.size()==queueSize){
System.out.println("对列满,不能再发数据");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signalAll();
System.out.println("从对列中插入了一个数据,对列剩余"+queue.size()+"个元素");
}finally {
lock.unlock();
}
}
}
}
}
Lock代替synchronzed,那么Conditon就是代替Object的wait,notify的,对应await signal
用法,和性质,几乎一样
await方法会自动释放持有的Lock锁,和Obect.wait,不 需要手动的释放锁,调用等待的方法的适合,就会释放锁,然后进入阻塞
所以两者的等待唤醒方法,调用的时候,必须持有锁,否则会抛出异常
conditon 优点是: condition可以更灵活的去做等待。
CyclicBarrier和CountDownLatch类似,都可以实现不同的任务等待,然后同时运行,
CyclicBarrier与countDownLatch区别,
另外,cyclicBarrier到条件之后,可以再执行一个线程任务
看例子
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人都到了,统一出发");
}
});
//cyclicBarrier相比与countDownLatch是可以重复用的,这里5改为10,相当于用了2次,可以自己改一下
for (int i = 0; i < 5; i++) {
new Thread(new Task(i,cyclicBarrier)).start();
}
}
static class Task implements Runnable{
private int id;
private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+id+"现在前往集合地点");
try {
Thread.sleep((long) (Math.random()*10000));
System.out.println("线程"+id+"到了集合地点,等待其他人到达");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
互相等待的时候,countDownLatch门闩很好用,想要复用工具的话,可以使用CyclicBarrier循环栅栏,
两者区别是countDownLatch扣减次数是以事件维度扣减,而CyclicBarrier是以线程扣减
涉及资源的有限分配,就可以用Semphore信号量,通过分配许可证来限制并发的数量
Condition条件对象是和lock配合的,通过这个可以实现线程的阻塞,和Object的wait、notify很像,
都是锁与阻塞配合。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。