Rule

概述

参考:

接口

代码:./rules/manager.go —— Rule{}

Rule 接口封装了一个向量表达式,在指定的时间间隔评估规则。Prometheus 将规则分为两类:Recording Rule(记录规则)Alerting Rule(告警规则),所以将处理这两种规则的方法统一成一个接口,如下两个结构体实现了该接口:

  • ./rules/alerting.go —— AlertingRule{}
  • ./rules/recording.go —— RecordingRule{}
type Rule interface {
    // 直接返回规则的名称
	Name() string
	// Labels of the rule.
	Labels() labels.Labels
    // 评估规则(规则处理逻辑中最重要的部分)
	Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error)
	// String returns a human-readable string representation of the rule.
	String() string
	// Query returns the rule query expression.
	Query() parser.Expr
	// SetLastErr sets the current error experienced by the rule.
	SetLastError(error)
	// LastErr returns the last error experienced by the rule.
	LastError() error
	// SetHealth sets the current health of the rule.
	SetHealth(RuleHealth)
	// Health returns the current health of the rule.
	Health() RuleHealth
	SetEvaluationDuration(time.Duration)
	// GetEvaluationDuration returns last evaluation duration.
	// NOTE: Used dynamically by rules.html template.
	GetEvaluationDuration() time.Duration
	SetEvaluationTimestamp(time.Time)
	// GetEvaluationTimestamp returns last evaluation timestamp.
	// NOTE: Used dynamically by rules.html template.
	GetEvaluationTimestamp() time.Time
	// HTMLSnippet returns a human-readable string representation of the rule,
	// decorated with HTML elements for use the web frontend.
	HTMLSnippet(pathPrefix string) html_template.HTML
}

这些方法中,最重要的就是 Eval(),用来评估规则。

结构体

规则

代码:./rules/manager.goGroup{}

这是一组具有逻辑关系的规则。Manager.LoadGroups() 方法将会读取配置文件中的每组规则,解析后,返回一个 map[string]*Group(也就是说),以供后续代码使用。

type Group struct {
	name                 string
	file                 string
	interval             time.Duration
	limit                int
	rules                []Rule
	seriesInPreviousEval []map[string]labels.Labels // One per Rule.
	staleSeries          []labels.Labels
	opts                 *ManagerOptions
	mtx                  sync.Mutex
	evaluationTime       time.Duration
	lastEvaluation       time.Time

	shouldRestore bool

	markStale   bool
	done        chan struct{}
	terminated  chan struct{}
	managerDone chan struct{}

	logger log.Logger

	metrics *Metrics
}

规则管理器

代码:./rules/manager.goManager{}

Manager 结构体是规则管理器。负责管理 记录规则 与 告警规则。

type Manager struct {
    // 管理器的额外选项,
	opts     *ManagerOptions
    // 通过配置文件传递进来的规则
	groups   map[string]*Group
    // 该结构体的读写锁,通常在处理 groups 时会上锁
	mtx      sync.RWMutex
	block    chan struct{}
	done     chan struct{}
	restored bool

	logger log.Logger
}

在 Prometheus Server 的 main() 中,通过 NewManager() 函数实例化一个管理器,并在实例化时传递已实例化的 ManagerOptions{}

管理器选项

./rules/manager.go —— ManagerOptions{}

main() 中实例化 Manager{} 时,会将该结构体作为实参传递到 NewManager() 函数中。

type ManagerOptions struct {
	ExternalURL     *url.URL
    //
	QueryFunc       QueryFunc
	NotifyFunc      NotifyFunc
	Context         context.Context
	Appendable      storage.Appendable
	Queryable       storage.Queryable
	Logger          log.Logger
	Registerer      prometheus.Registerer
	OutageTolerance time.Duration
	ForGracePeriod  time.Duration
	ResendDelay     time.Duration
	GroupLoader     GroupLoader

	Metrics *Metrics
}

加载并运行规则

./rules/manager.goManager.Updata()

在 Prometheus Server 启动时会调用 Manager.Updata() 方法加载规则配置文件并解析。在配置热更新时。如果加载新规则失败,则将恢复旧规则。

这个更新规则是这么个逻辑:比较每一个规则组,若一样,则将老的复制成新的,然后再加上原来没有的,若不一样,则规则组处理完成后,删除这些剩余的。

