前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java的并发工具类汇总

Java的并发工具类汇总

作者头像
用户3467126
发布2019-08-09 14:12:34
4480
发布2019-08-09 14:12:34
举报
文章被收录于专栏:爱编码

前言

Java给我们提供了很多通用且好用的并发工具类,现在我就总结一下

Atomic包

更新基本类型变量

Atomic包原子更新基本类型的工具类:

AtomicBoolean:以原子更新的方式更新boolean; AtomicInteger:以原子更新的方式更新Integer;

AtomicLong:以原子更新的方式更新Long;

这几个类的用法基本一致,这里以AtomicInteger为例总结常用的方法

addAndGet(int delta) :以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果; incrementAndGet() :以原子的方式将实例中的原值进行加1操作,并返回最终相加后的结果; getAndSet(int newValue):将实例中的值更新为新值,并返回旧值;

getAndIncrement():以原子的方式将实例中的原值加1,返回的是自增前的旧值;

其他它的底层是运用了CAS的原理,具体还各位可以去看源码,比如我就举一个例子

代码语言:javascript
复制
    public final int getAndIncrement() {

        return unsafe.getAndAddInt(this, valueOffset, 1);

    }


    private static final Unsafe unsafe = Unsafe.getUnsafe();
更新数组类型变量

atomic包下提供能原子更新数组中元素的类有:

AtomicIntegerArray:原子更新整型数组中的元素; AtomicLongArray:原子更新长整型数组中的元素; AtomicReferenceArray:原子更新引用类型数组中的元素

这几个类的用法一致,就以AtomicIntegerArray来总结下常用的方法:

addAndGet(int i, int delta):以原子更新的方式将数组中索引为i的元素与输入值相加; getAndIncrement(int i):以原子更新的方式将数组中索引为i的元素自增加1; compareAndSet(int i, int expect, int update):将数组中索引为i的位置的元素进行更新

更新引用类型变量

如果需要原子更新引用类型变量的话,为了保证线程安全,atomic也提供了相关的类:

AtomicReference:原子更新引用类型; AtomicReferenceFieldUpdater:原子更新引用类型里的字段;

AtomicMarkableReference:原子更新带有标记位的引用类型;

这几个类的使用方法也是基本一样的,以AtomicReference为例,来说明这些类的基本用法。

更新对象的某个字段

如果需要更新对象的某个字段,并在多线程的情况下,能够保证线程安全,atomic同样也提供了相应的原子操作类:

AtomicIntegeFieldUpdater:原子更新整型字段类; AtomicLongFieldUpdater:原子更新长整型字段类; AtomicStampedReference:原子更新引用类型,这种更新方式会带有版本号。 而为什么在更新的时候会带有版本号,是为了解决CAS的ABA问题;

要想使用原子更新字段需要两步操作:

原子更新字段类都是抽象类,只能通过静态方法newUpdater来创建一个更新器,并且需要设置想要更新的类和属性; 更新类的属性必须使用public volatile进行修饰;

CountDownLatch

CountDownLatch是一个非常实用的多线程控制工具类。常用下面几个方法:

代码语言:javascript
复制
    CountDownLatch(int count) //实例化一个倒计数器,count指定计数个数

    countDown() // 计数减一

    await() //等待,当计数减到0时,所有线程并行执行

CountDownLatch在我工作的多个场景被使用,算是用的很频繁的了,比如我们的API接口响应时间被要求在200ms以内,但是如果一个接口内部依赖多个三方外部服务,那串行调用接口的RT必然很久,所以个人用的最多的是接口RT优化场景,内部服务并行调用。

对于倒计数器,一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检测。只有等到所有的检查完毕后,引擎才能点火。那么在检测环节当然是多个检测项可以同时进行的。代码实现:

代码语言:javascript
复制
    public class CountDownLatchDemo implements Runnable{


        static final CountDownLatch latch = new CountDownLatch(10);

        static final CountDownLatchDemo demo = new CountDownLatchDemo();


        @Override

        public void run() {

            // 模拟检查任务

            try {

                Thread.sleep(new Random().nextInt(10) * 1000);

                System.out.println("check complete");

            } catch (InterruptedException e) {

                e.printStackTrace();

            } finally {

                //计数减一

                //放在finally避免任务执行过程出现异常,导致countDown()不能被执行

                latch.countDown();

            }

        }



        public static void main(String[] args) throws InterruptedException {

            ExecutorService exec = Executors.newFixedThreadPool(10);

            for (int i=0; i<10; i++){

                exec.submit(demo);

            }


            // 等待检查

            latch.await();


            // 发射火箭

            System.out.println("Fire!");

            // 关闭线程池

            exec.shutdown();

        }

    }

