本篇将重点关注 Sentienl 实时数据收集,即 Sentienl 具体是如何收集调用信息,以此来判断是否需要触发限流或熔断。
Sentienl 实时数据收集的入口类为 StatisticSlot。
我们先简单来看一下 StatisticSlot 该类的注释,来看一下该类的整体定位。
StatisticSlot,专用于实时统计的 slot。在进入一个资源时,在执行 Sentienl 的处理链条中会进入到该 slot 中,需要完成如下计算任务:
接下来用源码分析的手段来详细分析 StatisticSlot 的实现原理。
StatisticSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args); // @1
// Request passed, add thread count and pass count.
node.increaseThreadNum(); // @2
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) { // @3
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) { // @4
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { // @5
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) { // @6
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) { // @7
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) { // @8
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
}
}
代码@1:首先调用 fireEntry,先调用 Sentinel Slot Chain 中其他的处理器,执行完其他处理器的逻辑,例如 FlowSlot、DegradeSlot,因为 StatisticSlot 的职责是收集统计信息。
代码@2:如果后续处理器成功执行,则将正在执行线程数统计指标加一,并将通过的请求数量指标增加对应的值。下文会对 Sentinel Node 体系进行详细的介绍,在 Sentinel 中使用 Node 来表示调用链中的某一个节点,每个节点关联一个资源,资源的实时统计信息就存储在 Node 中,故该部分也是调用 DefaultNode 的相关方法来改变线程数等,将在下文会向详细介绍。
代码@3:如果上下文环境中保存了调用的源头(调用方)的节点信息不为空,则更新该节点的统计数据:线程数与通过数量。
代码@4:如果资源的进入类型为 EntryType.IN,表示入站流量,更新入站全局统计数据(集群范围 ClusterNode)。
代码@5:执行注册的进入Handler,可以通过 StatisticSlotCallbackRegistry 的 addEntryCallback 注册相关监听器。
代码@6:如果捕获到 PriorityWaitException ,则认为是等待过一定时间,但最终还是算通过,只需增加线程的个数,但无需增加节点通过的数量,具体原因我们在详细分析限流部分时会重点讨论,也会再次阐述 PriorityWaitException 的含义。
代码@7:如果捕获到 BlockException,则主要增加阻塞的数量。
代码@8:如果是系统异常,则增加异常数量。
我想上面的代码应该不难理解,但涉及到统计指标数据的变化,都是调用 DefaultNode node 相关的方法,从这里也可以看出,Node 将是实时统计数据的直接持有者,那毋容置疑接下来将重点来学习 Node,为了知识体系的完备性,我们先来看一下 StatisticSlot 的 exit 方法。
StatisticSlot#exit
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) { // @1
// Calculate response time (max RT is TIME_DROP_VALVE).
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
// Record response time and success count.
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
// Handle exit event with registered exit callback handlers.
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) { // @2
handler.onExit(context, resourceWrapper, count, args);
}
fireExit(context, resourceWrapper, count); // @3
}
代码@1:成功执行,则重点关注响应时间,其实现亮点如下: 计算本次响应时间,将本次响应时间收集到 Node 中。 将当前活跃线程数减一。
代码@2:执行退出时的 callback。可以通过 StatisticSlotCallbackRegistry 的 addExitCallback 方法添加退出回调函数。
代码@3:传播 exit 事件。
接下来我们将重点介绍 DefaultNode,即 Sentinel 的 Node 体系,持有资源的实时调用信息。
2.1 Node 类体系图
我们先简单介绍一下上述核心类的作用与核心接口或核心属性的含义。
本文将详细介绍 DefaultNode 与 StatisticNode,重点阐述调用树与实时统计信息。DefaultNode 是 StatisticNode 的子类,我们先从 StatisticNode 开始 Node 体系的探究。
我们对其核心属性进行一一解读:
关于 ArrayMetric 滑动窗口设计与实现原理,请参考笔者的另一篇博文:Alibaba Seninel 滑动窗口实现原理(文末附原理图)
接下来我们挑选几个具有代表性的方法进行探究。
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
增加通过请求数量。即将实时调用信息向滑动窗口中进行统计。addPassRequest 即报告成功的通过数量。就是分别调用 秒级、分钟即对应的滑动窗口中添加数量,然后限流规则、熔断规则将基于滑动窗口中的值进行计算。
public long totalRequest() {
return rollingCounterInMinute.pass() + rollingCounterInMinute.block();
}
获取当前时间戳的总请求数,获取分钟级时间窗口中的统计信息。
public double successQps() {
return rollingCounterInSecond.success() / rollingCounterInSecond.getWindowIntervalInSec();
}
成功TPS,用秒级统计滑动窗口中统计的个数 除以 窗口的间隔得出其 tps,即抽样个数越大,其统计越精确。
温馨提示:上面的方法在学习了上文的滑动窗口设计原理后将显得非常简单,大家在学习的过程中,可以总结出一个规律,什么时候时候使用秒级滑动窗口,什么时候使用分钟级滑动窗口。
由于 Sentienl 基于滑动窗口来实时收集统计信息,并存储在内存中,并随着时间的推移,旧的滑动窗口将失效,故需要提供一个方法,及时将所有的统计信息进行汇总输出,供监控客户端定时拉取,转储都其他客户端,例如数据库,方便监控数据的可视化,这也通常是中间件用于监控指标的监控与采集的通用设计方法。
public Map<Long, MetricNode> metrics() {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % ; // @1
Map<Long, MetricNode> metrics = new ConcurrentHashMap<>();
List<MetricNode> nodesOfEverySecond = rollingCounterInMinute.details(); // @2
long newLastFetchTime = lastFetchTime;
// Iterate metrics of all resources, filter valid metrics (not-empty and up-to-date).
for (MetricNode node : nodesOfEverySecond) {
if (isNodeInTime(node, currentTime) && isValidMetricNode(node)) { // @3
metrics.put(node.getTimestamp(), node);
newLastFetchTime = Math.max(newLastFetchTime, node.getTimestamp());
}
}
lastFetchTime = newLastFetchTime;
return metrics;
}
代码@1:获取当前时间对应的滑动窗口的开始时间,可以对比上文计算滑动窗口的算法。
代码@2:获取一分钟内的所有滑动窗口中的统计数据,使用 MetricNode 表示。
代码@3:遍历所有节点,刷选出不是当前滑动窗口外的所有数据。这里的重点是方法:isNodeInTime。
private boolean isNodeInTime(MetricNode node, long currentTime) {
return node.getTimestamp() > lastFetchTime && node.getTimestamp() < currentTime;
}
这里只刷选出不是当前窗口的数据,即 metrics 方法返回的是“过去”的统计数据。
接下来我们再来看看 DefaultNode 相关的几个特性方法。
DefaultNode 是 StatisticNode 的子类,其额外增加的属性如下:
接下来我们将来看一下 DefaultNode 的核心方法。
public void increaseBlockQps(int count) {
super.increaseBlockQps(count);
this.clusterNode.increaseBlockQps(count);
}
DefaultNode 的此类方法,通常是先调用 StatisticNode 的方法,然后再调用 clusterNode 的相关方法,最终就是使用在对应的滑动窗口中增加或减少计量值。
其他方法也比较简单,就不再细看了,我们可以通过 DefaultNode 的 printDefaultNode 方法来打印该节点的调用链。
本文就介绍到这里了,本文详细介绍了 Sentinel 实时数据收集的统一入口 StatisticSlot,并且介绍了 Seninel Node 体系,即调用链中的每一个节点,每一个节点对一个资源的实时统计信息。
如果您喜欢这篇文章,点【在看】与转发是一种美德,期待您的认可与鼓励,越努力越幸运。
欢迎加入我的知识星球,一起交流源码,探讨架构,打造高质量的技术交流圈,长按如下二维码中间件兴趣圈 知识星球 正在对如下话题展开如火如荼的讨论:
1、【让天下没有难学的Netty-网络通道篇】
1、Netty4 Channel概述(已发表)
2、Netty4 ChannelHandler概述(已发表)
3、Netty4事件处理传播机制(已发表)
4、Netty4服务端启动流程(已发表)
5、Netty4 NIO 客户端启动流程
6、Netty4 NIO线程模型分析
7、Netty4编码器、解码器实现原理
8、Netty4 读事件处理流程
9、Netty4 写事件处理流程
10、Netty4 NIO Channel其他方法详解
2、Java 并发框架(JUC) 探讨【面试神器】 3、源码分析Alibaba Sentienl 专栏背后的写作与学习技巧。