前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java&Go高性能队列之LinkedBlockingQueue性能测试

Java&Go高性能队列之LinkedBlockingQueue性能测试

作者头像
FunTester
发布2022-02-08 13:19:13
1.3K3
发布2022-02-08 13:19:13
举报
文章被收录于专栏:FunTester

在写完高性能队列Disruptor在测试中应用千万级日志回放引擎设计稿视频版之后,我就一直在准备Java & Go 语言几种高性能消息队列的性能测试,其中选取了几种基准测试场景以及在性能测试中的应用场景。

测试场景设计的思路参考的两个方面:

  • 消息体大小,我用的不同大小GET请求区分
  • 生产者和消费者线程数,Go语言中称协程goroutine

PS:后续的文章中,Go语言文章中如果出现线程,均指goroutine。

结论

总体来说,java.util.concurrent.LinkedBlockingQueue性能还是在50万QPS级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来三点比较通用的参考:

  • 消息体尽可能小
  • 线程数增益有限
  • 尽量避免消息积压

简介

首先介绍一下第一个被测试的对象java.util.concurrent.LinkedBlockingQueue,分解名字可以得到这是个由链表实现的阻塞单向的对象。官方给的定义是:

基于链接节点的可选有界阻塞队列。此队列对元素进行 FIFO(先进先出)排序。队列的头部是在队列中时间最长的元素。队列的尾部是在队列中时间最短的元素。新元素被插入到队列的尾部,队列检索操作获取队列头部的元素。链接队列通常比基于数组的队列具有更高的吞吐量,但在大多数并发应用程序中性能更不可预测。

在我查到的几种JDK自带的队列实现类中,java.util.concurrent.LinkedBlockingQueue性能是最高的,还有一个候选的类java.util.concurrent.ArrayBlockingQueue,资料说java.util.concurrent.LinkedBlockingQueue性能大概是java.util.concurrent.ArrayBlockingQueue性能的2 ~ 3倍,差距过于明显,这个有机会再来测试。

测试结果

这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。

数据说明

这里我用了三种org.apache.http.client.methods.HttpGet,创建方法均使用原生API,为了区分大小的区别,我会响应增加一些header和URL长度。

小对象:

代码语言:javascript
复制
def get = new HttpGet()

中对象:

代码语言:javascript
复制
def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)

大对象:

代码语言:javascript
复制
def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)

生产者

对象大小

队列长度 (百万)

线程数

速率(/ms)

1

1

838

1

5

837

1

10

823

5

1

483

10

1

450

1

1

301

1

5

322

1

10

320

1

20

271

5

1

失败

10

1

失败

0.5

1

351

0.5

5

375

1

1

214

1

5

240

1

10

241

0.5

1

209

0.5

5

250

0.5

10

246

0.2

1

217

0.2

5

309

0.2

10

321

0.2

20

243

中间两次测试失败,是因为等待时间太长了,进行到300万左右开始停滞,所以放弃了。

针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

  1. 长度保持在十万量级
  2. 生产者线程数5-10线程
  3. 消息体尽可能小

消费者

对象大小

队列长度 (百万)

线程数

速率(/ms)

1

1

1893

1

5

1706

1

10

1594

1

20

1672

2

1

2544

2

5

2024

5

1

3419

1

1

1897

1

5

1485

1

10

1345

1

20

1430

2

1

2971

2

5

1576

1

1

1980

1

5

1623

1

10

1689

0.5

1

1136

0.5

5

1096

0.5

10

1072

针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

  1. 数据上看长度越长越好
  2. 消费者线程越少越好
  3. 消息体尽可能小

这里跟生产者标准有点不一样,基本上就是锁的竞争越少越好,测试消息数越多越好(这个工作中暂时用不到)。

生产者 & 消费者

这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的2倍。

对象大小

次数 (百万)

线程数

队列长度 (百万)

速率(/ms)

1

1

0.1

1326

1

1

0.2

1050

1

1

0.5

1054

1

5

0.1

1091

1

10

0.1

1128

2

1

0.1

1798

2

1

0.2

1122

2

5

0.2

946

5

5

0.1

1079

5

10

0.1

1179

1

1

0.1

632

1

1

0.2

664

1

5

0.2

718

1

10

0.2

683

2

1

0.2

675

2

5

0.2

735

2

10

0.2

788

2

15

0.2

828

1

1

0.1

505

1

1

0.2

558

1

5

0.2

609

1

10

0.2

496

2

1

0.2

523

2

5

0.2

759

2

10

0.2

668

针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

  1. 消息队列积累消息越少,速率越快
  2. 消费速率随时间推移越来越快,不明显
  3. 消息体尽可能小

测试用例

