前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JMeter5.1核心类StandardJMeterEngine源码分析

JMeter5.1核心类StandardJMeterEngine源码分析

原创
作者头像
天堂小说
发布2021-12-03 16:44:44
1.1K0
发布2021-12-03 16:44:44
举报
文章被收录于专栏:JMeter源码分析

概述

JMeter 默认单机压测引擎,运行 JMeter 测试,直接用于本地 GUI 和非 GUI 调用,或者RemoteJMeterEngineImpl 在服务器模式下运行时启动。

StandardJMeterEngine继承JMeterEngine 和Runable接口,本质上是一个线程对象。

API地址:

API地址:https://jmeter.apache.org/api/org/apache/jmeter/engine/StandardJMeterEngine.htm

简要解读:

  • HashTree 是依赖的数据结构;
  • SearchByClass 用来查找 HashTree 中的所有节点,并把节点实例化为真正的对象,例如TestPlan/ThreadGroup/Sampler/ResultCollector 在 HashTree 中本来都是只是配置,全部通过 SearchByClass 实例化的;
  • 实例化出来的对象如果是 TestStateListener 类型,则会在有生命周期的函数回调,测试前调 testStarted,结束时调 testEnded, 比如 ResultCollector是该类型的一种,在结束的时候回调 testEnded 方法完成 report 的写入;
  • PreCompiler 用来解析 Arguments, 把 TestPlan 节点中配置的参数作为JMeterVariables 加入到测试线程上线文中,同时进行参数和函数替换操作;
  • ThreadGroup 用来用来管理一组线程,包括线程的个数/启动/关闭等;
  • StopTest 作为其内部类对外不可见,作为一个 Runnable,作用是异步停止测试,stopTest方法也是通过该内部类实现的。

源码解读

主要参数变量

代码语言:txt
复制
    private static volatile StandardJMeterEngine engine;

    private volatile boolean running = false;

    private volatile boolean active = false;

    private volatile boolean serialized = false;

    private volatile boolean tearDownOnShutdown = false;

    private HashTree test;

    private final String host;

构造函数

代码语言:txt
复制
    // 不带参数的构造函数,用于单机压测
    public StandardJMeterEngine() {
        this(null);
    }

    // 带参数的构造函数,用于分布式压测
    public StandardJMeterEngine(String host) {
        this.host = host;
        // Hack to allow external control
        initSingletonEngine(this);
    }

主要方法

configure(HashTree testTree)

配置引擎,HashTree 是 JMeter 执行测试依赖的数据结构,configure 在执行测试之前进行配置测试数据。

代码语言:txt
复制
    // 从HashTree中解析出TestPlan,并创建
    SearchByClass<TestPlan> testPlan = new SearchByClass<>(TestPlan.class);
    testTree.traverse(testPlan);
    Object[] plan = testPlan.getSearchResults().toArray();
    if (plan.length == 0) {
        throw new IllegalStateException("Could not find the TestPlan class!");
    }
    TestPlan tp = (TestPlan) plan[0];

    // 一个测试中可能会有多个线程组,如果serialized为true,则StandardJMeterEngine会串行的去执行这些线程组,每启动一个ThreadGroup主线程都会等它结束;否则就并行执行所有的线程组。
    serialized = tp.isSerialized();

    // tearDownOnShutdown与PostThreadGroup配合使用的,用来做清理工作
    tearDownOnShutdown = tp.isTearDownOnShutdown();
    active = true;
    test = testTree;

runTest

runTest()方法启动测试,该方法调用的是继承Runnable后重写的run()方法。

StandardJMeterEngine实现了Runable接口,本质上是一个线程,通过运行run()方法启动测试。

代码语言:txt
复制
    public void runTest() throws JMeterEngineException {
        if (host != null){
            long now=System.currentTimeMillis();
            System.out.println("Starting the test on host " + host + " @ "+new Date(now)+" ("+now+")"); // NOSONAR Intentional
        }
        try {
            // StandardJMeterEngine本质上是一个线程
            Thread runningThread = new Thread(this, "StandardJMeterEngine");
            runningThread.start();
        } catch (Exception err) {
            stopTest();
            throw new JMeterEngineException(err);
        }
    }

run

执行StandardJMeterEngine的run方法

