前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JMeter5.1核心类ThreadGroup源码分析

JMeter5.1核心类ThreadGroup源码分析

原创
作者头像
天堂小说
修改2021-12-03 18:01:13
7971
修改2021-12-03 18:01:13
举报
文章被收录于专栏:JMeter源码分析

概述

  • 线程组是一个测试计划的开始点
  • 在一个测试计划中的所有元件都必须在某个线程组下
  • 线程组决定 Jmeter 执行测试计划的线程数

作用:

  • 设置线程数
  • 设置ramp-up period:达到指定线程数所需要的时间
  • 设置执行测试的次数
  • 延迟创建线程:直到线程被需要的采样器开始执行时才会被创建,避免资源浪费

调度器

  • Duration (seconds) :持续时间,单位为秒
  • Startup Delay (seconds):启动延迟,单位为秒

每个线程组都会独立的运行测试计划,互不干扰,多个线程组用于模仿对服务器的并发访问。

源码解读

ThreadGroup类继承AbstractThreadGroup类

SetupThreadGroup和PostThreadGroup类继承ThreadGroup

主要变量

代码语言:txt
复制
    /** Ramp-up time */
    public static final String RAMP_TIME = "ThreadGroup.ramp_time";

    /** Whether thread startup is delayed until required */
    public static final String DELAYED_START = "ThreadGroup.delayedStart";

    /** Whether scheduler is being used */
    public static final String SCHEDULER = "ThreadGroup.scheduler";

    /** Scheduler duration, overrides end time */
    public static final String DURATION = "ThreadGroup.duration";

    /** Scheduler start delay, overrides start time */
    public static final String DELAY = "ThreadGroup.delay";

    // 核心变量
    private transient Thread threadStarter;

    // List of active threads
    private final ConcurrentHashMap<JMeterThread, Thread> allThreads = new ConcurrentHashMap<>();
    
    private transient Object addThreadLock = new Object();

    /** Is test (still) running? */
    private volatile boolean running = false;

    /** Thread Group number */
    private int groupNumber;

    /** Are we using delayed startup? */
    private boolean delayedStartup;

    /** Thread safe class */
    private ListenerNotifier notifier;

    /** This property will be cloned */
    private ListedHashTree threadGroupTree; 

start

启动ThreadGroup线程组

代码语言:txt
复制
    public void start(int groupNum, ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine) {
        // 设置线程运行标志
        this.running = true;
        // 线程组编号
        this.groupNumber = groupNum;
        // 通知类
        this.notifier = notifier;
        this.threadGroupTree = threadGroupTree;
        // 子线程数
        int numThreads = getNumThreads();
        // 预期线程组的所有线程从启动-运行-释放的总时间
        int rampUpPeriodInSeconds = getRampUp();
        float perThreadDelayInMillis = (float) (rampUpPeriodInSeconds * 1000) / (float) getNumThreads();

        //  延迟创建线程的标志
        delayedStartup = isDelayedStartup(); // Fetch once; needs to stay constant
        log.info("Starting thread group... number={} threads={} ramp-up={} perThread={} delayedStart={}", groupNumber,
                numThreads, rampUpPeriodInSeconds, perThreadDelayInMillis, delayedStartup);
        // 延迟创建线程直到需要
        if (delayedStartup) {
            // 创建延时启动线程ThreadStarter
            threadStarter = new Thread(new ThreadStarter(notifier, threadGroupTree, engine), getName()+"-ThreadStarter");
            // 设置为守护线程
            threadStarter.setDaemon(true);
            // 启动线程
            threadStarter.start();
            // N.B. we don't wait for the thread to complete, as that would prevent parallel TGs
        } else {
            long now = System.currentTimeMillis(); // needs to be same time for all threads in the group
            final JMeterContext context = JMeterContextService.getContext();
            // 多线程执行JMeterThread子线程
            for (int threadNum = 0; running && threadNum < numThreads; threadNum++) {
                startNewThread(notifier, threadGroupTree, engine, threadNum, context, now, (int)(threadNum * perThreadDelayInMillis));
            }
        }
        log.info("Started thread group number {}", groupNumber);
    } 

