现在压测系统一直用的方案是goreplay进行二次开发完成的。因为整体是Java技术栈的,使用goreplay有存在两方面问题:一是兼容性,语言和开发框架上,增加了用例创建执行的复杂度;二是维护成本,goreplay二次开发方案已经无法满足现在的性能测试需求。如果维护两套压测引擎会带来更多工作量。
所以为了尽可能解决这两方面问题,接到了一个活儿,调研一下Java实现日志回放功能。主要就是读了goreplay的源码以及它设计思路,用Java重现实现一遍。
这里用到了前两天分享的Disruptor高性能队列常用API演示、高性能队列Disruptor在测试中应用,有兴趣的可以再翻一翻。另视频版还在制作中,年后会和大家相见。
总体设计思路如下:
千万级日志回放设计
PS:流量递增和动态增减尚未实现,还在研究goreplay的源码。
日志的拉取和初步解析依旧采取原来项目中的逻辑,通过SQL语句网关日志中拉取日志,并对日志内容进行初步解析,放入云OSS中,并将链接存入数据库(此步骤放在录制流量成功之后)。
PS:目前日志解析保留的有用信息只有URL
日志格式如下:
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
/v1/level,funtester.com,-,token,-,1622611469,-
PS:这些风险后续会逐个解决。
def ft = {
output("创建线程")
fun {
int i = 0
while (key) {
def url = logs.get(i % logs.size())
def get = getHttpGet(HOST + url)
get.addHeader("token", tokens.get(i % tokens.size()))
get.addHeader(HttpClientConstant.USER_AGENT)
ringBuffer.publishEvent {e, s ->
e.setRequest(get)
}
i++
}
}
}
ft()
/**
* 通过闭包传入方法读取超大文件部分内容
*
* @param filePath
* @param function
* @return
*/
public static List<String> readByLine(String filePath, Function<String, String> function) {
if (StringUtils.isEmpty(filePath) || !new File(filePath).exists() || new File(filePath).isDirectory())
ParamException.fail("文件信息错误!" + filePath);
logger.debug("读取文件名:{}", filePath);
List<String> lines = new ArrayList<>();
File file = new File(filePath);
if (file.isFile() && file.exists()) { // 判断文件是否存在
try (FileInputStream fileInputStream = new FileInputStream(file);
InputStreamReader read = new InputStreamReader(fileInputStream, DEFAULT_CHARSET);
BufferedReader bufferedReader = new BufferedReader(read, 3 * 1024 * 1024);) {
String line = null;
while ((line = bufferedReader.readLine()) != null) {
String apply = function.apply(line);
if (StringUtils.isNotBlank(apply)) lines.add(apply);
}
} catch (Exception e) {
logger.warn("读取文件内容出错", e);
}
} else {
logger.warn("找不到指定的文件:{}", filePath);
}
return lines;
}
package com.funtest.groovytest
import com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.httpclient.ClientManage
import com.funtester.httpclient.FunLibrary
import com.funtester.utils.ArgsUtil
import com.funtester.utils.RWUtil
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase
import org.junit.platform.commons.util.StringUtils
import java.util.concurrent.LinkedBlockingDeque
import java.util.function.Function
class ReplayTest extends FunLibrary {
static String url = "http://localhost:12345/test";
static HttpGet httpGet = getHttpGet(url);
// static LinkedBlockingQueue<HttpRequestBase> requests = new LinkedBlockingQueue<>()
static def HOST = "http://localhost:12345"
static def key = true
static Disruptor<RequestEvent> disruptor
public static void main(String[] args) {
def logfile = "/Users/oker/Desktop/log.csv"
// def logfile = "/Users/oker/Desktop/fun.csv"
//1千万日志
def tokenfile = "/Users/oker/Desktop/token.csv"
//2万用户token
List<String> logs = RWUtil.readByLine(logfile, new Function<String, String>() {
@Override
String apply(String s) {
return StringUtils.isNotBlank(s) && s.startsWith("/") ? s.split(COMMA)[0] : null
}
});
List<String> tokens = RWUtil.readByLine(tokenfile, new Function<String, String>() {
@Override
String apply(String s) {
return StringUtils.isNotBlank(s) ? s.split(COMMA)[4] : null
}
});
output("总计 ${formatLong(logs.size())} 条日志")
disruptor = new Disruptor<RequestEvent>(
RequestEvent::new,
512 * 512,
ThreadPoolUtil.getFactory(),
ProducerType.MULTI,
new YieldingWaitStrategy()
);
RingBuffer<RequestEvent> ringBuffer = disruptor.getRingBuffer();
def ft = {
output("创建线程")
fun {
int i = 0
while (key) {
def url = logs.get(i % logs.size())
def get = getHttpGet(HOST + url)
get.addHeader("token", tokens.get(i % tokens.size()))
get.addHeader(HttpClientConstant.USER_AGENT)
ringBuffer.publishEvent {e, s ->
e.setRequest(get)
}
i++
}
}
}
ft()
disruptor.handleEventsWith(new FunTester(10))
// 5.times {ft()}
//下面开始测试
ClientManage.init(10, 5, 0, "", 0)
def util = new ArgsUtil(args)
def thread = util.getIntOrdefault(0, 20)
def times = util.getIntOrdefault(1, 60000)
RUNUP_TIME = util.getIntOrdefault(2, 0)
def tasks = []
thread.times {
def tester = new FunTester(times)
disruptor.handleEventsWith(tester);
tasks << tester
}
disruptor.start();
new Concurrent(tasks, "这是千万级日志回放演示Demo").start()
}
private static class FunTester extends FixedThread implements EventHandler<RequestEvent>, WorkHandler<RequestEvent> {
LinkedBlockingDeque<HttpRequestBase> reqs = new LinkedBlockingDeque<HttpRequestBase>()
FunTester(int limit) {
super(null, limit, true)
}
@Override
protected void doing() throws Exception {
FunLibrary.executeOnly(reqs.take())
}
@Override
FixedThread clone() {
return new FunTester(limit)
}
@Override
protected void after() {
super.after()
key = false
disruptor.shutdown()
}
@Override
void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
if (reqs.size() < 100000) reqs.add(event.getRequest())
}
@Override
void onEvent(RequestEvent event) throws Exception {
if (reqs.size() < 100000) reqs.add(event.getRequest())
}
}
private static class RequestEvent {
HttpRequestBase request;
public HttpRequestBase getRequest() {
return request;
}
public void setRequest(HttpRequestBase request) {
this.request = request;
}
}
}
PS:这里用到了多个group,原因在设计稿中标记了。