这次说说多线程只是扩展,主要讲解一些应用,应用带一些原理讲解,同时希望各位老铁有所收获,这些内容跟前面的线程和并发容器有关系的,从应用场景引出多线程技术栈里面的应用,其实锁和同步块,容器,工具类,都是非常的使用的。反射更加适应于语法。
多线程经常使用在逻辑处理里面,一个程序N个逻辑要做,一个用户请求可能需要数据库查询,第三方的系统接口,调用redis,一个用户请求需要多步组成,可以使用多线程技术来实现,去做一些调整,
接触后端开发的时候,经常发现一个请求过来,后端需要做一系列的复杂的操作,下面这个后端有:系统消息,我的团队,我的钱包 对于这些信息,后面的系统如何设计。这些可能涉及到多个模块的调用。一个系统划分为多个子系统来做。
互联网公司存在组织结构复杂,调用的模块比较多。设计这样系统的时候,一个信息单独的查询系统的对应接口,还是移动前端发起一次请求一下获取到。一般都做网关(API)接口,一个请求获取多个信息,网关收到信息后,获取多个子系统的接口,最后把信息汇总,返回给前端。
通过数据分析,越来越多的互联网电商平台的单子70%以上都来自手机端,手机端有个典型的应用,网络处理很麻烦的,移动设备的固有属性,一个人走这走这到了信号的盲区了,一个页面发起五六个接口的请求,移动互联网的应用造成了很大的损耗,一般都是一个接口获取全部的信息。 如果一个API网关需要调用3个接口,这3个接口是串行完成的,A执行完(3秒),执行B,B执行完(2秒),执行C(5秒),C执行完返回给移动端json字符串,需要10秒才能返回。 如果A,B,C这3个没有相互依赖的关系,完全可以把A交给线程1,B交给线程2,C交给线程3,来一起去完成,汇总执行的结果,需要5秒,没完成就返回。这样是不是效率明显得到了提升。
异步计算的结果,提供了用于检查计算是否完成,等待计算完成以及获取结果的方法。
尝试取消当前任务的执行。如果任务已经取消、已经完成或者其他原因不能取消,尝试将失败。如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。如果任务已经启动,参数mayInterruptIfRunning将决定任务是否应该中断执行该任务的线程,以尝试中断该任务。 如果任务不能被取消,通常是因为它已经正常完成,此时返回false,否则返回true
如果任务在正常结束之前被被取消返回true
3.boolean isDone()
正常结束、异常或者被取消导致任务完成,将返回true
4.V get()
等待任务结束,然后获取结果,如果任务在等待过程中被终端将抛出InterruptedException,如果任务被取消将抛出CancellationException,如果任务中执行过程中发生异常将抛出ExecutionException。
5.V get(long timeout, TimeUnit unit)
任务最多在给定时间内完成并返回结果,如果没有在给定时间内完成任务将抛出TimeoutException。
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
多个人等一个信号后继续执行操作。例如5个运动员,等一个发令员的枪响。 一个人等多个人的信号。旅游团等所有人签到完成才开始出发。 常见到使用的地方是zk获取连接的时候。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class UserServiceCountLatch {
ExecutorService executorService = Executors.newCachedThreadPool();
@Autowired
private RestTemplate restTemplate;
/**
* 查询多个系统的数据,合并返回
*/
public Object getUserInfo(String userId) throws InterruptedException {
CountDownLatch count = new CountDownLatch(2);
ArrayList<JSONObject> values = new ArrayList<>();
// 你可以封装成一个 提交URL 就能自动多线程调用的 工具
executorService.submit(() -> {
// 1.业务代码
JSONObject userInfo = new JSONObject();
values.add(userInfo);
count.countDown();
});
executorService.submit(() -> {
// 2.业务代码
JSONObject intergralInfo= new JSONObject();
values.add(intergralInfo);
count.countDown();
});
count.await();// 等待计数器归零
// 3. 合并为一个json对象
JSONObject result = new JSONObject();
for (JSONObject value : values) {
result.putAll(value);
}
return result;
}
}
1.统计线程执行的情况 2.压力测试中,使用countDownLatch实现最大程度的并发处理。 2.多个线程之间,相互通信,比如线程异步调用完接口,结果通知。
CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。
坐车,老板都是票卖完了才开车。 数据库的批量操作,达到一定数量批量进行插入。
import java.util.ArrayList;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
// 循环屏障(栅栏),示例:数据库批量插入
// 游戏大厅... 5人组队打副本
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
// 任务1+2+3...1000 拆分为100个任务(1+..10, 11+20) -> 100线程去处理。
// 每当有4个线程处于await状态的时候,则会触发barrierAction执行
CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
@Override
public void run() {
// 这是每满足4次数据库操作,就触发一次批量执行
System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread());
for (int i = 0; i < 4; i++) {
System.out.println(sqls.poll());
}
}
});
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
sqls.add("data - " + Thread.currentThread()); // 缓存起来
Thread.sleep(1000L); // 模拟数据库操作耗时
barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行
System.out.println(Thread.currentThread() + "插入完毕");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);
}
}
又称“信号量”,控制多个线程争抢许可。 acquire: 获取一个许可,如果没有就等待。 release: 释放一个许可。 availablePermits: 方法得到可用的许可数目。
代码并发处理限流 举个例子,去洗浴中心的时候都会给一个手环,这个手环很多时候就是为了限制熟练,因为柜子是有限的,每个人一个柜子,如果没有手环了就是没有柜子了,手环归还后柜子就出现了,基本就是这个原理。
import com.study.lock.aqs.NeteaseAqs;
// 自定义的信号量实现
public class NeteaseSemaphore {
NeteaseAqs aqs = new NeteaseAqs() {
@Override
public int tryAcquireShared() { // 信号量获取, 数量 - 1
for(;;) {
int count = getState().get();
int n = count - 1;
if(count <= 0 || n < 0) {
return -1;
}
if(getState().compareAndSet(count, n)) {
return 1;
}
}
}
@Override
public boolean tryReleaseShared() { // state + 1
return getState().incrementAndGet() >= 0;
}
};
/** 许可数量 */
public NeteaseSemaphore(int count) {
aqs.getState().set(count); // 设置资源的状态
}
public void acquire() {
aqs.acquireShared();
} // 获取令牌
public void release() {
aqs.releaseShared();
} // 释放令牌
}
import com.study.lock.aqs.AQSdemo;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
// 信号量机制
public class SemaphoreDemo {
public static void main(String[] args) {
SemaphoreDemo semaphoreTest = new SemaphoreDemo();
int N = 9; // 客人数量
NeteaseSemaphore semaphore = new NeteaseSemaphore(5); // 手牌数量,限制请求数量
for (int i = 0; i < N; i++) {
String vipNo = "vip-00" + i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取令牌
semaphoreTest.service(vipNo);
semaphore.release(); // 释放令牌
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
// 限流 控制5个线程 同时访问
public void service(String vipNo) throws InterruptedException {
System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
Thread.sleep(new Random().nextInt(3000));
System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
}
}
package com.study.lock.aqs;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
// 抽象队列同步器
// state, owner, waiters
public class NeteaseAqs {
// acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
// tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
// release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
// tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。
// 1、 如何判断一个资源的拥有者
public volatile AtomicReference<Thread> owner = new AtomicReference<>();
// 保存 正在等待的线程
public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
// 记录资源状态
public volatile AtomicInteger state = new AtomicInteger(0);
// 共享资源占用的逻辑,返回资源的占用情况
public int tryAcquireShared(){
throw new UnsupportedOperationException();
}
public void acquireShared(){
boolean addQ = true;
while(tryAcquireShared() < 0) {
if (addQ) {
// 没拿到锁,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起当前的线程,不要继续往下跑了
LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
}
}
waiters.remove(Thread.currentThread()); // 把线程移除
}
public boolean tryReleaseShared(){
throw new UnsupportedOperationException();
}
public void releaseShared(){
if (tryReleaseShared()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 唤醒
}
}
}
// 独占资源相关的代码
public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
throw new UnsupportedOperationException();
}
public void acquire() {
boolean addQ = true;
while (!tryAcquire()) {
if (addQ) {
// 没拿到锁,加入到等待集合
waiters.offer(Thread.currentThread());
addQ = false;
} else {
// 阻塞 挂起当前的线程,不要继续往下跑了
LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
}
}
waiters.remove(Thread.currentThread()); // 把线程移除
}
public boolean tryRelease() {
throw new UnsupportedOperationException();
}
public void release() { // 定义了 释放资源之后要做的操作
if (tryRelease()) {
// 通知等待者
Iterator<Thread> iterator = waiters.iterator();
while (iterator.hasNext()) {
Thread next = iterator.next();
LockSupport.unpark(next); // 唤醒
}
}
}
public AtomicInteger getState() {
return state;
}
public void setState(AtomicInteger state) {
this.state = state;
}
}
PS:工具是根据场景来的,达到某个场景这个工具才有它的价值,如果你不存在这个场景这个工具也就没有价值。多线程这块设计到3块的知识:筑基阶段(JMM,lock,cas,atomic,sync),并发容器(。里面都涉及到数据结构,我已经开通了专辑数据结构与算法,数据结构并不是一两篇文章就可以搞定的东西,大学可是一门学科。),工具类阶段(多线程工具类阶段,设计模式的体现。不同的源码都有自己的设计模式的体现)