QPS(Queries Per Second)即"每秒查询率",是衡量系统性能和处理能力的重要指标。它表示系统在单位时间内能够处理的请求数量,是评估Web服务器、数据库、API服务等性能的关键参数。
QPS = 请求总数 / 时间间隔(秒)
最简单的QPS统计方法是使用计数器:
import time
from collections import defaultdict
class SimpleQPSCounter:
def __init__(self):
self.count = 0
self.start_time = time.time()
def increment(self):
self.count += 1
def get_qps(self):
current_time = time.time()
elapsed = current_time - self.start_time
return self.count / elapsed if elapsed > 0 else 0
def reset(self):
self.count = 0
self.start_time = time.time()
# 使用示例
counter = SimpleQPSCounter()
# 模拟请求
for _ in range(1000):
counter.increment()
time.sleep(0.001) # 模拟请求处理
print(f"当前QPS: {counter.get_qps():.2f}")
简单计数法只能计算平均QPS,无法反映最近时间段的真实流量。滑动窗口算法可以解决这个问题:
import time
from collections import deque
class SlidingWindowQPSCounter:
def __init__(self, window_size=60):
self.window_size = window_size # 窗口大小(秒)
self.requests = deque()
def record_request(self):
current_time = time.time()
self.requests.append(current_time)
self._clean_old_requests(current_time)
def _clean_old_requests(self, current_time):
# 移除超出时间窗口的请求记录
while self.requests and self.requests[0] < current_time - self.window_size:
self.requests.popleft()
def get_current_qps(self):
current_time = time.time()
self._clean_old_requests(current_time)
return len(self.requests) / self.window_size
# 使用示例
window_counter = SlidingWindowQPSCounter(window_size=10) # 10秒窗口
# 模拟请求波动
for i in range(200):
window_counter.record_request()
if i % 50 == 0:
time.sleep(1) # 模拟请求波动
print(f"当前QPS: {window_counter.get_current_qps():.2f}")
对于高并发系统,可以使用分段统计来减少锁竞争:
import time
import threading
from atomic import AtomicLong # 需要安装atomic包
class SegmentQPSCounter:
def __init__(self, segments=10, window_size=60):
self.segments = segments
self.window_size = window_size
self.segment_duration = window_size / segments
self.counts = [AtomicLong(0) for _ in range(segments)]
self.current_segment = 0
self.last_update_time = time.time()
self.lock = threading.Lock()
def increment(self):
with self.lock:
current_time = time.time()
self._advance_segment_if_needed(current_time)
self.counts[self.current_segment].increment()
def _advance_segment_if_needed(self, current_time):
time_elapsed = current_time - self.last_update_time
segments_to_advance = min(
int(time_elapsed / self.segment_duration),
self.segments
)
if segments_to_advance > 0:
# 清空过期的段
for i in range(segments_to_advance):
segment_index = (self.current_segment + i + 1) % self.segments
self.counts[segment_index].set(0)
self.current_segment = (self.current_segment + segments_to_advance) % self.segments
self.last_update_time = current_time
def get_qps(self):
with self.lock:
current_time = time.time()
self._advance_segment_if_needed(current_time)
total = 0
for i in range(self.segments):
total += self.counts[i].get()
return total / self.window_size
在分布式环境中,需要聚合多个节点的数据:
# 分布式QPS统计简化示例
class DistributedQPSCounter:
def __init__(self, redis_client, key_prefix="qps:"):
self.redis = redis_client
self.key_prefix = key_prefix
def increment(self, endpoint):
current_minute = int(time.time() / 60) # 按分钟聚合
key = f"{self.key_prefix}{endpoint}:{current_minute}"
self.redis.incr(key)
self.redis.expire(key, 120) # 设置过期时间
def get_qps(self, endpoint):
current_minute = int(time.time() / 60)
previous_minute = current_minute - 1
# 获取最近两分钟的数据
keys = [
f"{self.key_prefix}{endpoint}:{previous_minute}",
f"{self.key_prefix}{endpoint}:{current_minute}"
]
counts = self.redis.mget(keys)
total = sum(int(count) if count else 0 for count in counts)
# 计算加权QPS(考虑当前分钟的时间进度)
current_second = time.time() % 60
weight = (60 + current_second) / 120 # 加权计算
return total * weight / 60
QPS统计是系统监控的重要组成部分,选择合适的统计方法对于准确评估系统性能至关重要。从简单的计数器到复杂的滑动窗口算法,每种方法都有其适用场景。在实际应用中,需要根据系统特点、性能要求和资源约束选择最合适的QPS统计方案。
希望本文对您理解和实现QPS统计有所帮助!