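Performance Tools: Source Code Analysis of StandardJMeterEngine, a Core Class of JMeter 5.0
Overview
StandardJMeterEngine is JMeter's default single-machine load-testing engine. It runs a JMeter test and is used directly by local GUI and non-GUI invocations, or is started by RemoteJMeterEngineImpl when running in server mode.
API reference: https://jmeter.apache.org/api/org/apache/jmeter/engine/StandardJMeterEngine.html
Logical relationships
Brief walkthrough: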
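- HashTree is the underlying data structure;
- SearchByClass traverses the HashTree and collects every node of a given class, so that elements such as TestPlan/ThreadGroup/JavaSampler/ResultCollector in the diagram, which sit in the HashTree as configuration, can be retrieved as typed objects;
- If a collected object is a TestStateListener, its lifecycle callbacks are invoked: testStarted before the test and testEnded after it. ResultCollector is one such type, and its testEnded callback finishes writing the report;
- PreCompiler processes Arguments, adding the parameters configured on the TestPlan node to the test thread context as JMeterVariables;
- ThreadGroup manages a group of threads, including their number, startup, and shutdown;
- StopTest is an inner class that is not visible from outside. It is a Runnable whose job is to stop the test asynchronously; the stopTest method is implemented through it.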
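The class-based lookup described above can be sketched in plain Java. This is only an illustration of the idea, not JMeter's implementation: TreeNodeSketch and SearchByClassSketch are hypothetical names, and the tree is a simplified stand-in for HashTree.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a HashTree node: a value plus child nodes.
final class TreeNodeSketch {
    final Object value;
    final List<TreeNodeSketch> children = new ArrayList<>();

    TreeNodeSketch(Object value) {
        this.value = value;
    }

    // Adds a child and returns this node so calls can be chained.
    TreeNodeSketch add(TreeNodeSketch child) {
        children.add(child);
        return this;
    }
}

// Hypothetical collector that mimics the role SearchByClass plays in the article.
final class SearchByClassSketch {
    // Depth-first walk that collects every node value of the requested type.
    static <T> List<T> collect(TreeNodeSketch root, Class<T> type) {
        List<T> found = new ArrayList<>();
        walk(root, type, found);
        return found;
    }

    private static <T> void walk(TreeNodeSketch node, Class<T> type, List<T> found) {
        if (type.isInstance(node.value)) {
            found.add(type.cast(node.value));
        }
        for (TreeNodeSketch child : node.children) {
            walk(child, type, found);
        }
    }

    public static void main(String[] args) {
        TreeNodeSketch plan = new TreeNodeSketch("plan")
                .add(new TreeNodeSketch(1).add(new TreeNodeSketch("sampler")))
                .add(new TreeNodeSketch(2));
        System.out.println(collect(plan, String.class));  // prints [plan, sampler]
        System.out.println(collect(plan, Integer.class)); // prints [1, 2]
    }
}
```

The same traversal can be repeated with different target types, which is how one tree can yield the listeners in one pass and the thread groups in another.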
Main variables
Note the volatile keyword.
// The central field of the class; note the volatile keyword
private static volatile StandardJMeterEngine engine;
Constructors
There are two constructors: one without parameters and one that takes a host.
// No-argument constructor
public StandardJMeterEngine() {
this(null);
}
// Constructor that takes a host
public StandardJMeterEngine(String host) {
this.host = host;
// Hack to allow external control
initSingletonEngine(this);
}
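To see why the static volatile field and the constructor matter together, here is a minimal sketch of the pattern, assuming nothing beyond what the constructor above shows. SharedEngineSketch and its methods are hypothetical names, not JMeter classes.

```java
// Hypothetical engine: every new instance registers itself in a static volatile field,
// so static methods can reach the most recently created engine.
final class SharedEngineSketch {
    // volatile makes the latest reference visible to all threads without locking.
    private static volatile SharedEngineSketch engine;

    private final String host;
    private volatile boolean stopRequested;

    SharedEngineSketch() {
        this(null);
    }

    SharedEngineSketch(String host) {
        this.host = host;
        // Publishing "this" from a constructor is normally discouraged; the quoted
        // source itself labels the equivalent call a hack to allow external control.
        engine = this;
    }

    // Static entry point: the caller does not need a reference to the instance.
    static boolean stopSharedEngine() {
        SharedEngineSketch current = engine; // read the volatile field once
        if (current == null) {
            return false; // no engine has been constructed yet
        }
        current.stopRequested = true;
        return true;
    }

    boolean isStopRequested() {
        return stopRequested;
    }

    String getHost() {
        return host;
    }
}
```

Only the most recently constructed instance is reachable through the static field, so creating a second engine silently replaces the first as the target of the static calls.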
Main methods
askThreadsToStop
Clean shutdown, i.e. wait for the currently running samplers to finish.
/**
* Clean shutdown ie, wait for end of current running samplers
*/
public void askThreadsToStop() {
if (engine != null) { // Will be null if StopTest thread has started
engine.stopTest(false);
}
}
reset
Stops the JMeterEngine if it is running.
// Reset. In StandardJMeterEngine this simply calls stopTest(), which in turn calls stopTest(true).
@Override
public void reset() {
if (running) {
stopTest();
}
}
configure(HashTree testTree)
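Configures the engine. HashTree is the data structure JMeter relies on to execute a test, and configure sets up the test data before the test runs.
// Finds the TestPlan in the HashTree, reads its serialized and tearDownOnShutdown settings into local fields, and also stores the whole HashTree in a local field.
// StandardJMeterEngine works on ThreadGroups, and one test may contain several of them. If serialized is true, the engine runs the thread groups one after another, waiting for each started ThreadGroup to finish before starting the next; otherwise it runs all thread groups in parallel.
// tearDownOnShutdown works together with PostThreadGroup, a special thread group dedicated to cleanup work.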
@Override
public void configure(HashTree testTree) {
// Is testplan serialised?
SearchByClass<TestPlan> testPlan = new SearchByClass<>(TestPlan.class);
testTree.traverse(testPlan);
Object[] plan = testPlan.getSearchResults().toArray();
if (plan.length == 0) {
throw new IllegalStateException("Could not find the TestPlan class!");
}
TestPlan tp = (TestPlan) plan[0];
serialized = tp.isSerialized();
tearDownOnShutdown = tp.isTearDownOnShutdown();
active = true;
test = testTree;
}
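The effect of the serialized flag can be illustrated with plain threads. The sketch below is an assumption-laden simplification: SerializedGroupsSketch is a hypothetical name, each "group" is a single thread, and unlike the engine it joins after every group in serialized mode rather than only when another group follows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of serialized versus parallel thread-group start-up.
final class SerializedGroupsSketch {
    // Starts one thread per group name and returns the recorded start/end events.
    // When serialized is true, each group must finish before the next one starts.
    static List<String> runGroups(List<String> names, boolean serialized) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        List<Thread> started = new ArrayList<>();
        for (String name : names) {
            Thread group = new Thread(() -> events.add("end " + name), name);
            events.add("start " + name);
            group.start();
            started.add(group);
            if (serialized) {
                join(group); // wait before starting the next group
            }
        }
        for (Thread group : started) {
            join(group); // final wait for every group, serialized or not
        }
        return events;
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

With serialized set to true the event order is fixed (start, end, start, end); with false, the relative order of the end events depends on thread scheduling.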
exit
Remote exit. It is called by RemoteJMeterEngineImpl.rexit(), and by notifyTestListenersOfEnd() iff exitAfterTest is true; notifyTestListenersOfEnd() is in turn called by the run() method and by the StopTest class.
/**
* Remote exit
* Called by RemoteJMeterEngineImpl.rexit()
* and by notifyTestListenersOfEnd() iff exitAfterTest is true;
* in turn that is called by the run() method and the StopTest class
* also called
*
* Intended for remote tests.
* If the current test was started from a client JMeter that executes remote samples on a remote JMeterEngine, this exit() method should be called to shut down the remote test.
*/
@Override
public void exit() {
ClientJMeterEngine.tidyRMI(log); // This should be enough to allow server to exit.
if (REMOTE_SYSTEM_EXIT) { // default is false
log.warn("About to run System.exit(0) on {}", host);
// Needs to be run in a separate thread to allow RMI call to return OK
Thread t = new Thread() {
@Override
public void run() {
pause(1000); // Allow RMI to complete
log.info("Bye from {}", host);
System.out.println("Bye from "+host); // NOSONAR Intentional
System.exit(0); // NOSONAR Intentional
}
};
t.start();
}
}
isActive
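Reports whether the JMeterEngine is in the middle of a test. Return value:
a boolean flag showing whether the engine is active (true while the test is running). It is set to false at the end of the test.
/**
* Flag showing whether the engine is active; set to false at the end of the test.
* It is set to true in configure() and set to false once the test (all ThreadGroups of this JMeterEngine) has finished.
* If active == true, this JMeterEngine has been configured and has not finished running, so configure or runTest must not be called again;
* if active == false, this JMeterEngine is idle, so a new HashTree can be configured and a new test run.
*
* @return true if the engine is active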
*/
@Override
public boolean isActive() {
return active;
}
engine
Methods that operate on the shared engine reference: initSingletonEngine(), stopEngineNow(), and stopEngine().
/**
* Set the shared engine
* @param standardJMeterEngine
*/
private static void initSingletonEngine(StandardJMeterEngine standardJMeterEngine) {
StandardJMeterEngine.engine = standardJMeterEngine;
}
public static void stopEngineNow() {
if (engine != null) {// May be null if called from Unit test
engine.stopTest(true);
}
}
public static void stopEngine() {
if (engine != null) { // May be null if called from Unit test
engine.stopTest(false);
}
}
run
run() starts the test.
It first resets JMeterContextService: numberOfActiveThreads is set to 0 and the testStart time is recorded.
JMeterContextService.startTest();
JMeterContextService.startTest() is implemented as follows:
/**
* Method is called by the JMeterEngine class when a test run is started.
* Zeroes numberOfActiveThreads.
* Saves current time in a field and in the JMeter property "TESTSTART.MS"
*/
public static synchronized void startTest() {
if (testStart == 0) {
numberOfActiveThreads = 0;
testStart = System.currentTimeMillis();
JMeterUtils.setProperty("TESTSTART.MS",Long.toString(testStart));// $NON-NLS-1$
}
}
Run PreCompiler over the HashTree (see the brief walkthrough above).
try {
PreCompiler compiler = new PreCompiler();
test.traverse(compiler);
} catch (RuntimeException e) {
log.error("Error occurred compiling the tree:", e);
JMeterUtils.reportErrorToUser("Error occurred compiling the tree: - see log file", e);
return; // no point continuing
}
Use SearchByClass to collect all TestStateListener elements, then merge in the additional listeners held in testList.
SearchByClass<TestStateListener> testListeners = new SearchByClass<>(TestStateListener.class); // TL-S&E
test.traverse(testListeners);
// Merge in any additional test listeners
// currently only used by the function parser
testListeners.getSearchResults().addAll(testList);
- Call testStarted on the test listeners collected in the previous step: ResultCollector increments instanceCount and initializes fileOutput; TestPlan sets the FileServer basedir and adds classpath entries; JavaSampler initializes the AbstractJavaSamplerClient class that will actually run;
- Use SearchByClass to collect all thread groups (SetupThreadGroup, ThreadGroup, and PostThreadGroup).
notifyTestListenersOfStart(testListeners);
private void notifyTestListenersOfStart(SearchByClass<TestStateListener> testListeners) {
for (TestStateListener tl : testListeners.getSearchResults()) {
if (tl instanceof TestBean) {
TestBeanHelper.prepare((TestElement) tl);
}
if (host == null) {
tl.testStarted();
} else {
tl.testStarted(host);
}
}
}
Create a ListenerNotifier instance, which is used to notify listeners when events occur.
ListenerNotifier notifier = new ListenerNotifier();
Start all SetupThreadGroups (usually there are none) and wait until they have all finished.
if (setupIter.hasNext()) {
log.info("Starting setUp thread groups");
while (running && setupIter.hasNext()) {// for each setup thread group
AbstractThreadGroup group = setupIter.next();
groupCount++;
String groupName = group.getName();
log.info("Starting setUp ThreadGroup: " + groupCount + " : " + groupName);
startThreadGroup(group, groupCount, setupSearcher, testLevelElements, notifier);
if (serialized && setupIter.hasNext()) {
log.info("Waiting for setup thread group: " + groupName
+ " to finish before starting next setup group");
group.waitThreadsStopped();
}
}
log.info("Waiting for all setup thread groups to exit");
// wait for all Setup Threads To Exit
waitThreadsStopped();
log.info("All Setup Threads have ended");
groupCount = 0;
JMeterContextService.clearTotalThreads();
}
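After requesting a garbage collection, the real test begins: all ThreadGroups are started. The serialized property is checked here to decide whether the ThreadGroups run one after another.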
JMeterUtils.helpGC();
Start every ThreadGroup, then wait for all of them to finish.
while (running && iter.hasNext()) {// for each thread group
AbstractThreadGroup group = iter.next();
// ignore Setup and Post here. We could have filtered the searcher.
// but then
// future Thread Group objects wouldn't execute.
if (group instanceof SetupThreadGroup || group instanceof PostThreadGroup) {
continue;
}
groupCount++;
String groupName = group.getName();
log.info("Starting ThreadGroup: " + groupCount + " : " + groupName);
startThreadGroup(group, groupCount, searcher, testLevelElements, notifier);
if (serialized && iter.hasNext()) {
log.info("Waiting for thread group: " + groupName + " to finish before starting next group");
group.waitThreadsStopped();
}
} // end of thread groups
if (groupCount == 0) { // No TGs found
log.info("No enabled thread groups found");
} else {
if (running) {
log.info("All thread groups have been started");
} else {
log.info("Test stopped - no more thread groups will be started");
}
}
// wait for all Test Threads To Exit
waitThreadsStopped();
If there are PostThreadGroups (usually there are none), run all of them and wait until every PostThreadGroup has finished.
if (postIter.hasNext()) {
groupCount = 0;
JMeterContextService.clearTotalThreads();
log.info("Starting tearDown thread groups");
if (mainGroups && !running) { // i.e. shutdown/stopped during main
// thread groups
running = shutdown && tearDownOnShutdown; // re-enable for
// tearDown if
// necessary
}
while (running && postIter.hasNext()) {// for each setup thread
// group
AbstractThreadGroup group = postIter.next();
groupCount++;
String groupName = group.getName();
log.info("Starting tearDown ThreadGroup: " + groupCount + " : " + groupName);
startThreadGroup(group, groupCount, postSearcher, testLevelElements, notifier);
if (serialized && postIter.hasNext()) {
log.info("Waiting for post thread group: " + groupName
+ " to finish before starting next post group");
group.waitThreadsStopped();
}
}
waitThreadsStopped(); // wait for Post threads to stop
}
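The condition that re-enables running for the tearDown groups is easy to misread, so here is the same decision as a small pure function. The flag names are copied from the quoted code; the reading that shutdown means a graceful (non-immediate) stop is an assumption, and TearDownDecisionSketch is a hypothetical name.

```java
// Hypothetical helper that isolates the tearDown decision from the quoted run() code.
final class TearDownDecisionSketch {
    // mainGroups: main thread groups were started; running: the test has not been stopped;
    // shutdown: assumed to mean a graceful stop was requested;
    // tearDownOnShutdown: the TestPlan option read in configure().
    static boolean shouldRunTearDown(boolean mainGroups, boolean running,
                                     boolean shutdown, boolean tearDownOnShutdown) {
        if (mainGroups && !running) {
            // Stopped during the main thread groups: tearDown runs only when the
            // stop was a shutdown and the plan asks for tearDown on shutdown.
            return shutdown && tearDownOnShutdown;
        }
        return running;
    }
}
```

Under that assumption, an immediate stop never runs the tearDown groups, while a graceful shutdown runs them only if the TestPlan option is enabled.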
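Call testEnded on the test listeners collected earlier: JavaSampler calls teardownTest on the AbstractJavaSamplerClient that actually ran, which can print the total time that JavaSamplerClient test took;
- ResultCollector writes the test results to the output file;
- reportTestPlan closes the file.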
notifyTestListenersOfEnd(testListeners);
JMeterContextService.endTest();
startThreadGroup
Starts a thread group; called from the run method.
private void startThreadGroup(AbstractThreadGroup group, int groupCount, SearchByClass<?> searcher,
List<?> testLevelElements, ListenerNotifier notifier) {
try {
int numThreads = group.getNumThreads();
JMeterContextService.addTotalThreads(numThreads);
boolean onErrorStopTest = group.getOnErrorStopTest();
boolean onErrorStopTestNow = group.getOnErrorStopTestNow();
boolean onErrorStopThread = group.getOnErrorStopThread();
boolean onErrorStartNextLoop = group.getOnErrorStartNextLoop();
String groupName = group.getName();
log.info("Starting " + numThreads + " threads for group " + groupName + ".");
if (onErrorStopTest) {
log.info("Test will stop on error");
} else if (onErrorStopTestNow) {
log.info("Test will stop abruptly on error");
} else if (onErrorStopThread) {
log.info("Thread will stop on error");
} else if (onErrorStartNextLoop) {
log.info("Thread will start next loop on error");
} else {
log.info("Thread will continue on error");
}
ListedHashTree threadGroupTree = (ListedHashTree) searcher.getSubTree(group);
threadGroupTree.add(group, testLevelElements);
groups.add(group);
group.start(groupCount, notifier, threadGroupTree, this);
} catch (JMeterStopTestException ex) { // NOSONAR Reported by log
JMeterUtils.reportErrorToUser("Error occurred starting thread group :" + group.getName()
+ ", error message:" + ex.getMessage() + ", \r\nsee log file for more details", ex);
return; // no point continuing
}
}
waitThreadsStopped
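Waits for threads to stop; called from the run method. The first method below belongs to the engine and delegates to each thread group; the two that follow are the thread group's own waitThreadsStopped() and waitThreadStopped(Thread).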
/**
* Wait for Group Threads to stop
*/
private void waitThreadsStopped() {
// ConcurrentHashMap does not need synch. here
for (AbstractThreadGroup threadGroup : groups) {
threadGroup.waitThreadsStopped();
}
}
/**
* Wait for all Group Threads to stop
*/
@Override
public void waitThreadsStopped() {
if (delayedStartup) {
waitThreadStopped(threadStarter);
}
for (Thread t : allThreads.values()) {
waitThreadStopped(t);
}
}
/**
* Wait for thread to stop
* @param thread Thread
*/
private void waitThreadStopped(Thread thread) {
if (thread != null) {
while (thread.isAlive()) {
try {
thread.join(WAIT_TO_DIE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
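The waiting loop above is a standard bounded-join pattern and can be tried in isolation. In this sketch WaitForThreadSketch is a hypothetical name and the 50 ms timeout is an arbitrary illustrative value, not the value of WAIT_TO_DIE.

```java
// Hypothetical stand-alone version of the bounded-join wait loop.
final class WaitForThreadSketch {
    private static final long WAIT_TO_DIE_MS = 50; // illustrative value only

    // Blocks until the given thread has finished; a null thread is ignored.
    static void waitThreadStopped(Thread thread) {
        if (thread == null) {
            return;
        }
        while (thread.isAlive()) {
            try {
                thread.join(WAIT_TO_DIE_MS); // returns after the timeout or when the thread dies
            } catch (InterruptedException e) {
                // As in the quoted code: restore the interrupt flag and keep waiting.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        waitThreadStopped(worker);
        System.out.println(worker.isAlive()); // prints false
    }
}
```

One consequence of restoring the interrupt flag inside the loop is that, once the waiting thread has been interrupted, every following join call throws immediately, so the loop spins without sleeping until the target thread exits.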
removeThreadGroups
Removes thread groups from a list of elements; called from the run method.
private void removeThreadGroups(List<?> elements) {
Iterator<?> iter = elements.iterator();
while (iter.hasNext()) { // Can't use for loop here because we remove elements
Object item = iter.next();
if (item instanceof AbstractThreadGroup || !(item instanceof TestElement)) {
iter.remove();
}
}
}
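The comment about not being able to use a for loop refers to removing elements while iterating. A minimal sketch of the same shape with JDK types follows; here Integer plays the role of AbstractThreadGroup and Number the role of TestElement, which is purely an analogy, and RemoveWhileIteratingSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical analogue of removeThreadGroups using JDK types.
final class RemoveWhileIteratingSketch {
    // Keeps only elements that are Numbers but not Integers.
    static void removeUnwanted(List<?> elements) {
        Iterator<?> iter = elements.iterator();
        while (iter.hasNext()) {
            Object item = iter.next();
            if (item instanceof Integer || !(item instanceof Number)) {
                // Iterator.remove() is the safe way to delete during iteration;
                // calling elements.remove(item) inside a for-each loop would
                // typically fail with ConcurrentModificationException.
                iter.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Object> items = new ArrayList<>(Arrays.<Object>asList(1, 2.5, "x", 3L));
        removeUnwanted(items);
        System.out.println(items); // prints [2.5, 3]
    }
}
```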
runTest
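runTest() is called to execute the test: it starts a thread that invokes run(). If an exception is thrown, it calls stopTest() and throws JMeterEngineException.
// Starts a thread that executes run(); on failure, calls stopTest() and throws JMeterEngineException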
@Override
public void runTest() throws JMeterEngineException {
if (host != null){
long now=System.currentTimeMillis();
System.out.println("Starting the test on host " + host + " @ "+new Date(now)+" ("+now+")"); // NOSONAR Intentional
}
try {
Thread runningThread = new Thread(this, "StandardJMeterEngine");
// Start a thread, which invokes run()
runningThread.start();
} catch (Exception err) {
stopTest();
throw new JMeterEngineException(err);
}
}
stopThread
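Stops the thread named threadName. There are two modes, immediate and non-immediate, selected by the second parameter.
// Stops the thread named threadName, immediately or not depending on the second parameter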
public static boolean stopThread(String threadName) {
return stopThread(threadName, false);
}
public static boolean stopThreadNow(String threadName) {
return stopThread(threadName, true);
}
private static boolean stopThread(String threadName, boolean now) {
if (engine == null) {
return false;// e.g. not yet started
}
boolean wasStopped = false;
// ConcurrentHashMap does not need synch. here
for (AbstractThreadGroup threadGroup : engine.groups) {
wasStopped = wasStopped || threadGroup.stopThread(threadName, now);
}
return wasStopped;
}
ThreadGroup.stopThread, which is called above, is implemented as follows:
/**
* Stop thread called threadName:
* <ol>
* <li>stop JMeter thread</li>
* <li>interrupt JMeter thread</li>
* <li>interrupt underlying thread</li>
* </ol>
* @param threadName String thread name
* @param now boolean for stop
* @return true if thread stopped
*/
@Override
public boolean stopThread(String threadName, boolean now) {
for (Entry<JMeterThread, Thread> threadEntry : allThreads.entrySet()) {
JMeterThread jMeterThread = threadEntry.getKey();
if (jMeterThread.getThreadName().equals(threadName)) {
stopThread(jMeterThread, threadEntry.getValue(), now);
return true;
}
}
return false;
}
/**
* Hard Stop JMeterThread thrd and interrupt JVM Thread if interrupt is true
* @param jmeterThread {@link JMeterThread}
* @param jvmThread {@link Thread}
* @param interrupt Interrupt thread or not
*/
private void stopThread(JMeterThread jmeterThread, Thread jvmThread, boolean interrupt) {
jmeterThread.stop();
jmeterThread.interrupt(); // interrupt sampler if possible
if (interrupt && jvmThread != null) { // Bug 49734
jvmThread.interrupt(); // also interrupt JVM thread
}
}
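The difference between a plain stop and a stop with interrupt can be modelled with a cooperative flag. This sketch is an illustration under simplifying assumptions: StoppableWorkerSketch is a hypothetical name, a "sample" is just a sleep, and the separate sampler-level interrupt from the quoted code is left out.

```java
// Hypothetical worker that stops cooperatively and can also be interrupted.
final class StoppableWorkerSketch implements Runnable {
    private final long sampleMillis;
    private volatile boolean running = true;

    StoppableWorkerSketch(long sampleMillis) {
        this.sampleMillis = sampleMillis;
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(sampleMillis); // stands in for one sample
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // interrupted in the middle of a sample
            }
        }
    }

    // Cooperative stop: takes effect once the current sample has finished.
    void stop() {
        running = false;
    }

    // Mirrors the shape of the quoted stopThread: always request a stop,
    // and additionally interrupt the JVM thread when asked to.
    static void stopWorker(StoppableWorkerSketch worker, Thread jvmThread, boolean interrupt) {
        worker.stop();
        if (interrupt && jvmThread != null) {
            jvmThread.interrupt();
        }
    }
}
```

Without the interrupt, a worker blocked in a long sample only notices the stop request after that sample returns; with it, the blocking call is cut short.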
stopTest
stopTest(boolean now)
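Stops the test. If now is true, the stop takes effect immediately; if it is false, the stop is graceful and waits for the currently running samplers to finish.
// Stops the test: immediately if now is true, otherwise gracefully after the currently running samplers finish.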
@Override
public synchronized void stopTest(boolean now) {
Thread stopThread = new Thread(new StopTest(now));
stopThread.start();
}
stopTest()
Stops the test immediately.
/**
* Stop Test Now
*/
@Override
public synchronized void stopTest() {
stopTest(true);
}
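Both stopTest variants hand the real work to a separate thread, so the caller returns at once. A rough sketch of that hand-off, with hypothetical names (AsyncStopSketch, awaitStopped) and a CountDownLatch added only so the outcome can be observed:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical engine fragment showing an asynchronous stop request.
final class AsyncStopSketch {
    private volatile boolean running = true;
    private volatile boolean stoppedImmediately;
    private final CountDownLatch stopped = new CountDownLatch(1);

    // Returns right away; the stop work runs on its own thread.
    synchronized void stopTest(boolean now) {
        Thread stopThread = new Thread(() -> {
            running = false;
            stoppedImmediately = now;
            stopped.countDown(); // signal that the stop work is done
        }, "stop-test-sketch");
        stopThread.start();
    }

    // Convenience overload: stop immediately.
    synchronized void stopTest() {
        stopTest(true);
    }

    // Waits up to the given time for the stop thread to finish its work.
    boolean awaitStopped(long timeoutMillis) {
        try {
            return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isRunning() {
        return running;
    }

    boolean wasStoppedImmediately() {
        return stoppedImmediately;
    }
}
```

Running the stop logic off the calling thread matters when the caller is a GUI event thread or a remote call that must return promptly.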
notifyTestListenersOfStart
Notifies listeners that the test has started.
private void notifyTestListenersOfStart(SearchByClass<TestStateListener> testListeners) {
for (TestStateListener tl : testListeners.getSearchResults()) {
if (tl instanceof TestBean) {
TestBeanHelper.prepare((TestElement) tl);
}
if (host == null) {
tl.testStarted();
} else {
tl.testStarted(host);
}
}
}
To understand this method, it helps to look at the TestStateListener interface.
package org.apache.jmeter.testelement;
/**
* @since 2.8
*/
public interface TestStateListener {
/**
* <p>
* Called just before the start of the test from the main engine thread.
*
* This is before the test elements are cloned.
*
* Note that not all the test
* variables will have been set up at this point.
* </p>
*
* <p>
* <b>
* N.B. testStarted() and testEnded() are called from different threads.
* </b>
* </p>
* @see org.apache.jmeter.engine.StandardJMeterEngine#run()
*
*/
void testStarted();
/**
* <p>
* Called just before the start of the test from the main engine thread.
*
* This is before the test elements are cloned.
*
* Note that not all the test
* variables will have been set up at this point.
* </p>
*
* <p>
* <b>
* N.B. testStarted() and testEnded() are called from different threads.
* </b>
* </p>
* @see org.apache.jmeter.engine.StandardJMeterEngine#run()
* @param host name of host
*/
void testStarted(String host);
/**
* <p>
* Called once for all threads after the end of a test.
*
* This will use the same element instances as at the start of the test.
* </p>
*
* <p>
* <b>
* N.B. testStarted() and testEnded() are called from different threads.
* </b>
* </p>
* @see org.apache.jmeter.engine.StandardJMeterEngine#stopTest()
*
*/
void testEnded();
/**
* <p>
* Called once for all threads after the end of a test.
*
* This will use the same element instances as at the start of the test.
* </p>
*
* <p>
* <b>
* N.B. testStarted() and testEnded() are called from different threads.
* </b>
* </p>
* @see org.apache.jmeter.engine.StandardJMeterEngine#stopTest()
* @param host name of host
*
*/
void testEnded(String host);
}
testStarted: called just before the test starts
testEnded: called once for all threads after the test ends
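A listener built on these four callbacks might look like the sketch below. To stay self-contained it declares a local interface, TestStateListenerSketch, that only mirrors the method signatures quoted above; it is not the JMeter interface, and RecordingListenerSketch is likewise a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local stand-in mirroring the four methods of the interface quoted above.
interface TestStateListenerSketch {
    void testStarted();
    void testStarted(String host);
    void testEnded();
    void testEnded(String host);
}

// Hypothetical listener that records which callbacks were invoked.
final class RecordingListenerSketch implements TestStateListenerSketch {
    // The Javadoc warns that testStarted() and testEnded() run on different
    // threads, so the shared list is synchronized.
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void testStarted() {
        events.add("started");
    }

    @Override
    public void testStarted(String host) {
        events.add("started@" + host);
    }

    @Override
    public void testEnded() {
        events.add("ended");
    }

    @Override
    public void testEnded(String host) {
        events.add("ended@" + host);
    }

    // Returns a snapshot copy of the recorded events.
    List<String> events() {
        synchronized (events) {
            return new ArrayList<>(events);
        }
    }
}
```

A real listener would implement the actual interface from org.apache.jmeter.testelement and typically open resources in testStarted and release them in testEnded.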
notifyTestListenersOfEnd
Notifies listeners that the test has ended.
private void notifyTestListenersOfEnd(SearchByClass<TestStateListener> testListeners) {
log.info("Notifying test listeners of end of test");
for (TestStateListener tl : testListeners.getSearchResults()) {
try {
if (host == null) {
tl.testEnded();
} else {
tl.testEnded(host);
}
} catch (Exception e) {
log.warn("Error encountered during shutdown of "+tl.toString(),e);
}
}
if (host != null) {
log.info("Test has ended on host {} ", host);
long now=System.currentTimeMillis();
System.out.println("Finished the test on host " + host + " @ "+new Date(now)+" ("+now+")" // NOSONAR Intentional
+(EXIT_AFTER_TEST ? " - exit requested." : ""));
if (EXIT_AFTER_TEST){
exit();
}
}
active=false;
}
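Note that the loop above wraps each testEnded call in its own try/catch, so one failing listener cannot prevent the others from being notified. The idea in isolation, using Runnable as a stand-in for a listener callback (NotifyEndSketch and notifyOfEnd are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical notifier that isolates failures between callbacks.
final class NotifyEndSketch {
    // Runs every callback in order and returns the messages of those that failed.
    static List<String> notifyOfEnd(List<Runnable> callbacks) {
        List<String> failures = new ArrayList<>();
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (Exception e) {
                // Record the failure and continue; the quoted code logs a warning here.
                failures.add(String.valueOf(e.getMessage()));
            }
        }
        return failures;
    }
}
```

The start notification shown earlier has no such guard, so an exception thrown from a testStarted callback propagates out of the loop.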
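Single-machine execution
// Set the working directory for the jmx script file
FileServer.getFileServer().setBaseForScript(jmxFile);
// Load the jmx file
HashTree jmxTree = SaveService.loadTree(jmxFile);
// Remove unused node elements and replace controllers that can be replaced
JMeter.convertSubTree(jmxTree);
// Initialize the default load-testing engine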
JMeterEngine engine = new StandardJMeterEngine();
engine.configure(jmxTree);
engine.runTest();
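Distributed execution
// Run the script in distributed mode; the StringTokenizer is used to build the hosts list
// DistributedRunner still relies on StandardJMeterEngine to run the load test, with distribution implemented over RMI.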
java.util.StringTokenizer st = new java.util.StringTokenizer(remoteHostsString, ",");//$NON-NLS-1$
List<String> hosts = new LinkedList<>();
while (st.hasMoreElements()) {
hosts.add((String) st.nextElement());
}
DistributedRunner distributedRunner=new DistributedRunner(this.remoteProps);
distributedRunner.setStdout(System.out); // NOSONAR
distributedRunner.setStdErr(System.err); // NOSONAR
distributedRunner.init(hosts, clonedTree);
engines.addAll(distributedRunner.getEngines());
distributedRunner.start();
StringTokenizer is used to build the hosts list.
DistributedRunner still relies on StandardJMeterEngine to execute the load test; the distributed load testing is implemented over RMI.
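The host-list parsing can be exercised on its own. In this sketch RemoteHostsSketch and parseHosts are hypothetical names, and trimming whitespace around each host is an addition for illustration that the quoted snippet does not perform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical helper that turns a comma-separated host string into a list.
final class RemoteHostsSketch {
    static List<String> parseHosts(String remoteHostsString) {
        List<String> hosts = new ArrayList<>();
        // StringTokenizer skips empty tokens, so "a,,b" yields "a" and "b".
        StringTokenizer st = new StringTokenizer(remoteHostsString, ",");
        while (st.hasMoreTokens()) {
            String host = st.nextToken().trim(); // trimming is an assumption, not in the source
            if (!host.isEmpty()) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    public static void main(String[] args) {
        System.out.println(parseHosts("10.0.0.1, 10.0.0.2:1099,,")); // prints [10.0.0.1, 10.0.0.2:1099]
    }
}
```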