JMeterContextService类,初始化操作:numberOfActiveThreads=0, 重置 testStart时间,创建线程变量JMeterContext

代码语言:txt
复制
    JMeterContextService.startTest();

PreCompiler类,继承HashTree类,初始化testPlan的Arguments类变量,替换参数化变量,存入JMeterVariables

代码语言:txt
复制
    PreCompiler compiler = new PreCompiler();
    test.traverse(compiler);

利用 SearchByClass 解析所有 TestStateListener类

TestStateListener类:生命周期的函数回调,测试前调 testStarted,结束时调 testEnded

代码语言:txt
复制
        SearchByClass<TestStateListener> testListeners = new SearchByClass<>(TestStateListener.class); // TL - S&E
        test.traverse(testListeners);

        //将testlist中的元素添加进去
        testListeners.getSearchResults().addAll(testList); 
        testList.clear();

测试前调用testStart方法,比如TestPlan会加载依赖包,BackendListener会启动守护线程收集SamplerResult,ResultCollector类会递增 instanceCount,初始化 fileOutput,DataSourceElement类创建跟数据库的连接等等。

代码语言:txt
复制
    notifyTestListenersOfStart(testListeners);

    private void notifyTestListenersOfStart(SearchByClass<TestStateListener> testListeners) {
        for (TestStateListener tl : testListeners.getSearchResults()) {
            if (tl instanceof TestBean) {
                TestBeanHelper.prepare((TestElement) tl);
            }
            if (host == null) {
                tl.testStarted();
            } else {
                tl.testStarted(host);
            }
        }
    }

利用 SearchByClass 解析所有 ThreadGroup(包括SetupThreadGroup,ThreadGroup, PostThreadGroup)

代码语言:txt
复制
    SearchByClass<SetupThreadGroup> setupSearcher = new SearchByClass<>(SetupThreadGroup.class);
    SearchByClass<AbstractThreadGroup> searcher = new SearchByClass<>(AbstractThreadGroup.class);
    SearchByClass<PostThreadGroup> postSearcher = new SearchByClass<>(PostThreadGroup.class);

    test.traverse(setupSearcher);
    test.traverse(searcher);
    test.traverse(postSearcher);

    Iterator<SetupThreadGroup> setupIter = setupSearcher.getSearchResults().iterator();
    Iterator<AbstractThreadGroup> iter = searcher.getSearchResults().iterator();
    Iterator<PostThreadGroup> postIter = postSearcher.getSearchResults().iterator();

实例化一个 ListenerNotifier 实例,用来通知事件发生,比如backendListener, resultCollector等

代码语言:txt
复制
    ListenerNotifier notifier = new ListenerNotifier();

启动所有 SetupThreadGroup (一般情况下没有 SetupThreadGroup )并等待结束

代码语言:txt
复制
    if (setupIter.hasNext()) {
        log.info("Starting setUp thread groups");
        while (running && setupIter.hasNext()) {//for each setup thread group
            AbstractThreadGroup group = setupIter.next();
            groupCount++;
            String groupName = group.getName();
            log.info("Starting setUp ThreadGroup: {} : {} ", groupCount, groupName);
            startThreadGroup(group, groupCount, setupSearcher, testLevelElements, notifier);
            // 判断ThreadGroup是否串行执行
            if (serialized && setupIter.hasNext()) {
                log.info("Waiting for setup thread group: {} to finish before starting next setup group", 
                        groupName);
                group.waitThreadsStopped();
            }
        }    
        log.info("Waiting for all setup thread groups to exit");
        //wait for all Setup Threads To Exit
        waitThreadsStopped();
        log.info("All Setup Threads have ended");
        groupCount=0;
        JMeterContextService.clearTotalThreads();
    }

    groups.clear();

等待所有的ThreadGroup结束

