大模型已在智能客服、代码生成、金融分析等领域爆发式落地,但实际场景的实时交互、高并发需求(如千人同时提问)暴露了性能瓶颈:用户可能因3秒延迟流失,而盲目优化响应速度又会导致GPU资源暴增。
性能测试的核心目标,正是通过量化模型在准确性(如回答正确率)、响应速度(如首Token延迟)和资源消耗(如显存占用)的三角博弈,找到业务场景的最优平衡点。
EvalScope 是由魔搭社区官方推出的全栈评测框架,支持 LLM、多模态模型及 RAG 系统的端到端评测。选择 EvalScope 进行 LLM 性能测试的核心优势在于其 全栈评测能力 与 工业级性能优化设计。
EvalScope 支持 延迟(如首 Token 延迟/TTFT)、吞吐量(每秒生成 Token 数/TGS)、GPU 利用率 等关键指标的高精度测量 。其内置的 Locust 压测引擎 可模拟千人并发请求,并支持动态调整输入 Token 长度分布(如设置均值±标准差),真实还原业务流量波动场景 。
工具采用 模型适配器 与 数据适配器 分离设计,兼容本地模型(如 Qwen、DeepSeek)和 API 服务(如 OpenAI、Anthropic),并支持分布式压测框架(如 Ray) 。通过与 ms-swift 训练框架 的无缝集成,开发者可在训练后直接发起性能评测,形成“训练-评测-优化”闭环
EvalScope 工具的运行需要 Python 运行环境,建议使用 Conda 进行虚拟环境管理。 EvalScope 安装:方式1.使用pip安装 中介绍了如何使用 conda 来创建 evalscope 的运行环境, 这里不在赘述。
如果 LLM 接口的格式是标准的 OpenAI 格式,则EvalScope通过简单的命令或者脚本就可以快速测试。此处简易使用 Python 脚本来进行性能测试(便于维护)。
from evalscope.perf.main import run_perf_benchmark
import os
task_cfg = {"url": "https://api.deepseek.com/chat/completions",
"api_key": os.getenv("DEEPSEEK_API_KEY"),
"parallel": 5,
"model": "deepseek-chat",
"number": 20,
"api": "openai",
"dataset": "openqa",
"stream": True}
run_perf_benchmark(task_cfg)
关于参数的含义,可以查阅参数说明 因为我们是标准的 OpenAI 格式的对接接口,因此 api 字段指定为openai "dataset": "openqa"表示使用了 openqa 的数据集。 它使用jsonl文件的 question 字段作为prompt。不指定dataset_path将从modelscope自动下载数据集,prompt长度较短,一般在 100 token 以下。
(evalscope) (base) [windealli@VM-52-29-tencentos llm-benchmark]$ python evalscope/deepseek_perf.py
如果LLM接口的输入输出格式非标准的 OpenAPI 格式,则需要开发请求构造和应答解析逻辑。 参考: 自定义请求 API 这里其实也不复杂,就是继承 ApiPluginBase 类,并使用 @register_api("api名称") 进行注解,实现如下两个方法
import os
from evalscope.perf.main import run_perf_benchmark
import sbsagent.custom
query_template = """{
"query_id": "{query_id}",
"agent_session_id": "{agent_session_id}",
"query": "%p" ,
"forward_service": "%m",
"stream": true
}
"""
task_cfg = {
"url": "https://127.0.0.1:8080/v1/chat/complete",
"api_key": os.getenv("SSV_AI_GW_API_KEY"),
...
"api": "sbs-agent-api",
"query_template": query_template,
"no_test_connection": True,
}
run_perf_benchmark(task_cfg)
import sbsagent.custom 是自定义服务的模块路径 "query_template": query_template, 指定了接口的请求模板 "api": "sbs-agent-api", 指定了要使用的自定义 API 服务,需要下文的自定义 API 的注解保持一致。 这里还额外指定了参数:"no_test_connection": True, 主要是为了简化测试流程,否则测试连接时可能会无法正常启动测试,需要额外处理。 改参数不影响测试结果
# import os
import json
import time
from abc import abstractmethod
from typing import Any, Dict, List, Tuple
from evalscope.perf.arguments import Arguments
from evalscope.perf.plugin.api.base import ApiPluginBase
from evalscope.perf.plugin.registry import register_api
from evalscope.utils.logger import get_logger
logger = get_logger()
# from evalscope.perf.plugin.registry import register_api
@register_api('sbs-agent-api')
class CustomPlugin(ApiPluginBase):
def __init__(self, model_path: str) -> None:
print("CustomPlugin initialized.=============")
self.model_path = model_path
self.invalid_output_count = 0
self.request_count = 0
if self.model_path is not None:
from modelscope import AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(model_path)
else:
self.tokenizer = None
@abstractmethod
def build_request(self, messages: List[Dict], param: Arguments) -> Dict:
"""Build a api request body.
Args:
messages (List[Dict]): The messages generated by dataset.
param (QueryParameters): The query parameters.
Raises:
NotImplementedError: Not implemented.
Returns:
Dict: The api request body.
"""
# print(messages, param)
print("======message=====")
print(messages)
# print("======param=====")
# print(param)
try:
query = json.loads(param.query_template)
if not isinstance(query, dict):
raise ValueError('Prompt must be a dict.')
ApiPluginBase.replace_values(query, param.model, messages)
#将query中的{query_id}替换成当前时间戳
# query['query_id'] = time.strftime("%Y%m%d%H%M%S", time.localtime())
query['query_id'] = "query_id-" + str(self.request_count % 100)
print("======", query)
return query
except Exception as e:
logger.exception(e)
logger.error('Prompt: %s invalidate!' % messages)
return None
# raise NotImplementedError
@abstractmethod
def parse_responses(self,
responses: List,
request: Any=None,
**kwargs:Any) -> Tuple[int, int]:
"""Parser responses and return number of request and response tokens.
Args:
responses (List[bytes]): List of http response body, for stream output,
there are multiple responses, each is bytes, for general only one.
request (Any): The request body.
Returns:
Tuple: (Number of prompt_tokens and number of completion_tokens).
"""
full_response_content = ''
delta_contents = {}
input_tokens = None
output_tokens = None
# Parse responses to extract and concatenate data content
# parsed_responses_result = []
full_response_result = ''
for response in responses:
try:
# Decode the response and split by lines
lines = response.decode('utf-8').splitlines()
for line in lines:
if line.startswith("data:"):
# Extract the JSON part after "data:" and parse it
data = json.loads(line[5:].strip())
if "retcode" not in data or data["retcode"] != 0:
raise Exception(f"Error code {data['retcode']}: {data['message']}")
if "result" in data:
full_response_result = full_response_result + data["result"]
# parsed_responses.append(data)
if "usage" in data:
if "prompt_tokens" in data["usage"]:
input_tokens = input_tokens + data["usage"]["prompt_tokens"]
if "completion_tokens" in data["usage"]:
output_tokens = output_tokens + data["usage"]["completion_tokens"]
except Exception as e:
logger.exception(f"Failed to parse response: {response}")
if input_tokens is None or output_tokens is None:
logger.warning('No usage info get.')
if self.tokenizer is not None:
input_tokens = len(self.tokenizer.encode(request["query"]))
output_tokens = len(self.tokenizer.encode(full_response_result))
else:
input_tokens = len(request["query"])
output_tokens = len(full_response_result)
return input_tokens, output_tokens
print("Parsed Responses:", parsed_responses)
# return parsed_responses
# 解析响应
# Automatically initialize the module when imported
def _initialize_module():
print("Initializing CustomPlugin module...")
# Perform any necessary setup or registration here
_initialize_module()
同样运行 perf.py,几个进行性能测试。
指标 | 英文名称 | 解释 | 公式 |
---|---|---|---|
测试总时长 | Time taken for tests | 整个测试过程从开始到结束所花费的总时间 | 最后一个请求结束时间 - 第一个请求开始时间 |
并发数 | Number of concurrency | 同时发送请求的客户端数量 | 预设值 |
总请求数 | Total requests | 在整个测试过程中发送的所有请求的数量 | 成功请求数 + 失败请求数 |
成功请求数 | Succeed requests | 成功完成并返回预期结果的请求数量 | 直接统计 |
失败请求数 | Failed requests | 由于各种原因未能成功完成的请求数量 | 直接统计 |
输出吞吐量 | Output token throughput | 每秒钟处理的平均标记(token)数 | 总输出token数 / 测试总时长 |
总吞吐量 | Total token throughput | 每秒钟处理的平均标记(token)数 | 总输入token数 + 总输出token数 / 测试总时长 |
请求吞吐量 | Request throughput | 每秒钟成功处理的平均请求数 | 成功请求数 / 测试总时长 |
总延迟时间 | Total latency | 所有成功请求的延迟时间总和 | 所有成功请求的延迟时间之和 |
平均延迟 | Average latency | 从发送请求到接收完整响应的平均时间 | 总延迟时间 / 成功请求数 |
平均首token时间 | Average time to first token | 从发送请求到接收到第一个响应标记的平均时间 | 总首chunk延迟 / 成功请求数 |
平均每输出token时间 | Average time per output token | 生成每个输出标记所需的平均时间(不包含首token) | 总每输出token时间 / 成功请求数 |
平均输入token数 | Average input tokens per request | 每个请求的平均输入标记数 | 总输入token数 / 成功请求数 |
平均输出token数 | Average output tokens per request | 每个请求的平均输出标记数 | 总输出token数 / 成功请求数 |
平均数据包延迟 | Average package latency | 接收每个数据包的平均延迟时间 | 总数据包时间 / 总数据包数 |
平均每请求数据包数 | Average package per request | 每个请求平均接收的数据包数量 | 总数据包数 / 成功请求数 |
百分位指标 (Percentile)
以单个请求为单位进行统计,数据被分为100个相等部分,第n百分位表示n%的数据点在此值之下。
指标 | 英文名称 | 解释 |
---|---|---|
首次生成token时间 | TTFT (Time to First Token) | 从发送请求到生成第一个token的时间(以秒为单位),评估首包延时 |
输出token间时延 | ITL (Inter-token Latency) | 生成每个输出token间隔时间(以秒为单位),评估输出是否平稳 |
每token延迟 | TPOT (Time per Output Token) | 生成每个输出token所需的时间(不包含首token,以秒为单位),评估解码速度 |
端到端延迟时间 | Latency | 从发送请求到接收完整响应的时间(以秒为单位):TTFT + TPOT * Output tokens |
输入token数 | Input tokens | 请求中输入的token数量 |
输出token数 | Output tokens | 响应中生成的token数量 |
输出吞吐量 | Output Throughput | 每秒输出的token数量:输出tokens / 端到端延时 |
总吞吐量 | Total throughput | 每秒处理的token数量:(输入tokens + 输出tokens) / 端到端延时 |
在性能压测结束后,处理查阅测试指标外, 我们可能还想查阅 LLM 的对话内容。 EvalScope 在测试时会将对话内容存储在./outputs路径下以 sqlite3 数据库的形式存储。 可以转到 sqlite3 查看数据,可以编写脚本将其解析到 csv文件中,参考脚本:
import sqlite3
import base64
import pickle
import json
import xlsxwriter
result_db_path = 'outputs/20250425_142407/xxxx/benchmark_data.db'
result_xlws_path = 'outputs/20250425_142407/xxxx/benchmark_results.xlsx'
con = sqlite3.connect(result_db_path)
query_sql = "SELECT request, response_messages, prompt_tokens, completion_tokens \
FROM result WHERE success='1'"
# how to save base64.b64encode(pickle.dumps(benchmark_data["request"])).decode("ascii"),
with con:
# Initialize Excel workbook and worksheet
workbook = xlsxwriter.Workbook(result_xlws_path)
worksheet = workbook.add_worksheet()
# Write headers
headers = ['Prompt', 'Prompt Tokens', 'Completion', 'Completion Tokens']
for col_num, header in enumerate(headers):
worksheet.write(0, col_num, header)
row_num = 1
rows = con.execute(query_sql).fetchall()
if len(rows) > 0:
for row in rows:
request = row[0]
responses = row[1]
request = base64.b64decode(request)
request = pickle.loads(request)
responses = base64.b64decode(responses)
responses = pickle.loads(responses)
print('request: %s' % request)
print('responses: %s' % responses)
response_content = ''
for response in responses:
print(response)
response = json.loads(response)
# if not response['result']:
# print(response)
# continue
if 'content' not in response['choices'][0]['delta']:
print(response)
continue
# response_content += response['result']
response_content += response['choices'][0]['delta']['content']
print('prompt: %s, tokens: %s, completion: %s, tokens: %s' %
(request['query'], row[2], response_content,
row[3]))
worksheet.write(row_num, 0, request['query'])
worksheet.write(row_num, 1, row[2]) # Prompt tokens
worksheet.write(row_num, 2, response_content)
worksheet.write(row_num, 3, row[3]) # Completion tokens
row_num += 1
# Close the workbook
workbook.close()
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有