前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java多线程相关知识点扩展实例分析

Java多线程相关知识点扩展实例分析

作者头像
IT架构圈
发布2020-10-27 10:19:24
4480
发布2020-10-27 10:19:24
举报
文章被收录于专栏:IT架构圈

这次说说多线程只是扩展,主要讲解一些应用,应用带一些原理讲解,同时希望各位老铁有所收获,这些内容跟前面的线程和并发容器有关系的,从应用场景引出多线程技术栈里面的应用,其实锁和同步块,容器,工具类,都是非常的使用的。反射更加适应于语法。

(一)多线程应用
  • ① 介绍

多线程经常使用在逻辑处理里面,一个程序N个逻辑要做,一个用户请求可能需要数据库查询,第三方的系统接口,调用redis,一个用户请求需要多步组成,可以使用多线程技术来实现,去做一些调整,

  • ② 经典场景

接触后端开发的时候,经常发现一个请求过来,后端需要做一系列的复杂的操作,下面这个后端有:系统消息,我的团队,我的钱包 对于这些信息,后面的系统如何设计。这些可能涉及到多个模块的调用。一个系统划分为多个子系统来做。

  • ③ 后端接口执行-大概流程

互联网公司存在组织结构复杂,调用的模块比较多。设计这样系统的时候,一个信息单独的查询系统的对应接口,还是移动前端发起一次请求一下获取到。一般都做网关(API)接口,一个请求获取多个信息,网关收到信息后,获取多个子系统的接口,最后把信息汇总,返回给前端。

  1. 收到一个请求。
  2. 调用多个服务接口获取其他系统的数据信息。
  3. 最后汇总范围。

通过数据分析,越来越多的互联网电商平台的单子70%以上都来自手机端,手机端有个典型的应用,网络处理很麻烦的,移动设备的固有属性,一个人走这走这到了信号的盲区了,一个页面发起五六个接口的请求,移动互联网的应用造成了很大的损耗,一般都是一个接口获取全部的信息。 如果一个API网关需要调用3个接口,这3个接口是串行完成的,A执行完(3秒),执行B,B执行完(2秒),执行C(5秒),C执行完返回给移动端json字符串,需要10秒才能返回。 如果A,B,C这3个没有相互依赖的关系,完全可以把A交给线程1,B交给线程2,C交给线程3,来一起去完成,汇总执行的结果,需要5秒,没完成就返回。这样是不是效率明显得到了提升。

(二)Future
  • ① 介绍

异步计算的结果,提供了用于检查计算是否完成,等待计算完成以及获取结果的方法。

  • ② 接口的定义
  1. boolean cancel(boolean mayInterruptIfRunning)

尝试取消当前任务的执行。如果任务已经取消、已经完成或者其他原因不能取消,尝试将失败。如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。如果任务已经启动,参数mayInterruptIfRunning将决定任务是否应该中断执行该任务的线程,以尝试中断该任务。 如果任务不能被取消,通常是因为它已经正常完成,此时返回false,否则返回true

  1. boolean isCancelled()

如果任务在正常结束之前被被取消返回true

3.boolean isDone()

正常结束、异常或者被取消导致任务完成,将返回true

4.V get()

等待任务结束,然后获取结果,如果任务在等待过程中被终端将抛出InterruptedException,如果任务被取消将抛出CancellationException,如果任务中执行过程中发生异常将抛出ExecutionException。

5.V get(long timeout, TimeUnit unit)

任务最多在给定时间内完成并返回结果,如果没有在给定时间内完成任务将抛出TimeoutException。

(三)CountDownLatch
  • ① 介绍

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

  • ② 常见用法

多个人等一个信号后继续执行操作。例如5个运动员,等一个发令员的枪响。 一个人等多个人的信号。旅游团等所有人签到完成才开始出发。 常见到使用的地方是zk获取连接的时候。

  • ③ 源码分析
代码语言:javascript
复制
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class UserServiceCountLatch {
    ExecutorService executorService = Executors.newCachedThreadPool();

    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查询多个系统的数据,合并返回
     */
    public Object getUserInfo(String userId) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(2);
        ArrayList<JSONObject> values = new ArrayList<>();
        // 你可以封装成一个 提交URL 就能自动多线程调用的 工具
            executorService.submit(() -> {
                // 1.业务代码
                JSONObject userInfo = new JSONObject();
                values.add(userInfo);
                count.countDown();
            });
            executorService.submit(() -> {
               // 2.业务代码
               JSONObject intergralInfo= new JSONObject();
                values.add(intergralInfo);
                count.countDown();
        });

        count.await();// 等待计数器归零

        // 3. 合并为一个json对象
        JSONObject result = new JSONObject();
        for (JSONObject value : values) {
            result.putAll(value);
        }
        return result;
    }
}

1.统计线程执行的情况 2.压力测试中,使用countDownLatch实现最大程度的并发处理。 2.多个线程之间,相互通信,比如线程异步调用完接口,结果通知。

(四)CyclicBarrier
  • ① 介绍

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

  • ② 场景

坐车,老板都是票卖完了才开车。 数据库的批量操作,达到一定数量批量进行插入。

  • ③ 源码