上述代码中我们先生成了一个CountDownLatch实例。计数数量为10,这表示需要有10个线程来完成任务,等待在CountDownLatch上的线程才能继续执行。latch.countDown();

方法作用是通知CountDownLatch有一个线程已经准备完毕,倒计数器可以减一了。latch.await()方法要求主线程等待所有10个检查任务全部准备好才一起并行执行。

CyclicBarrier

其实,有点类似人满发车这个词来理解CyclicBarrier的作用:

长途汽车站提供长途客运服务。当等待坐车的乘客到达20人时,汽车站就会发出一辆长途汽车,让这20个乘客上车走人。等到下次等待的乘客又到达20人是,汽车站就会又发出一辆长途汽车。

CyclicBarrier常用于多线程分组计算

下面来看下CyclicBarrier的主要方法:

代码语言:javascript
复制
    //等到所有的线程都到达指定的临界点

    await() throws InterruptedException, BrokenBarrierException 


    //与上面的await方法功能基本一致,只不过这里有超时限制,阻塞等待直至到达超时时间为止

    await(long timeout, TimeUnit unit) throws InterruptedException, 

    BrokenBarrierException, TimeoutException 


    //获取当前有多少个线程阻塞等待在临界点上

    int getNumberWaiting()


    //用于查询阻塞等待的线程是否被中断

    boolean isBroken()



    //将屏障重置为初始状态。如果当前有线程正在临界点等待的话,将抛出BrokenBarrierException。

    void reset()

另外需要注意的是,CyclicBarrier提供了这样的构造方法:

代码语言:javascript
复制
public CyclicBarrier(int parties, Runnable barrierAction)

下面用一个简单的例子来看下CyclicBarrier的用法,模拟下上面的坐车的例子。

代码语言:javascript
复制
    public class CyclicBarrierDemo {

        //指定必须有20个乘客到达才行

        private static CyclicBarrier barrier = new CyclicBarrier(20, () -> {

            System.out.println("所有乘客都上车了,司机开车!!!!!");

        });

        public static void main(String[] args) {

            System.out.println("乘客准备上车...........");


            ExecutorService service = Executors.newFixedThreadPool(6);

            for (int i = 0; i < 6; i++) {

                service.execute(() -> {

                    try {

                        System.out.println(Thread.currentThread().getName() + " 乘客,上车");

                        barrier.await();

                        System.out.println(Thread.currentThread().getName() + "  乘客,上车");

                    } catch (InterruptedException e) {

                        e.printStackTrace();

                    } catch (BrokenBarrierException e) {

                        e.printStackTrace();

                    }

                });

            }

        }


    }

CountDownLatch与CyclicBarrier比较

这两者还是各有不同侧重点的:

1、CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成,再携手共进。

2、调用CountDownLatch的countDown方法后,当前线程并不会阻塞,会继续往下执行;而调用CyclicBarrier的await方法,会阻塞当前线程,直到CyclicBarrier指定的线程全部都到达了指定点的时候,才能继续往下执行;

3、CountDownLatch方法比较少,操作比较简单,而CyclicBarrier提供的方法更多,比如能够通过getNumberWaiting(),isBroken()这些方法获取当前多个线程的状态,并且CyclicBarrier的构造方法可以传入barrierAction,指定当所有线程都到达时执行的业务功能;

4、CountDownLatch是不能复用的,而CyclicLatch是可以复用的

Semaphore

Semaphore叫信号量,Semaphore有两个目的,第一个是多个共享资源互斥使用,第二个是并发线程数的控制。

Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。假如有多个线程读取数据后,需要将数据保存在数据库中,而可用的最大数据库连接只有10个,这时候就需要使用Semaphore来控制能够并发访问到数据库连接资源的线程个数最多只有10个。