startNewThread

创建JMeterThread子线程

代码语言:txt
复制
    private JMeterThread startNewThread(ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine,
            int threadNum, final JMeterContext context, long now, int delay) {
        JMeterThread jmThread = makeThread(notifier, threadGroupTree, engine, threadNum, context);
        // 调度器
        scheduleThread(jmThread, now); // set start and end time
        jmThread.setInitialDelay(delay);
        Thread newThread = new Thread(jmThread, jmThread.getThreadName());
        // 线程启动时存储线程,主要结合serialized来使用
        registerStartedThread(jmThread, newThread);
        // 启动JMeterThread子线程
        newThread.start();
        return jmThread;
    }

makeThread

创建JMeterThread线程,并初始化属性

代码语言:txt
复制
    private JMeterThread makeThread(
            ListenerNotifier notifier, ListedHashTree threadGroupTree,
            StandardJMeterEngine engine, int threadNumber, 
            JMeterContext context) { // N.B. Context needs to be fetched in the correct thread
        boolean onErrorStopTest = getOnErrorStopTest();
        boolean onErrorStopTestNow = getOnErrorStopTestNow();
        boolean onErrorStopThread = getOnErrorStopThread();
        boolean onErrorStartNextLoop = getOnErrorStartNextLoop();
        String groupName = getName();
        final JMeterThread jmeterThread = new JMeterThread(cloneTree(threadGroupTree), this, notifier);
        jmeterThread.setThreadNum(threadNumber);
        jmeterThread.setThreadGroup(this);
        jmeterThread.setInitialContext(context);
        String distributedPrefix = 
                JMeterUtils.getPropDefault(JMeterUtils.THREAD_GROUP_DISTRIBUTED_PREFIX_PROPERTY_NAME, "");
        // 获取线程名字
        final String threadName = distributedPrefix + (distributedPrefix.isEmpty() ? "":"-") +groupName + " " + groupNumber + "-" + (threadNumber + 1);
        jmeterThread.setThreadName(threadName);
        jmeterThread.setEngine(engine);
        jmeterThread.setOnErrorStopTest(onErrorStopTest);
        jmeterThread.setOnErrorStopTestNow(onErrorStopTestNow);
        jmeterThread.setOnErrorStopThread(onErrorStopThread);
        jmeterThread.setOnErrorStartNextLoop(onErrorStartNextLoop);
        return jmeterThread;
    }

scheduleThread

调度器的作用:控制每个线程组运行的持续时间以及它在多少秒后再启动

  • Duration (seconds) :持续时间;线程组运行的持续时间
  • Startup Delay (seconds):启动延迟;测试计划开始后,线程组的线程将在多少秒后再启动运行
代码语言:txt
复制
    private void scheduleThread(JMeterThread thread, long now) {

        if (!getScheduler()) { // if the Scheduler is not enabled
            return;
        }

        if (getDelay() >= 0) { // Duration is in seconds
            // 设置线程开始时间
            thread.setStartTime(getDelay() * 1000 + now);
        } else {
            throw new JMeterStopTestException("Invalid delay " + getDelay() + " set in Thread Group:" + getName());
        }

        // set the endtime for the Thread
        if (getDuration() > 0) {// Duration is in seconds
            // 设置线程运行的持续时间
            thread.setEndTime(getDuration() * 1000 + (thread.getStartTime()));
        } else {
            throw new JMeterStopTestException("Invalid duration " + getDuration() + " set in Thread Group:" + getName());
        }
        // Enables the scheduler
        thread.setScheduled(true);
    }

registerStartedThread

线程启动时存储线程,结合serialized属性一起使用,用于线程组是串行执行还是并行执行