代码语言:javascript
复制
import java.util.ArrayList;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// 循环屏障(栅栏),示例:数据库批量插入
// 游戏大厅... 5人组队打副本
public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> sqls = new LinkedBlockingQueue<>();
        // 任务1+2+3...1000  拆分为100个任务(1+..10,  11+20) -> 100线程去处理。

        // 每当有4个线程处于await状态的时候,则会触发barrierAction执行
        CyclicBarrier barrier = new CyclicBarrier(4, new Runnable() {
            @Override
            public void run() {
                // 这是每满足4次数据库操作,就触发一次批量执行
                System.out.println("有4个线程执行了,开始批量插入: " + Thread.currentThread());
                for (int i = 0; i < 4; i++) {
                    System.out.println(sqls.poll());
                }
            }
        });

        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    sqls.add("data - " + Thread.currentThread()); // 缓存起来
                    Thread.sleep(1000L); // 模拟数据库操作耗时
                    barrier.await(); // 等待栅栏打开,有4个线程都执行到这段代码的时候,才会继续往下执行
                    System.out.println(Thread.currentThread() + "插入完毕");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }

        Thread.sleep(2000);
    }
}
(五)Semaphore
  • ①介绍

又称“信号量”,控制多个线程争抢许可。 acquire: 获取一个许可,如果没有就等待。 release: 释放一个许可。 availablePermits: 方法得到可用的许可数目。

  • ② 场景

代码并发处理限流 举个例子,去洗浴中心的时候都会给一个手环,这个手环很多时候就是为了限制熟练,因为柜子是有限的,每个人一个柜子,如果没有手环了就是没有柜子了,手环归还后柜子就出现了,基本就是这个原理。

  • ③ 源码
代码语言:javascript
复制
import com.study.lock.aqs.NeteaseAqs;

// 自定义的信号量实现
public class NeteaseSemaphore {
    NeteaseAqs aqs = new NeteaseAqs() {
        @Override
        public int tryAcquireShared() { // 信号量获取, 数量 - 1
            for(;;) {
                int count =  getState().get();
                int n = count - 1;
                if(count <= 0 || n < 0) {
                    return -1;
                }
                if(getState().compareAndSet(count, n)) {
                    return 1;
                }
            }
        }

        @Override
        public boolean tryReleaseShared() { // state + 1
            return getState().incrementAndGet() >= 0;
        }
    };

    /** 许可数量 */
    public NeteaseSemaphore(int count) {
        aqs.getState().set(count); // 设置资源的状态
    }

    public void acquire() {
        aqs.acquireShared();
    } // 获取令牌

    public void release() {
        aqs.releaseShared();
    } // 释放令牌
}
代码语言:javascript
复制
import com.study.lock.aqs.AQSdemo;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// 信号量机制
public class SemaphoreDemo {
    public static void main(String[] args) {
        SemaphoreDemo semaphoreTest = new SemaphoreDemo();
        int N = 9;            // 客人数量
        NeteaseSemaphore semaphore = new NeteaseSemaphore(5); // 手牌数量,限制请求数量
        for (int i = 0; i < N; i++) {
            String vipNo = "vip-00" + i;
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取令牌

                    semaphoreTest.service(vipNo);

                    semaphore.release(); // 释放令牌
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // 限流 控制5个线程 同时访问
    public void service(String vipNo) throws InterruptedException {
        System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
        Thread.sleep(new Random().nextInt(3000));
        System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
    }

}
代码语言:javascript
复制
package com.study.lock.aqs;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// 抽象队列同步器
// state, owner, waiters
public class NeteaseAqs {
    // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
    // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
    // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
    // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。

    // 1、 如何判断一个资源的拥有者
    public volatile AtomicReference<Thread> owner = new AtomicReference<>();
    // 保存 正在等待的线程
    public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    // 记录资源状态
    public volatile AtomicInteger state = new AtomicInteger(0);

    // 共享资源占用的逻辑,返回资源的占用情况
    public int tryAcquireShared(){
        throw new UnsupportedOperationException();
    }

    public void acquireShared(){
        boolean addQ = true;
        while(tryAcquireShared() < 0) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前的线程,不要继续往下跑了
                LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把线程移除
    }

    public boolean tryReleaseShared(){
        throw new UnsupportedOperationException();
    }

    public void releaseShared(){
        if (tryReleaseShared()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 唤醒
            }
        }
    }

    // 独占资源相关的代码

    public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
        throw new UnsupportedOperationException();
    }

    public void acquire() {
        boolean addQ = true;
        while (!tryAcquire()) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前的线程,不要继续往下跑了
                LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把线程移除
    }

    public boolean tryRelease() {
        throw new UnsupportedOperationException();
    }

    public void release() { // 定义了 释放资源之后要做的操作
        if (tryRelease()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 唤醒
            }
        }
    }

    public AtomicInteger getState() {
        return state;
    }

    public void setState(AtomicInteger state) {
        this.state = state;
    }
}

PS:工具是根据场景来的,达到某个场景这个工具才有它的价值,如果你不存在这个场景这个工具也就没有价值。多线程这块设计到3块的知识:筑基阶段(JMM,lock,cas,atomic,sync),并发容器(。里面都涉及到数据结构,我已经开通了专辑数据结构与算法,数据结构并不是一两篇文章就可以搞定的东西,大学可是一门学科。),工具类阶段(多线程工具类阶段,设计模式的体现。不同的源码都有自己的设计模式的体现)

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 编程坑太多 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • (一)多线程应用
  • (二)Future
  • (三)CountDownLatch
  • (四)CyclicBarrier
  • (五)Semaphore
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档