Semaphore的主要方法:
代码语言:javascript
复制
    //获取许可,如果无法获取到,则阻塞等待直至能够获取为止

    void acquire() throws InterruptedException 


    //同acquire方法功能基本一样,只不过该方法可以一次获取多个许可

    void acquire(int permits) throws InterruptedException


    //释放许可

    void release()


    //释放指定个数的许可

    void release(int permits)


    //尝试获取许可,如果能够获取成功则立即返回true,否则,则返回false

    boolean tryAcquire()


    //与tryAcquire方法一致,只不过这里可以指定获取多个许可

    boolean tryAcquire(int permits)


    //尝试获取许可,如果能够立即获取到或者在指定时间内能够获取到,则返回true,否则返回false

    boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException


    //与上一个方法一致,只不过这里能够获取多个许可

    boolean tryAcquire(int permits, long timeout, TimeUnit unit)


    //返回当前可用的许可证个数

    int availablePermits()


    //返回正在等待获取许可证的线程数

    int getQueueLength()


    //是否有线程正在等待获取许可证

    boolean hasQueuedThreads()


    //获取所有正在等待许可的线程集合

    Collection<Thread> getQueuedThreads()
Semaphore信号量demo
代码语言:javascript
复制
/**
 * Semaphore叫信号量 or 信号灯
 * Semaphore有两个目的,第一个目的是多个共享资源互斥使用,第二目的是并发线程数的控制
 */
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 模拟厕所10个茅坑
        Semaphore semaphore = new Semaphore(5);
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                try {
                    // 获取锁资源
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "\t上厕所");
                    // 模拟人上厕所10秒,然后让出坑位
                    TimeUnit.SECONDS.sleep(5);
                    System.out.println(Thread.currentThread().getName() + "\t上完厕所,让出坑位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 释放锁资源
                    semaphore.release();
                }
            }, "" + i + "号帅哥").start();
        }
    }
}

线程间交换数据Exchanger

Exchanger是一个用于线程间协作的工具类,用于两个线程间能够交换。它提供了一个交换的同步点,在这个同步点两个线程能够交换数据。

具体交换数据是通过exchange方法来实现的,如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。

Exchanger除了一个无参的构造方法外,主要方法也很简单:

代码语言:javascript
复制
    //当一个线程执行该方法的时候,会等待另一个线程也执行该方法,因此两个线程就都达到了同步点

    //将数据交换给另一个线程,同时返回获取的数据

    V exchange(V x) throws InterruptedException


    //同上一个方法功能基本一样,只不过这个方法同步等待的时候,增加了超时时间

    V exchange(V x, long timeout, TimeUnit unit)

        throws InterruptedException, TimeoutException 

模拟这样一个情景,在青春洋溢的中学时代,下课期间,男生经常会给走廊里为自己喜欢的女孩子送情书,相信大家都做过这样的事情吧 :)。男孩会先到女孩教室门口,然后等女孩出来,教室那里就是一个同步点,然后彼此交换信物,也就是彼此交换了数据。现在,就来模拟这个情景。

代码语言:javascript
复制
    public class ExchangerDemo {

        private static Exchanger<String> exchanger = new Exchanger();


        public static void main(String[] args) {


            //代表男生和女生

            ExecutorService service = Executors.newFixedThreadPool(2);


            service.execute(() -> {

                try {

                    //男生对女生说的话

                    String girl = exchanger.exchange("我其实暗恋你很久了......");

                    System.out.println("女孩儿说:" + girl);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            });

            service.execute(() -> {

                try {

                    System.out.println("女生慢慢的从教室你走出来......");

                    TimeUnit.SECONDS.sleep(3);

                    //男生对女生说的话

                    String boy = exchanger.exchange("我也很喜欢你......");

                    System.out.println("男孩儿说:" + boy);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            });


        }

    }


    输出结果:


    女生慢慢的从教室你走出来......

    男孩儿说:我其实暗恋你很久了......

    女孩儿说:我也很喜欢你......

总结

暂时总结到这里,如果还有什么遗漏的,可以留言给我哈。

参考文章

https://www.relaxheart.cn/to/blog/streamline?uuid=83 https://juejin.im/post/5aeec3ebf265da0ba76fa327

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 爱编码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Atomic包
    • 更新基本类型变量
      • 更新数组类型变量
        • 更新引用类型变量
          • 更新对象的某个字段
          • CountDownLatch
          • CyclicBarrier
          • CountDownLatch与CyclicBarrier比较
          • Semaphore
            • Semaphore的主要方法:
              • Semaphore信号量demo
              • 线程间交换数据Exchanger
              • 总结
              • 参考文章
              相关产品与服务
              数据库
              云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档