这里面最重要部分有两个

  • Manager.LoadGroups() 方法。从配置文件中读取内容,并解析规则,实例化一个规则组(即 Group{} 结构体)
  • 在并发中执行的 Group.run() 方法。该方法中包含里定期评估规则的逻辑
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
	// 下面将会处理 Manager.groups,所以上个锁~
    m.mtx.Lock()
	defer m.mtx.Unlock()
    // 从配置文件中加载并解析规则,以实例化一个规则组变量供后续处理
	groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
    // 准备开始并发喽~
	var wg sync.WaitGroup
	for _, newg := range groups {
        // 检查新组与旧组标识符是否相等,若相等则跳过,若不等则停止它并等待它完成当前迭代,然后将其复制到新组中
		......
        // 并发添加计数
        wg.Add(1)
        // 开始真正的加载规则组,其中主要执行逻辑在 newg.run(m.opts.Context)
		go func(newg *Group)
        	// 接着上面检查新旧组标识符,如果相等,这里就会将老规则组中的状态信息复制到新规则组中。
			if ok {
				oldg.stop()
				newg.CopyState(oldg)
			}
			wg.Done()
			// Wait with starting evaluation until the rule manager
			// is told to run. This is necessary to avoid running
			// queries against a bootstrapping storage.
			<-m.block
            // 加载新组,周期性得执行 PromQL 语句
			newg.run(m.opts.Context)
		}(newg)
	}

	// 删除余下的旧组
	wg.Add(len(m.groups))
	for n, oldg := range m.groups {
		go func(n string, g *Group) {
            ......
		}(n, oldg)
	}

	wg.Wait()
	m.groups = groups

	return nil
}

加载规则

./rules/manager.go —— Manager.LoadGroups()

在这里,会使用 GroupOptions{} 实例化 Group{}

func (m *Manager) LoadGroups(interval time.Duration, externalLabels labels.Labels, externalURL string, filenames ...string,) (map[string]*Group, []error) {
    // 循环配置文件
	for _, fn := range filenames {
        // 通过 ./rulefmt/rulefmt.go —— ParseFile() 解析配置文件,返回 rulefmt 包下的 RuleGroups{},这就是规则组实例
		rgs, errs := m.opts.GroupLoader.Load(fn)
        // 循环规则组实例
		for _, rg := range rgs.Groups {
            // 循环规则组中的每个规则
			for _, r := range rg.Rules {
			}

			groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
				Name:          rg.Name,
				File:          fn,
				Interval:      itv,
				Limit:         rg.Limit,
				Rules:         rules,
				ShouldRestore: shouldRestore,
				Opts:          m.opts,
				done:          m.done,
			})
		}
	}
    // 循环完成后,返回实例化的 Group{}
	return groups, nil
}

运行规则

./rules/manager.goGroup.run()

上面的 Manager.LoadGroups() 中获得了实例化的 Group{},通过规则组,定期执行评估行为。这里面最重要的是定期执行的 Group.Eval() 方法。

func (g *Group) run(ctx context.Context) {
	// 等待一个当前的时间戳,以开始计时,以便根据固定的间隔,持续评估
	evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)

    // !!!!用闭包声明一个函数变量,这里面是执行评估的最主要代码逻辑!!!!!
	iter := func() {
		g.Eval(ctx, evalTimestamp)
	}
    // 在一个循环中,每隔一段时间就执行一次 Group.Eval()
	for {
		select {
		case <-g.done:
			return
		default:
			select {
			case <-g.done:
				return
			case <-tick.C:
				iter()
			}
		}
	}
}

评估规则

./rules/manager.goGroup.Eval()

规则评估,在 Group.run() 方法中定期调用本方法来评估所有规则组。

func (g *Group) Eval(ctx context.Context, ts time.Time) {
	for i, rule := range g.rules {
		func(i int, rule Rule) {
            // 记录规则 与 报警规则 具有各自的实现。但是都会返回一个向量。
			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
            // 如果当前评估的是报警规则,则进行断言,
			if ar, ok := rule.(*AlertingRule); ok {
				ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
			}
		}(i, rule)
	}
}