测试用例使用Groovy语言编写,自从我自定义了异步关键字fun和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。所以这个用例对于Java同学来讲可能有点看着熟悉,仔细阅读起来有点费劲,我会尽量写一些注释。大家可以把终点放在测试结果上,这可以对以后大家使用java.util.concurrent.LinkedBlockingQueue类有个基本的参考。

测试用例会根据上述的测试场景进行微调,例如线程数、消息体对象的大小等等,这个我会着重进行三种用例场景的测试。当然在工作中使用场景肯定比我提到的三种复杂多,各位有兴趣可以自己亲自上手测试,这里我就不班门弄斧了。

生产者场景

代码语言:javascript
复制
package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

class QueueT extends SourceCode {

    static AtomicInteger index = new AtomicInteger(0)

    static int total = 100_0000

    static int size = 10

    static int threadNum = 1

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {

        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()

        def start = Time.getTimeStamp()
        def latch = new CountDownLatch(threadNum)
        def ts = []
        def barrier = new CyclicBarrier(threadNum + 1)
        def funtester = {//创建异步闭包的方法
            fun {
                barrier.await()
                while (true) {
                    if (index.getAndIncrement() % piece == 0) {
                        def l = Time.getTimeStamp() - start
                        ts << l
                        output("${formatLong(index.get())}添加总消耗${formatLong(l)}")
                        start = Time.getTimeStamp()
                    }
                    if (index.get() > total) break

                    def get = new HttpGet(url)
                    get.addHeader("token",token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    linkedQ.put(get)
                }
                latch.countDown()
            }
        }
        threadNum.times {funtester()}
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("每毫秒速率${total / (et - st)}")
        outRGB(CountUtil.index(ts).toString())
    }


}

消费者场景

代码语言:javascript
复制
package com.funtest.groovytest

import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class QueueTconsume extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 100_0000

    static int size = 10

    static int threadNum = 5

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {

        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
        def pwait = new CountDownLatch(10)
        def produces = {
            fun {
                while (true) {
                    if (linkedQ.size() > total) break
                    def get = new HttpGet(url)
                    get.addHeader("token", token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    linkedQ.add(get)
                }
                pwait.countDown()
            }
        }
        10.times {produces()}
        pwait.await()
        outRGB("数据构造完成!${linkedQ.size()}")


        def start = Time.getTimeStamp()
        def barrier = new CyclicBarrier(threadNum + 1 )
        def latch = new CountDownLatch(threadNum)
        def ts = []
        def funtester = {
            fun {
                barrier.await()
                while (true) {
                    if (index.getAndIncrement() % piece == 0) {
                        def l = Time.getTimeStamp() - start
                        ts << l
                        output("${formatLong(index.get())}消费总消耗${formatLong(l)}")
                        start = Time.getTimeStamp()
                    }
                    def poll = linkedQ.poll(100, TimeUnit.MILLISECONDS)
                    if (poll == null) break
                }
                latch.countDown()
            }
        }
        threadNum.times {funtester()}
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("每毫秒速率${total / (et - st)}")
        outRGB(CountUtil.index(ts).toString())
    }


}


生产者 & 消费者 场景

这里我引入了另外一个变量:初始队列长度length,用例运行之前将队列按照这个长度进行单线程填充。

代码语言:javascript
复制
package com.funtest.groovytest

import com.funtester.frame.SourceCode
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

class QueueBoth extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 500_0000

    static int length = 50_0000

    static int threadNum = 5

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()

        def latch = new CountDownLatch(threadNum * 2)
        def barrier = new CyclicBarrier(threadNum * 2 + 1)
        def ts = []
        def funtester = {f ->
            {
                fun {
                    barrier.await()
                    while (true) {
                        if (index.getAndIncrement() > total) break
                        f()
                    }
                    latch.countDown()
                }
            }
        }
        def produces =  {
            def get = new HttpGet(url)
            get.addHeader("token", token)
            get.addHeader(HttpClientConstant.USER_AGENT)
            get.addHeader(HttpClientConstant.CONNECTION)
            linkedQ.put(get)
        }
        length.times {produces()}

        threadNum.times {
            funtester produces
            funtester {linkedQ.poll(100, TimeUnit.MILLISECONDS)}
        }
        def st = Time.getTimeStamp()
        barrier.await()
        latch.await()
        def et = Time.getTimeStamp()
        outRGB("每毫秒速率${total / (et - st) / 2}")
    }


}

补充

性能非常不稳定

其中有两个问题需要补充说明,java.util.concurrent.LinkedBlockingQueue性能在测试过程中非常不稳定,我每次打印日志以1/10为节点打印时间戳,下面分享一些在队列长度100万时,生产者模式中的日志:

代码语言:javascript
复制
INFO-> 23.731 F-2  107,942添加总消耗523
INFO-> 23.897 F-10 200,061添加总消耗165
INFO-> 24.137 F-9  300,024添加总消耗239
INFO-> 24.320 F-2  400,037添加总消耗182
INFO-> 25.200 F-5  500,065添加总消耗879
INFO-> 25.411 F-2  600,094添加总消耗211
INFO-> 25.604 F-8  700,090添加总消耗193
INFO-> 26.868 F-1  800,047添加总消耗1,264
INFO-> 26.927 F-4  900,053添加总消耗57
INFO-> 28.454 F-3  1,000,009添加总消耗1,527
INFO-> 28.457 main 每毫秒速率190.0779319521
INFO-> 28.476 main 平均值:524.0 ,最大值1527.0 ,最小值:57.0 ,中位数:239.0 p99:1527.0 p95:1527.0


INFO-> 43.930 F-10 112,384添加总消耗385
INFO-> 44.072 F-9  200,159添加总消耗140
INFO-> 44.296 F-1  300,058添加总消耗223
INFO-> 44.445 F-7  400,075添加总消耗149
INFO-> 45.311 F-10 500,086添加总消耗866
INFO-> 45.498 F-8  600,080添加总消耗187
INFO-> 45.700 F-1  700,088添加总消耗202
INFO-> 45.760 F-9  800,057添加总消耗59
INFO-> 47.245 F-6  900,095添加总消耗1,485
INFO-> 47.303 F-6  1,000,009添加总消耗58
INFO-> 47.305 main 每毫秒速率262.7430373095
INFO-> 47.320 main 平均值:375.4 ,最大值1485.0 ,最小值:58.0 ,中位数:202.0 p99:1485.0 p95:1485.0


INFO-> 00.916 F-1  100,000添加总消耗568
INFO-> 01.269 F-1  200,000添加总消耗353
INFO-> 01.461 F-1  300,000添加总消耗192
INFO-> 01.635 F-1  400,000添加总消耗174
INFO-> 02.536 F-1  500,000添加总消耗899
INFO-> 02.777 F-1  600,000添加总消耗240
INFO-> 03.015 F-1  700,000添加总消耗237
INFO-> 03.107 F-1  800,000添加总消耗91
INFO-> 04.519 F-1  900,000添加总消耗1,412
INFO-> 05.940 F-1  1,000,000添加总消耗96
INFO-> 05.943 main 每毫秒速率184.5358922310
INFO-> 05.959 main 平均值:426.2 ,最大值1412.0 ,最小值:91.0 ,中位数:240.0 p99:1412.0 p95:1412.0

可以看出最大值最小值能相差十几倍,甚至二十几倍,这种情况随着消息队列总长度增长而增长,大多数发生在80万 ~ 100万阶段,如果将长度降低到50万,这种情况就会得到明显改善。所以还有一个附加观点:消息队列长度应当尽可能少一些。

基准测试

下面是我使用FunTester性能测试框架对三种消息对象的生产代码进行的测试结果。

测试对象

线程数

个数(百万)

速率(/ms)

1

1

5681

5

1

8010

5

5

15105

1

1

1287

5

1

2329

5

5

4176

1

1

807

5

1

2084

5

5

3185

测试用例如下:

代码语言:javascript
复制
package com.funtest.groovytest

import com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.httpclient.FunLibrary
import org.apache.http.client.methods.HttpGet

class TTT extends FunLibrary {

    static int total = 100_0000

    static int thread = 1

    static int times = total / thread

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        RUNUP_TIME = 0
        def tasks = []
        thread.times {tasks << new FunTester(times)}
        new Concurrent(tasks,"测试生产者代码性能").start()

    }

    private static class FunTester extends FixedThread {

        FunTester(int limit) {
            super(null, limit, true)
        }

        @Override
        protected void doing() throws Exception {
//            def get = new HttpGet()

//            def get = new HttpGet(url)
//            get.addHeader("token", token)
//            get.addHeader(HttpClientConstant.USER_AGENT)
//            get.addHeader(HttpClientConstant.CONNECTION)

            def get = new HttpGet(url + token)
            get.addHeader("token", token)
            get.addHeader("token1", token)
            get.addHeader("token5", token)
            get.addHeader("token4", token)
            get.addHeader("token3", token)
            get.addHeader("token2", token)
            get.addHeader(HttpClientConstant.USER_AGENT)
            get.addHeader(HttpClientConstant.CONNECTION)

        }

        @Override
        FixedThread clone() {
            return new FunTester(limit)
        }
    }

}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-01-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 结论
  • 简介
  • 测试结果
    • 数据说明
      • 生产者
        • 消费者
          • 生产者 & 消费者
          • 测试用例
            • 生产者场景
              • 消费者场景
                • 生产者 & 消费者 场景
                • 补充
                  • 性能非常不稳定
                    • 基准测试
                    相关产品与服务
                    消息队列 CMQ 版
                    消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档