代码语言:txt
复制
    
    JMeterContextService.getContext().setSamplingStarted(true);
    boolean mainGroups = running; // still running at this point, i.e. setUp was not cancelled

    while (running && iter.hasNext()) {// for each thread group
        AbstractThreadGroup group = iter.next();
        //ignore Setup and Post here.  We could have filtered the searcher. but then
        //future Thread Group objects wouldn't execute.
        if (group instanceof SetupThreadGroup ||
                group instanceof PostThreadGroup) {
            continue;
        }
        groupCount++;
        String groupName = group.getName();
        log.info("Starting ThreadGroup: {} : {}", groupCount, groupName);
        startThreadGroup(group, groupCount, searcher, testLevelElements, notifier);
        // 判断ThreadGroup是否串行执行
        if (serialized && iter.hasNext()) {
            log.info("Waiting for thread group: {} to finish before starting next group", groupName);
            // 调用ThreadGroup的waitThreadsStopped方法
            group.waitThreadsStopped();
        }
    } // end of thread groups
    if (groupCount == 0){ // No TGs found
        log.info("No enabled thread groups found");
    } else {
        if (running) {
            log.info("All thread groups have been started");
        } else {
            log.info("Test stopped - no more thread groups will be started");
        }
    }

    //wait for all Test Threads To Exit
    waitThreadsStopped();
    groups.clear();

执行所有PostThreadGroup(一般没有), 并等待结束

代码语言:txt
复制
    if (postIter.hasNext()){
        groupCount = 0;
        JMeterContextService.clearTotalThreads();
        log.info("Starting tearDown thread groups");
        if (mainGroups && !running) { // i.e. shutdown/stopped during main thread groups
            running = tearDownOnShutdown; // re-enable for tearDown if necessary
        }
        while (running && postIter.hasNext()) {//for each setup thread group
            AbstractThreadGroup group = postIter.next();
            groupCount++;
            String groupName = group.getName();
            log.info("Starting tearDown ThreadGroup: {} : {}", groupCount, groupName);
            startThreadGroup(group, groupCount, postSearcher, testLevelElements, notifier);
            if (serialized && postIter.hasNext()) {
                log.info("Waiting for post thread group: {} to finish before starting next post group", groupName);
                group.waitThreadsStopped();
            }
        }
        waitThreadsStopped(); // wait for Post threads to stop
    }

测试结束调用testListener 的 testEnded 方法,比如:

JavaSampler 会调用真正跑的 AbstractJavaSamplerClient 的 teardownTest 方法,可以打印该 JavaSamplerClient 测试总共花费的时间;

ResultCollector 用来将测试结果写如文件生成;

reportTestPlan 用来关闭文件。

BackendListener 用来清理BackendListenerClients

代码语言:txt
复制
    notifyTestListenersOfEnd(testListeners);
    JMeterContextService.endTest();

    private void notifyTestListenersOfEnd(SearchByClass<TestStateListener> testListeners) {
        log.info("Notifying test listeners of end of test");
        for (TestStateListener tl : testListeners.getSearchResults()) {
            try {
                if (host == null) {
                    tl.testEnded();
                } else {
                    tl.testEnded(host);
                }
            } catch (Exception e) {
                log.warn("Error encountered during shutdown of "+tl.toString(),e);
            }
        }
        if (host != null) {
            log.info("Test has ended on host {} ", host);
            long now=System.currentTimeMillis();
            System.out.println("Finished the test on host " + host + " @ "+new Date(now)+" ("+now+")" // NOSONAR Intentional
                    +(EXIT_AFTER_TEST ? " - exit requested." : ""));
            if (EXIT_AFTER_TEST){
                // 远程server退出
                exit();
            }
        }
        active=false;
    }

startThreadGroup

启动ThreadGroup线程组