这里面最重要的是运行了 Rule 接口下的 rule.Eval() 方法,由于 Prometheus 将规则分为了两类(记录规则 与 告警规则),所以这里用了一个结构,不同类型的规则,其中处理细节不太一样,但是都会在这里进行评估。

在调用 Rule 接口下的 Eval() 方法时,传递了 QueryFunc,在评估规则时,将会执行 PromQL

评估告警规则

./rules/alerting.go —— AlertingRule.Eval()

[!Warning] 这里面有一个常量 const resolvedRetention = 15 * time.Minute,这个时间是已触发的警报保存在内存中的时间,在这个保存时间内,Prometheus Server 将会持续发送该警报(即使该警报已解决,此时发送的警报状态为 Resolved)。发送间隔取决于 Prometheus 的 –rules.alert.resend-delay 命令行标志配置的值以及配置文件中配置的评估周期 global.evaluation_interval。

评估报警规则表达式,然后创建 pending 状态的报警,若满足条件,则转变为 alerting 状态,或者删除过期的 pending 状态的报警。最后,返回一个向量

func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
    // 执行 PromQL 语句,若没结果则直接返回,继续等待 Group.run() 循环中的下一个元素。
	res, err := query(ctx, r.vector.String(), ts)
	// 若有结果,则给该告警规则上锁,并开始处理
	r.mtx.Lock()
	defer r.mtx.Unlock()

	// Create pending alerts for any new vector elements in the alert expression
	// or update the expression value for existing elements.
	resultFPs := map[uint64]struct{}{}

	var vec promql.Vector
	var alerts = make(map[uint64]*Alert, len(res))
    // 循环每一个 PromQL 获取到的样本,也就是循环每一个查询语句获得的结果
	for _, smpl := range res {
        // 提取样本中的信息,保存到 Alert{} 中。相当于实例化了 Alert{}
		alerts[h] = &Alert{
			Labels:      lbs,
			Annotations: annotations,
			ActiveAt:    ts,
			State:       StatePending,
			Value:       smpl.V,
		}
	}

	for h, a := range alerts {
		// 检查标签集是否以具有 alerting 状态
		if alert, ok := r.active[h]; ok && alert.State != StateInactive {
			alert.Value = a.Value
			alert.Annotations = a.Annotations
			continue
		}

		r.active[h] = a
	}

	// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
	for fp, a := range r.active {
		if _, ok := resultFPs[fp]; !ok {
			// If the alert was previously firing, keep it around for a given
			// retention time so it is reported as resolved to the AlertManager.
			if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
				delete(r.active, fp)
			}
			if a.State != StateInactive {
				a.State = StateInactive
				a.ResolvedAt = ts
			}
			continue
		}

		if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration {
			a.State = StateFiring
			a.FiredAt = ts
		}

		if r.restored {
			vec = append(vec, r.sample(a, ts))
			vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
		}
	}

	numActive := len(r.active)
	if limit != 0 && numActive > limit {
		r.active = map[uint64]*Alert{}
		return nil, errors.Errorf("exceeded limit of %d with %d alerts", limit, numActive)
	}

	return vec, nil
}

评估记录规则

./rules/recording.go —— RecordingRule.Eval()

评估记录规则,然后相应地覆盖指标名称和标签。

func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) {
	vector, err := query(ctx, rule.vector.String(), ts)
	if err != nil {
		return nil, err
	}
	// Override the metric name and labels.
	for i := range vector {
		sample := &vector[i]

		lb := labels.NewBuilder(sample.Metric)

		lb.Set(labels.MetricName, rule.name)

		for _, l := range rule.labels {
			lb.Set(l.Name, l.Value)
		}

		sample.Metric = lb.Labels()
	}

	// Check that the rule does not produce identical metrics after applying
	// labels.
	if vector.ContainsSameLabelset() {
		return nil, fmt.Errorf("vector contains metrics with the same labelset after applying rule labels")
	}

	numSamples := len(vector)
	if limit != 0 && numSamples > limit {
		return nil, fmt.Errorf("exceeded limit %d with %d samples", limit, numSamples)
	}

	rule.SetHealth(HealthGood)
	rule.SetLastError(err)
	return vector, nil
}

最后修改 December 18, 2024: promql, prom development (986a0fc6)