作用:
调度器
每个线程组都会独立的运行测试计划,互不干扰,多个线程组用于模仿对服务器的并发访问。
ThreadGroup类继承AbstractThreadGroup类
SetupThreadGroup和PostThreadGroup类继承ThreadGroup
/** Ramp-up time */
public static final String RAMP_TIME = "ThreadGroup.ramp_time";
/** Whether thread startup is delayed until required */
public static final String DELAYED_START = "ThreadGroup.delayedStart";
/** Whether scheduler is being used */
public static final String SCHEDULER = "ThreadGroup.scheduler";
/** Scheduler duration, overrides end time */
public static final String DURATION = "ThreadGroup.duration";
/** Scheduler start delay, overrides start time */
public static final String DELAY = "ThreadGroup.delay";
// 核心变量
private transient Thread threadStarter;
// List of active threads
private final ConcurrentHashMap<JMeterThread, Thread> allThreads = new ConcurrentHashMap<>();
private transient Object addThreadLock = new Object();
/** Is test (still) running? */
private volatile boolean running = false;
/** Thread Group number */
private int groupNumber;
/** Are we using delayed startup? */
private boolean delayedStartup;
/** Thread safe class */
private ListenerNotifier notifier;
/** This property will be cloned */
private ListedHashTree threadGroupTree;
启动ThreadGroup线程组
public void start(int groupNum, ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine) {
// 设置线程运行标志
this.running = true;
// 线程组编号
this.groupNumber = groupNum;
// 通知类
this.notifier = notifier;
this.threadGroupTree = threadGroupTree;
// 子线程数
int numThreads = getNumThreads();
// 预期线程组的所有线程从启动-运行-释放的总时间
int rampUpPeriodInSeconds = getRampUp();
float perThreadDelayInMillis = (float) (rampUpPeriodInSeconds * 1000) / (float) getNumThreads();
// 延迟创建线程的标志
delayedStartup = isDelayedStartup(); // Fetch once; needs to stay constant
log.info("Starting thread group... number={} threads={} ramp-up={} perThread={} delayedStart={}", groupNumber,
numThreads, rampUpPeriodInSeconds, perThreadDelayInMillis, delayedStartup);
// 延迟创建线程直到需要
if (delayedStartup) {
// 创建延时启动线程ThreadStarter
threadStarter = new Thread(new ThreadStarter(notifier, threadGroupTree, engine), getName()+"-ThreadStarter");
// 设置为守护线程
threadStarter.setDaemon(true);
// 启动线程
threadStarter.start();
// N.B. we don't wait for the thread to complete, as that would prevent parallel TGs
} else {
long now = System.currentTimeMillis(); // needs to be same time for all threads in the group
final JMeterContext context = JMeterContextService.getContext();
// 多线程执行JMeterThread子线程
for (int threadNum = 0; running && threadNum < numThreads; threadNum++) {
startNewThread(notifier, threadGroupTree, engine, threadNum, context, now, (int)(threadNum * perThreadDelayInMillis));
}
}
log.info("Started thread group number {}", groupNumber);
}
创建JMeterThread子线程
private JMeterThread startNewThread(ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine,
int threadNum, final JMeterContext context, long now, int delay) {
JMeterThread jmThread = makeThread(notifier, threadGroupTree, engine, threadNum, context);
// 调度器
scheduleThread(jmThread, now); // set start and end time
jmThread.setInitialDelay(delay);
Thread newThread = new Thread(jmThread, jmThread.getThreadName());
// 线程启动时存储线程,主要结合serialized来使用
registerStartedThread(jmThread, newThread);
// 启动JMeterThread子线程
newThread.start();
return jmThread;
}
创建JMeterThread线程,并初始化属性
private JMeterThread makeThread(
ListenerNotifier notifier, ListedHashTree threadGroupTree,
StandardJMeterEngine engine, int threadNumber,
JMeterContext context) { // N.B. Context needs to be fetched in the correct thread
boolean onErrorStopTest = getOnErrorStopTest();
boolean onErrorStopTestNow = getOnErrorStopTestNow();
boolean onErrorStopThread = getOnErrorStopThread();
boolean onErrorStartNextLoop = getOnErrorStartNextLoop();
String groupName = getName();
final JMeterThread jmeterThread = new JMeterThread(cloneTree(threadGroupTree), this, notifier);
jmeterThread.setThreadNum(threadNumber);
jmeterThread.setThreadGroup(this);
jmeterThread.setInitialContext(context);
String distributedPrefix =
JMeterUtils.getPropDefault(JMeterUtils.THREAD_GROUP_DISTRIBUTED_PREFIX_PROPERTY_NAME, "");
// 获取线程名字
final String threadName = distributedPrefix + (distributedPrefix.isEmpty() ? "":"-") +groupName + " " + groupNumber + "-" + (threadNumber + 1);
jmeterThread.setThreadName(threadName);
jmeterThread.setEngine(engine);
jmeterThread.setOnErrorStopTest(onErrorStopTest);
jmeterThread.setOnErrorStopTestNow(onErrorStopTestNow);
jmeterThread.setOnErrorStopThread(onErrorStopThread);
jmeterThread.setOnErrorStartNextLoop(onErrorStartNextLoop);
return jmeterThread;
}
调度器的作用:控制每个线程组运行的持续时间以及它在多少秒后再启动
private void scheduleThread(JMeterThread thread, long now) {
if (!getScheduler()) { // if the Scheduler is not enabled
return;
}
if (getDelay() >= 0) { // Duration is in seconds
// 设置线程开始时间
thread.setStartTime(getDelay() * 1000 + now);
} else {
throw new JMeterStopTestException("Invalid delay " + getDelay() + " set in Thread Group:" + getName());
}
// set the endtime for the Thread
if (getDuration() > 0) {// Duration is in seconds
// 设置线程运行的持续时间
thread.setEndTime(getDuration() * 1000 + (thread.getStartTime()));
} else {
throw new JMeterStopTestException("Invalid duration " + getDuration() + " set in Thread Group:" + getName());
}
// Enables the scheduler
thread.setScheduled(true);
}
线程启动时存储线程,结合serialized属性一起使用,用于线程组是串行执行还是并行执行
private void registerStartedThread(JMeterThread jMeterThread, Thread newThread) {
allThreads.put(jMeterThread, newThread);
}
ThreadStarter是延迟启动线程类,主要结合delayedStartup和schedule属性一起使用,该类继承Runnable接口,也是一个线程类对象。
public ThreadStarter(ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine) {
super();
this.notifier = notifier;
this.threadGroupTree = threadGroupTree;
this.engine = engine;
// Store context from Root Thread to pass it to created threads
this.context = JMeterContextService.getContext();
}
执行启动延迟的线程,调用的仍然是JMeterThread类
public void run() {
try {
// Copy in ThreadStarter thread context from calling Thread
JMeterContextService.getContext().setVariables(this.context.getVariables());
long endtime = 0;
// 获取调度器标志
final boolean usingScheduler = getScheduler();
if (usingScheduler) {
// set the start time for the Thread
// 启动延迟时间
if (getDelay() > 0) {// Duration is in seconds
delayBy(getDelay() * 1000);
}
// set the endtime for the Thread
// 持续时间
endtime = getDuration();
if (endtime > 0) {// Duration is in seconds, starting from when the threads start
// 线程执行结束时间
endtime = endtime *1000 + System.currentTimeMillis();
}
}
// 获取线程组的执行线程数
final int numThreads = getNumThreads();
// ramp-up delay = 达到指定线程数所需要的时间(秒) / 线程数,最后取整
final int perThreadDelayInMillis = Math.round((float) (getRampUp() * 1000) / (float) numThreads);
for (int threadNumber = 0; running && threadNumber < numThreads; threadNumber++) {
if (threadNumber > 0) {
pause(perThreadDelayInMillis); // ramp-up delay (except first)
}
if (usingScheduler && System.currentTimeMillis() > endtime) {
break; // no point continuing beyond the end time
}
JMeterThread jmThread = makeThread(notifier, threadGroupTree, engine, threadNumber, context);
// 这里要注意下:父线程已经进行线程等待了,子线程就不需要设置等待时间了
jmThread.setInitialDelay(0); // Already waited
if (usingScheduler) {
jmThread.setScheduled(true);
jmThread.setEndTime(endtime);
}
Thread newThread = new Thread(jmThread, jmThread.getThreadName());
newThread.setDaemon(false); // ThreadStarter is daemon, but we don't want sampler threads to be so too
registerStartedThread(jmThread, newThread);
newThread.start();
}
} catch (Exception ex) {
log.error("An error occurred scheduling delay start of threads for Thread Group: {}", getName(), ex);
}
}
设置启用延迟时间
private void delayBy(long delay) {
if (delay > 0) {
// 获取当前时间
long start = System.currentTimeMillis();
// 启动延迟时间
long end = start + delay;
long now;
long pause = RAMPUP_GRANULARITY; // maximum pause to use
while(running && (now = System.currentTimeMillis()) < end) {
long togo = end - now;
// 比较大小
if (togo < pause) {
pause = togo;
}
// 线程等待
pause(pause); // delay between checks
}
}
}
线程等待
private void pause(long ms){
try {
// 实现上也是调用Thread.sleep方法
TimeUnit.MILLISECONDS.sleep(ms);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。