代码语言:txt
复制
    private void registerStartedThread(JMeterThread jMeterThread, Thread newThread) {
        allThreads.put(jMeterThread, newThread);
    }

ThreadStarter

ThreadStarter是延迟启动线程类,主要结合delayedStartup和schedule属性一起使用,该类继承Runnable接口,也是一个线程类对象。

构造函数

代码语言:txt
复制
    public ThreadStarter(ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine) {
        super();
        this.notifier = notifier;
        this.threadGroupTree = threadGroupTree;
        this.engine = engine;
        // Store context from Root Thread to pass it to created threads
        this.context = JMeterContextService.getContext();
    }

run

执行启动延迟的线程,调用的仍然是JMeterThread类

代码语言:txt
复制
    public void run() {
    try {
        // Copy in ThreadStarter thread context from calling Thread
        JMeterContextService.getContext().setVariables(this.context.getVariables());
        long endtime = 0;
        // 获取调度器标志
        final boolean usingScheduler = getScheduler();
        if (usingScheduler) {
            // set the start time for the Thread
            // 启动延迟时间
            if (getDelay() > 0) {// Duration is in seconds
                delayBy(getDelay() * 1000); 
            }
            // set the endtime for the Thread
            // 持续时间
            endtime = getDuration();  
            if (endtime > 0) {// Duration is in seconds, starting from when the threads start
                // 线程执行结束时间
                endtime = endtime *1000 + System.currentTimeMillis();
            }
        }
        // 获取线程组的执行线程数
        final int numThreads = getNumThreads();
        // ramp-up delay = 达到指定线程数所需要的时间(秒) / 线程数,最后取整
        final int perThreadDelayInMillis = Math.round((float) (getRampUp() * 1000) / (float) numThreads);
        for (int threadNumber = 0; running && threadNumber < numThreads; threadNumber++) {
            if (threadNumber > 0) {
                pause(perThreadDelayInMillis); // ramp-up delay (except first)
            }
            if (usingScheduler && System.currentTimeMillis() > endtime) {
                break; // no point continuing beyond the end time
            }
            JMeterThread jmThread = makeThread(notifier, threadGroupTree, engine, threadNumber, context);
            // 这里要注意下:父线程已经进行线程等待了,子线程就不需要设置等待时间了
            jmThread.setInitialDelay(0);   // Already waited
            if (usingScheduler) {
                jmThread.setScheduled(true);
                jmThread.setEndTime(endtime);
            }
            Thread newThread = new Thread(jmThread, jmThread.getThreadName());
            newThread.setDaemon(false); // ThreadStarter is daemon, but we don't want sampler threads to be so too
            registerStartedThread(jmThread, newThread);
            newThread.start();
        }
    } catch (Exception ex) {
        log.error("An error occurred scheduling delay start of threads for Thread Group: {}", getName(), ex);
    }
}

delayBy

设置启用延迟时间

代码语言:txt
复制
        private void delayBy(long delay) {
            if (delay > 0) {
                //  获取当前时间
                long start = System.currentTimeMillis();
                // 启动延迟时间
                long end = start + delay;
                long now;
                long pause = RAMPUP_GRANULARITY; // maximum pause to use
                while(running && (now = System.currentTimeMillis()) < end) {
                    long togo = end - now;
                    // 比较大小
                    if (togo < pause) {
                        pause = togo;
                    }
                    // 线程等待
                    pause(pause); // delay between checks
                }
            }
        }

pause

线程等待

代码语言:txt
复制
    private void pause(long ms){
            try {
                 // 实现上也是调用Thread.sleep方法
                TimeUnit.MILLISECONDS.sleep(ms); 
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
    • 源码解读
      • 主要变量
        • start
          • startNewThread
            • makeThread
              • scheduleThread
                • registerStartedThread
                  • ThreadStarter
                    • 构造函数
                      • run
                        • delayBy
                          • pause
                          相关产品与服务
                          云服务器
                          云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档