Rule

概述

参考:

接口

代码:./rules/manager.go —— Rule{}

Rule 接口封装了一个向量表达式,在指定的时间间隔评估规则。Prometheus 将规则分为两类:Recording Rule(记录规则)Alerting Rule(告警规则),所以将处理这两种规则的方法统一成一个接口,如下两个结构体实现了该接口:

  • ./rules/alerting.go —— AlertingRule{}
  • ./rules/recording.go —— RecordingRule{}
type Rule interface { // 直接返回规则的名称 Name() string // Labels of the rule. Labels() labels.Labels // 评估规则(规则处理逻辑中最重要的部分) Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error) // String returns a human-readable string representation of the rule. String() string // Query returns the rule query expression. Query() parser.Expr // SetLastErr sets the current error experienced by the rule. SetLastError(error) // LastErr returns the last error experienced by the rule. LastError() error // SetHealth sets the current health of the rule. SetHealth(RuleHealth) // Health returns the current health of the rule. Health() RuleHealth SetEvaluationDuration(time.Duration) // GetEvaluationDuration returns last evaluation duration. // NOTE: Used dynamically by rules.html template. GetEvaluationDuration() time.Duration SetEvaluationTimestamp(time.Time) // GetEvaluationTimestamp returns last evaluation timestamp. // NOTE: Used dynamically by rules.html template. GetEvaluationTimestamp() time.Time // HTMLSnippet returns a human-readable string representation of the rule, // decorated with HTML elements for use the web frontend. HTMLSnippet(pathPrefix string) html_template.HTML }

这些方法中,最重要的就是 Eval(),用来评估规则。

结构体

规则

代码:./rules/manager.goGroup{}

这是一组具有逻辑关系的规则。Manager.LoadGroups() 方法将会读取配置文件中的每组规则,解析后,返回一个 map[string]*Group(也就是说),以供后续代码使用。

type Group struct { name string file string interval time.Duration limit int rules []Rule seriesInPreviousEval []map[string]labels.Labels // One per Rule. staleSeries []labels.Labels opts *ManagerOptions mtx sync.Mutex evaluationTime time.Duration lastEvaluation time.Time shouldRestore bool markStale bool done chan struct{} terminated chan struct{} managerDone chan struct{} logger log.Logger metrics *Metrics }

规则管理器

代码:./rules/manager.goManager{}

Manager 结构体是规则管理器。负责管理 记录规则 与 告警规则。

type Manager struct { // 管理器的额外选项, opts *ManagerOptions // 通过配置文件传递进来的规则 groups map[string]*Group // 该结构体的读写锁,通常在处理 groups 时会上锁 mtx sync.RWMutex block chan struct{} done chan struct{} restored bool logger log.Logger }

在 Prometheus Server 的 main() 中,通过 NewManager() 函数实例化一个管理器,并在实例化时传递已实例化的 ManagerOptions{}

管理器选项

./rules/manager.go —— ManagerOptions{}

main() 中实例化 Manager{} 时,会将该结构体作为实参传递到 NewManager() 函数中。

type ManagerOptions struct { ExternalURL *url.URL // QueryFunc QueryFunc NotifyFunc NotifyFunc Context context.Context Appendable storage.Appendable Queryable storage.Queryable Logger log.Logger Registerer prometheus.Registerer OutageTolerance time.Duration ForGracePeriod time.Duration ResendDelay time.Duration GroupLoader GroupLoader Metrics *Metrics }

加载并运行规则

./rules/manager.goManager.Updata()

在 Prometheus Server 启动时会调用 Manager.Updata() 方法加载规则配置文件并解析。在配置热更新时。如果加载新规则失败,则将恢复旧规则。

这个更新规则是这么个逻辑:比较每一个规则组,若一样,则将老的复制成新的,然后再加上原来没有的,若不一样,则规则组处理完成后,删除这些剩余的。

这里面最重要部分有两个

  • Manager.LoadGroups() 方法。从配置文件中读取内容,并解析规则,实例化一个规则组(即 Group{} 结构体)
  • 在并发中执行的 Group.run() 方法。该方法中包含里定期评估规则的逻辑
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error { // 下面将会处理 Manager.groups,所以上个锁~ m.mtx.Lock() defer m.mtx.Unlock() // 从配置文件中加载并解析规则,以实例化一个规则组变量供后续处理 groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...) // 准备开始并发喽~ var wg sync.WaitGroup for _, newg := range groups { // 检查新组与旧组标识符是否相等,若相等则跳过,若不等则停止它并等待它完成当前迭代,然后将其复制到新组中 ...... // 并发添加计数 wg.Add(1) // 开始真正的加载规则组,其中主要执行逻辑在 newg.run(m.opts.Context) go func(newg *Group) // 接着上面检查新旧组标识符,如果相等,这里就会将老规则组中的状态信息复制到新规则组中。 if ok { oldg.stop() newg.CopyState(oldg) } wg.Done() // Wait with starting evaluation until the rule manager // is told to run. This is necessary to avoid running // queries against a bootstrapping storage. <-m.block // 加载新组,周期性得执行 PromQL 语句 newg.run(m.opts.Context) }(newg) } // 删除余下的旧组 wg.Add(len(m.groups)) for n, oldg := range m.groups { go func(n string, g *Group) { ...... }(n, oldg) } wg.Wait() m.groups = groups return nil }

加载规则

./rules/manager.go —— Manager.LoadGroups()

在这里,会使用 GroupOptions{} 实例化 Group{}

func (m *Manager) LoadGroups(interval time.Duration, externalLabels labels.Labels, externalURL string, filenames ...string,) (map[string]*Group, []error) { // 循环配置文件 for _, fn := range filenames { // 通过 ./rulefmt/rulefmt.go —— ParseFile() 解析配置文件,返回 rulefmt 包下的 RuleGroups{},这就是规则组实例 rgs, errs := m.opts.GroupLoader.Load(fn) // 循环规则组实例 for _, rg := range rgs.Groups { // 循环规则组中的每个规则 for _, r := range rg.Rules { } groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ Name: rg.Name, File: fn, Interval: itv, Limit: rg.Limit, Rules: rules, ShouldRestore: shouldRestore, Opts: m.opts, done: m.done, }) } } // 循环完成后,返回实例化的 Group{} return groups, nil }

运行规则

./rules/manager.goGroup.run()

上面的 Manager.LoadGroups() 中获得了实例化的 Group{},通过规则组,定期执行评估行为。这里面最重要的是定期执行的 Group.Eval() 方法。

func (g *Group) run(ctx context.Context) { // 等待一个当前的时间戳,以开始计时,以便根据固定的间隔,持续评估 evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval) // !!!!用闭包声明一个函数变量,这里面是执行评估的最主要代码逻辑!!!!! iter := func() { g.Eval(ctx, evalTimestamp) } // 在一个循环中,每隔一段时间就执行一次 Group.Eval() for { select { case <-g.done: return default: select { case <-g.done: return case <-tick.C: iter() } } } }

评估规则

./rules/manager.goGroup.Eval()

规则评估,在 Group.run() 方法中定期调用本方法来评估所有规则组。

func (g *Group) Eval(ctx context.Context, ts time.Time) { for i, rule := range g.rules { func(i int, rule Rule) { // 记录规则 与 报警规则 具有各自的实现。但是都会返回一个向量。 vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) // 如果当前评估的是报警规则,则进行断言, if ar, ok := rule.(*AlertingRule); ok { ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) } }(i, rule) } }

这里面最重要的是运行了 Rule 接口下的 rule.Eval() 方法,由于 Prometheus 将规则分为了两类(记录规则 与 告警规则),所以这里用了一个结构,不同类型的规则,其中处理细节不太一样,但是都会在这里进行评估。

在调用 Rule 接口下的 Eval() 方法时,传递了 QueryFunc,在评估规则时,将会执行 PromQL

评估告警规则

./rules/alerting.go —— AlertingRule.Eval()

[!Warning] 这里面有一个常量 const resolvedRetention = 15 * time.Minute,这个时间是已触发的警报保存在内存中的时间,在这个保存时间内,Prometheus Server 将会持续发送该警报(即使该警报已解决,此时发送的警报状态为 Resolved)。发送间隔取决于 Prometheus 的 –rules.alert.resend-delay 命令行标志配置的值以及配置文件中配置的评估周期 global.evaluation_interval。

评估报警规则表达式,然后创建 pending 状态的报警,若满足条件,则转变为 alerting 状态,或者删除过期的 pending 状态的报警。最后,返回一个向量

func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) { // 执行 PromQL 语句,若没结果则直接返回,继续等待 Group.run() 循环中的下一个元素。 res, err := query(ctx, r.vector.String(), ts) // 若有结果,则给该告警规则上锁,并开始处理 r.mtx.Lock() defer r.mtx.Unlock() // Create pending alerts for any new vector elements in the alert expression // or update the expression value for existing elements. resultFPs := map[uint64]struct{}{} var vec promql.Vector var alerts = make(map[uint64]*Alert, len(res)) // 循环每一个 PromQL 获取到的样本,也就是循环每一个查询语句获得的结果 for _, smpl := range res { // 提取样本中的信息,保存到 Alert{} 中。相当于实例化了 Alert{} alerts[h] = &Alert{ Labels: lbs, Annotations: annotations, ActiveAt: ts, State: StatePending, Value: smpl.V, } } for h, a := range alerts { // 检查标签集是否以具有 alerting 状态 if alert, ok := r.active[h]; ok && alert.State != StateInactive { alert.Value = a.Value alert.Annotations = a.Annotations continue } r.active[h] = a } // Check if any pending alerts should be removed or fire now. Write out alert timeseries. for fp, a := range r.active { if _, ok := resultFPs[fp]; !ok { // If the alert was previously firing, keep it around for a given // retention time so it is reported as resolved to the AlertManager. if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { delete(r.active, fp) } if a.State != StateInactive { a.State = StateInactive a.ResolvedAt = ts } continue } if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { a.State = StateFiring a.FiredAt = ts } if r.restored { vec = append(vec, r.sample(a, ts)) vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix()))) } } numActive := len(r.active) if limit != 0 && numActive > limit { r.active = map[uint64]*Alert{} return nil, errors.Errorf("exceeded limit of %d with %d alerts", limit, numActive) } return vec, nil }

评估记录规则

./rules/recording.go —— RecordingRule.Eval()

评估记录规则,然后相应地覆盖指标名称和标签。

func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) { vector, err := query(ctx, rule.vector.String(), ts) if err != nil { return nil, err } // Override the metric name and labels. for i := range vector { sample := &vector[i] lb := labels.NewBuilder(sample.Metric) lb.Set(labels.MetricName, rule.name) for _, l := range rule.labels { lb.Set(l.Name, l.Value) } sample.Metric = lb.Labels() } // Check that the rule does not produce identical metrics after applying // labels. if vector.ContainsSameLabelset() { return nil, fmt.Errorf("vector contains metrics with the same labelset after applying rule labels") } numSamples := len(vector) if limit != 0 && numSamples > limit { return nil, fmt.Errorf("exceeded limit %d with %d samples", limit, numSamples) } rule.SetHealth(HealthGood) rule.SetLastError(err) return vector, nil }

最后修改 March 25, 2025: clearup (feb59d93)