Prometheus支持用户自定义Rule规则。Rule分为两类,一类是Recording Rule,另一类是Alerting Rule。Recording Rule的主要目的是通过PromQL可以实时对Prometheus中采集到的样本数据进行查询,聚合以及其它各种运算操作。而在某些PromQL较为复杂且计算量较大时,直接使用PromQL可能会导致Prometheus响应超时的情况。这时需要一种能够类似于后台批处理的机制能够在后台完成这些复杂运算的计算,对于使用者而言只需要查询这些运算结果即可。Prometheus通过Recoding Rule规则支持这种后台计算的方式,可以实现对复杂查询的性能优化,提高查询效率。
今天主要带来告警规则的分析。Prometheus中的告警规则允许你基于PromQL表达式定义告警触发条件,Prometheus后端对这些触发规则进行周期性计算,当满足触发条件后则会触发告警通知。
告警是prometheus的一个重要功能,接下来从源码的角度来分析下告警的执行流程。
一条典型的告警规则如下所示:
groups:
- name: example
rules:
- alert: HighErrorRate
#指标需要在触发告警之前的10分钟内大于0.5。
expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
for: 10m
labels:
severity: page
annotations:
summary: High request latency
description: description info
在告警规则文件中,我们可以将一组相关的规则设置定义在一个group下。在每一个group中我们可以定义多个告警规则(rule)。一条告警规则主要由以下几部分组成:
规则管理器会根据配置的规则,基于规则PromQL表达式告警的触发条件,用于计算是否有时间序列满足该条件。在满足该条件时,将告警信息发送给告警服务。
type Manager struct {
opts *ManagerOptions //外部的依赖
groups map[string]*Group //当前的规则组
mtx sync.RWMutex //规则管理器读写锁
block chan struct{}
done chan struct{}
restored bool
logger log.Logger
}
在Prometheus Server启动的过程中,首先会调用Manager.Update()方法加载Rule配置文件并进行解析,其大致流程如下。
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
// 从当前文件中加载规则
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
if errs != nil {
for _, e := range errs {
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true
var wg sync.WaitGroup
//循环遍历规则组
for _, newg := range groups {
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
//根据新的rules.Group的信息获取规则组名
gn := GroupKey(newg.file, newg.name)
//根据规则组名获取到老的规则组并删除原有的rules.Group实例
oldg, ok := m.groups[gn]
delete(m.groups, gn)
if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}
wg.Add(1)
//为每一个rules.Group实例启动一个goroutine
go func(newg *Group) {
if ok {
oldg.stop()
//将老的规则组中的状态信息复制到新的规则组
newg.CopyState(oldg)
}
wg.Done()
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
//调用rules.Group.run()方法,开始周期性的执行PromQl语句
newg.run(m.opts.Context)
}(newg)
}
// Stop remaining old groups.
//停止所有老规则组的服务
wg.Add(len(m.groups))
for n, oldg := range m.groups {
go func(n string, g *Group) {
g.markStale = true
g.stop()
if m := g.metrics; m != nil {
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
m.GroupRules.DeleteLabelValues(n)
m.GroupSamples.DeleteLabelValues((n))
}
wg.Done()
}(n, oldg)
}
wg.Wait()
//更新规则管理器中的规则组
m.groups = groups
return nil
}
规则组启动流程(Group.run):进入Group.run方法后先进行初始化等待,以使规则的运算时间在同一时刻,周期为g.interval;然后定义规则运算调度方法:iter,调度周期为g.interval;在iter方法中调用g.Eval方法执行下一层次的规则运算调度。
规则运算的调度周期g.interval,由prometheus.yml配置文件中global中的 [ evaluation_interval:| default = 1m ]指定。实现如下:
func (g *Group) run(ctx context.Context) {
defer close(g.terminated)
// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
select {
case <-time.After(time.Until(evalTimestamp))://初始化等待
case <-g.done:
return
}
ctx = promql.NewOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"file": g.File(),
"name": g.Name(),
},
})
//定义规则组规则运算调度算法
iter := func() {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
start := time.Now()
//规则运算的入口
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)
g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}
// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.interval` occurrence.
tick := time.NewTicker(g.interval) //设置规则运算定时器
defer tick.Stop()
defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
// Wait for 2 intervals to give the opportunity to renamed rules
// to insert new series in the tsdb. At this point if there is a
// renamed rule, it should already be started.
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(ctx, now)
}
}(time.Now())
}()
//调用规则组规则运算的调度方法
iter()
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}
g.RestoreForState(time.Now())
g.shouldRestore = false
}
for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
//调用规则组规则运算的调度方法
iter()
}
}
}
}
规则组对具体规则的调度在Group.Eval中实现,在Group.Eval方法中会将规则组下的每条规则通过QueryFunc将(promQL)放到查询引擎(queryEngine)中执行,如果被执行的是AlertingRule类型,那么执行结果指标会被NotifyFunc组件发送给告警服务;如果是RecordingRule类型,最后将改结果指标存储到Prometheus的储存管理器中,并对过期指标进行存储标记处理。
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
var samplesTotal float64
遍历当前规则组下的所有规则
for i, rule := range g.rules {
select {
case <-g.done:
return
default:
}
func(i int, rule Rule) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
sp.SetTag("name", rule.Name())
defer func(t time.Time) {
sp.Finish()
//更新服务指标-规则的执行时间
since := time.Since(t)
g.metrics.EvalDuration.Observe(since.Seconds())
rule.SetEvaluationDuration(since)
//记录本次规则执行的耗时
rule.SetEvaluationTimestamp(t)
}(time.Now())
//记录规则运算的次数
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
//运算规则
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
if err != nil {
//规则出现错误后,终止查询
rule.SetHealth(HealthBad)
rule.SetLastError(err)
//记录查询失败的次数
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
if _, ok := err.(promql.ErrQueryCanceled); !ok {
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
}
return
}
samplesTotal += float64(len(vector))
//判断是否是告警类型规则
if ar, ok := rule.(*AlertingRule); ok {
发送告警
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
var (
numOutOfOrder = 0
numDuplicates = 0
)
//此处为Recording获取存储器指标
app := g.opts.Appendable.Appender(ctx)
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
if err := app.Commit(); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
return
}
g.seriesInPreviousEval[i] = seriesReturned
}()
for _, s := range vector {
if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
switch errors.Cause(err) {
储存指标返回的各种错误码处理
case storage.ErrOutOfOrderSample:
numOutOfOrder++
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
default:
level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
}
} else {
//缓存规则运算后的结果指标
seriesReturned[s.Metric.String()] = s.Metric
}
}
if numOutOfOrder > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
}
if numDuplicates > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
}
for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
//设置过期指标的指标值
// Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch errors.Cause(err) {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err)
}
}
}
}(i, rule)
}
if g.metrics != nil {
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
}
g.cleanupStaleSeries(ctx, ts)
}
然后就是规则的具体执行了,我们这里先只看AlertingRule的流程。首先看下AlertingRule的结构:
// An AlertingRule generates alerts from its vector expression.
type AlertingRule struct {
// The name of the alert.
name string
// The vector expression from which to generate alerts.
vector parser.Expr
// The duration for which a labelset needs to persist in the expression
// output vector before an alert transitions from Pending to Firing state.
holdDuration time.Duration
// Extra labels to attach to the resulting alert sample vectors.
labels labels.Labels
// Non-identifying key/value pairs.
annotations labels.Labels
// External labels from the global config.
externalLabels map[string]string
// true if old state has been restored. We start persisting samples for ALERT_FOR_STATE
// only after the restoration.
restored bool
// Protects the below.
mtx sync.Mutex
// Time in seconds taken to evaluate rule.
evaluationDuration time.Duration
// Timestamp of last evaluation of rule.
evaluationTimestamp time.Time
// The health of the alerting rule.
health RuleHealth
// The last error seen by the alerting rule.
lastError error
// A map of alerts which are currently active (Pending or Firing), keyed by
// the fingerprint of the labelset they correspond to.
active map[uint64]*Alert
logger log.Logger
}
这里比较重要的就是active字段了,它保存了执行规则后需要进行告警的资源,具体是否告警还要执行一系列的逻辑来判断是否满足告警条件。具体执行的逻辑如下:
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
res, err := query(ctx, r.vector.String(), ts)
if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
return nil, err
}
// ......
}
这一步通过创建Manager时传入的QueryFunc函数执行规则配置中的expr表达式,然后得到返回的结果,这里的结果是满足表达式的指标的集合。比如配置的规则为:
cpu_usage > 90
那么查出来的结果可能是
cpu_usage{instance="192.168.0.11"} 91
cpu_usage{instance="192.168.0.12"} 92
然后遍历查询到的结果,根据指标的标签生成一个hash值,然后判断这个hash值是否之前已经存在(即之前是否已经有相同的指标数据返回),如果是,则更新上次的value及annotations,如果不是,则创建一个新的alert并保存至该规则下的active alert列表中。然后遍历规则的active alert列表,根据规则的持续时长配置、alert的上次触发时间、alert的当前状态、本次查询alert是否依然存在等信息来修改alert的状态。具体规则如下:
上面那一步只是修改了alert的状态,但是并没有真正执行发送告警操作。下面才是真正要执行告警操作:
// 判断规则是否是alert规则,如果是则发送告警信息(具体是否真正发送由ar.sendAlerts中的逻辑判断)
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
// .......
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, r.vector.String(), alerts...)
}
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending {
return false
}
// if an alert has been resolved since the last send, resend it
if a.ResolvedAt.After(a.LastSentAt) {
return true
}
return a.LastSentAt.Add(resendDelay).Before(ts)
}
概括一下以上逻辑就是:
以上就是prometheus的告警流程。学习这个流程主要是问了能够对prometheus的rules相关的做二次开发。我们可以修改LoadGroups()方法,让其可以动态侧加载定义在mysql中定义的规则,动态实现告警规则更新。
参考:
《深入浅出prometheus原理、应用、源码与拓展详解》