代码语言:txt
复制
    private void startThreadGroup(AbstractThreadGroup group, int groupCount, SearchByClass<?> searcher, List<?> testLevelElements, ListenerNotifier notifier)
    {
        try {
            // 获取线程组的属性配置
            int numThreads = group.getNumThreads();
            JMeterContextService.addTotalThreads(numThreads);
            boolean onErrorStopTest = group.getOnErrorStopTest();
            boolean onErrorStopTestNow = group.getOnErrorStopTestNow();
            boolean onErrorStopThread = group.getOnErrorStopThread();
            boolean onErrorStartNextLoop = group.getOnErrorStartNextLoop();
            String groupName = group.getName();
            log.info("Starting {} threads for group {}.", numThreads, groupName);
            if (onErrorStopTest) {
                log.info("Test will stop on error");
            } else if (onErrorStopTestNow) {
                log.info("Test will stop abruptly on error");
            } else if (onErrorStopThread) {
                log.info("Thread will stop on error");
            } else if (onErrorStartNextLoop) {
                log.info("Thread will start next loop on error");
            } else {
                log.info("Thread will continue on error");
            }
            // 获取ThreadGroup的HashTree对象
            ListedHashTree threadGroupTree = (ListedHashTree) searcher.getSubTree(group);
            threadGroupTree.add(group, testLevelElements);
    
            // 将threadGroup加入List列表
            groups.add(group);
            // 执行ThreadGroup对象的start方法
            group.start(groupCount, notifier, threadGroupTree, this);
        } catch (JMeterStopTestException ex) { // NOSONAR Reported by log
            JMeterUtils.reportErrorToUser("Error occurred starting thread group :" + group.getName()+ ", error message:"+ex.getMessage()
                +", \r\nsee log file for more details", ex);
            return; // no point continuing
        }
    }   

waitThreadsStopped

等待线程结束

代码语言:txt
复制
    private void waitThreadsStopped() {
        // ConcurrentHashMap does not need synch. here
        for (AbstractThreadGroup threadGroup : groups) {
            // 调用ThreadGroup的waitThreadsStopped方法
            threadGroup.waitThreadsStopped();
        }
    }

stopTest

停止测试,若 now 为 true 则停止动作立即执行;若为 false 则等待当前正在执行的测试至少执行完一个 iteration。

代码语言:txt
复制
    public synchronized void stopTest() {
        stopTest(true);
    }

    public synchronized void stopTest(boolean now) {
        Thread stopThread = new Thread(new StopTest(now));
        stopThread.start();
    }

调用内部类StopTest,由于该类继承Runnable,调用该类的run方法

代码语言:txt
复制
    public void run() {
        running = false;
        resetSingletonEngine();
        // 立即停止线程的标志
        if (now) {
            // 停止线程组
            tellThreadGroupsToStop();
            pause(10L * countStillActiveThreads());
            // 验证线程组是否停止
            boolean stopped = verifyThreadsStopped();
            // 停止失败
            if (!stopped) {  // we totally failed to stop the test
                if (JMeter.isNonGUI()) {
                    log.error(JMeterUtils.getResString("stopping_test_failed")); //$NON-NLS-1$
                    if (SYSTEM_EXIT_ON_STOP_FAIL) { // default is true
                        log.error("Exiting");
                        System.out.println("Fatal error, could not stop test, exiting"); // NOSONAR Intentional
                        System.exit(1); // NOSONAR Intentional
                    } else {
                        System.out.println("Fatal error, could not stop test"); // NOSONAR Intentional                            
                    }
                } else {  // 非立即停止
                    JMeterUtils.reportErrorToUser(
                            JMeterUtils.getResString("stopping_test_failed"), //$NON-NLS-1$
                            JMeterUtils.getResString("stopping_test_title")); //$NON-NLS-1$
                }
            } // else will be done by threadFinished()
        } else {
            stopAllThreadGroups();
        }
    }

停止ThreadGroup线程组

代码语言:txt
复制
        private void tellThreadGroupsToStop() {
            // ConcurrentHashMap does not need protecting
            for (AbstractThreadGroup threadGroup : groups) {
                threadGroup.tellThreadsToStop();
            }
        }

验证线程组是否全部停止

代码语言:txt
复制
    private boolean verifyThreadsStopped() {
            boolean stoppedAll = true;
            // ConcurrentHashMap does not need synch. here
            for (AbstractThreadGroup threadGroup : groups) {
                stoppedAll = stoppedAll && threadGroup.verifyThreadsStopped();
            }
            return stoppedAll;
    }

等待线程组执行完

代码语言:txt
复制
    private void stopAllThreadGroups() {
            // ConcurrentHashMap does not need synch. here
            for (AbstractThreadGroup threadGroup : groups) {
                threadGroup.stop();
            }
    }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 简要解读:
  • 源码解读
    • 主要参数变量
      • 构造函数
        • 主要方法
          • configure(HashTree testTree)
            • runTest
              • run
                • startThreadGroup
                  • waitThreadsStopped
                    • stopTest
                    相关产品与服务
                    云服务器
                    云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档