VictoriaMetrics/lib/streamaggr/streamaggr.go

666 lines
19 KiB
Go

package streamaggr
import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"gopkg.in/yaml.v2"
)
// supportedOutputs lists the output names shown to the user in error messages
// returned by newAggregator when the `outputs` list is empty or contains an unsupported entry.
//
// The last entry is a human-readable template for the parametrized `quantiles(...)` output
// rather than a literal output name.
var supportedOutputs = []string{
	"total",
	"increase",
	"count_series",
	"count_samples",
	"sum_samples",
	"last",
	"min",
	"max",
	"avg",
	"stddev",
	"stdvar",
	"histogram_bucket",
	"quantiles(phi1, ..., phiN)",
}
// LoadFromFile reads the stream aggregation config located at path and builds Aggregators from it.
//
// The config is read via fs.ReadFileOrHTTP. The aggregated data is passed to pushFunc.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
	cfgData, readErr := fs.ReadFileOrHTTP(path)
	if readErr != nil {
		return nil, fmt.Errorf("cannot load aggregators: %w", readErr)
	}

	aggrs, initErr := NewAggregatorsFromData(cfgData, pushFunc)
	if initErr == nil {
		return aggrs, nil
	}
	return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, initErr)
}
// NewAggregatorsFromData strictly parses data as a YAML list of Config entries
// and creates Aggregators from them. The aggregated data is passed to pushFunc.
//
// The YAML parsing error is returned as is.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func NewAggregatorsFromData(data []byte, pushFunc PushFunc) (*Aggregators, error) {
	parsedCfgs := []*Config(nil)
	parseErr := yaml.UnmarshalStrict(data, &parsedCfgs)
	if parseErr != nil {
		return nil, parseErr
	}
	return NewAggregators(parsedCfgs, pushFunc)
}
// Config is a configuration for a single stream aggregation.
//
// A list of Config entries is parsed from YAML by NewAggregatorsFromData.
type Config struct {
	// Match is an optional filter for the input time series.
	// Series, which do not match it, are skipped by the aggregator.
	//
	// If the match isn't set, then all the input time series are processed.
	Match *promrelabel.IfExpression `yaml:"match,omitempty"`

	// Interval is the interval between aggregations.
	//
	// It is parsed with time.ParseDuration and is used verbatim in the names of the output series.
	Interval string `yaml:"interval"`

	// Outputs is a list of output aggregate functions to produce.
	// It must contain at least one entry.
	//
	// The following names are allowed:
	//
	// - total - aggregates input counters
	// - increase - counts the increase over input counters
	// - count_series - counts the input series
	// - count_samples - counts the input samples
	// - sum_samples - sums the input samples
	// - last - the last biggest sample value
	// - min - the minimum sample value
	// - max - the maximum sample value
	// - avg - the average value across all the samples
	// - stddev - standard deviation across all the samples
	// - stdvar - standard variance across all the samples
	// - histogram_bucket - creates VictoriaMetrics histogram for input samples
	// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
	//
	// The output time series will have the following names:
	//
	//   input_name:<interval>[_by_<by_labels>][_without_<without_labels>]_<output>
	//
	// where <by_labels> and <without_labels> are the sorted, de-duplicated label names
	// from By and Without (excluding __name__) joined with `_`.
	// NOTE(review): <output> is the per-output suffix supplied by the aggregate state
	// at flush time - presumably the output name; confirm against the state implementations.
	Outputs []string `yaml:"outputs"`

	// By is an optional list of labels for grouping input series.
	// It cannot be set together with Without.
	//
	// See also Without.
	//
	// If neither By nor Without are set, then the Outputs are calculated
	// individually per each input time series.
	By []string `yaml:"by,omitempty"`

	// Without is an optional list of labels, which must be excluded when grouping input series.
	// It cannot be set together with By.
	//
	// See also By.
	//
	// If neither By nor Without are set, then the Outputs are calculated
	// individually per each input time series.
	Without []string `yaml:"without,omitempty"`

	// InputRelabelConfigs is an optional list of relabeling rules, which are applied to the input
	// before aggregation. Series left without labels after the relabeling are dropped.
	InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"`

	// OutputRelabelConfigs is an optional list of relabeling rules, which are applied
	// to the aggregated output before it is passed to the push function.
	// Series left without labels after the relabeling are dropped.
	OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
}
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
//
// A nil *Aggregators is valid: Push and MustStop are no-ops on it.
type Aggregators struct {
	// as holds one aggregator per Config entry passed to NewAggregators.
	as []*aggregator
}
// NewAggregators creates Aggregators from the given cfgs.
//
// pushFunc is called when the aggregated data must be flushed.
//
// nil Aggregators and nil error are returned for empty cfgs;
// it is safe to call Push and MustStop on nil Aggregators.
//
// If any of cfgs is invalid, then the aggregators already created for the preceding cfgs
// are stopped and the error is returned.
//
// MustStop must be called on the returned Aggregators when they are no longer needed.
func NewAggregators(cfgs []*Config, pushFunc PushFunc) (*Aggregators, error) {
	if len(cfgs) == 0 {
		return nil, nil
	}
	as := make([]*aggregator, len(cfgs))
	for i, cfg := range cfgs {
		a, err := newAggregator(cfg, pushFunc)
		if err != nil {
			// Every successfully created aggregator owns a running flusher goroutine.
			// Stop the ones created so far, since the caller gets no handle for stopping them.
			for _, started := range as[:i] {
				started.MustStop()
			}
			return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
		}
		as[i] = a
	}
	return &Aggregators{
		as: as,
	}, nil
}
// MustStop stops every aggregator held by a.
//
// It is a no-op for nil a.
func (a *Aggregators) MustStop() {
	if a != nil {
		for i := range a.as {
			a.as[i].MustStop()
		}
	}
}
// Push feeds tss to every aggregator held by a.
//
// It is a no-op for nil a.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
	if a != nil {
		for i := range a.as {
			a.as[i].Push(tss)
		}
	}
}
// aggregator aggregates input series according to the config passed to newAggregator
type aggregator struct {
	// match filters the input series in Push; non-matching series are skipped.
	match *promrelabel.IfExpression

	// inputRelabeling is applied to the labels of every matching input series in Push.
	inputRelabeling *promrelabel.ParsedConfigs

	// outputRelabeling is applied to the labels of every output series in flush.
	outputRelabeling *promrelabel.ParsedConfigs

	// by and without are sorted, de-duplicated label lists used for grouping; at most one of them is non-empty.
	by      []string
	without []string

	// aggregateOnlyByTime is set when both by and without are empty.
	// In this case the full label set of the input series is used as the output key.
	aggregateOnlyByTime bool

	// aggrStates contains aggregate states for the given outputs
	aggrStates []aggrState

	// pushFunc receives the output series on every flush.
	pushFunc PushFunc

	// suffix contains a suffix, which should be added to aggregate metric names
	//
	// It contains the interval and the labels in (by, without), followed by a trailing `_`.
	// The per-output suffix is appended to it at flush time.
	// For example, the suffix is `:1m_by_job_` for `interval: 1m`, `by: [job]`,
	// so the foo_bar metric name is transformed to foo_bar:1m_by_job_<output>.
	suffix string

	// wg tracks the flusher goroutine started by newAggregator.
	wg sync.WaitGroup

	// stopCh is closed by MustStop in order to stop the flusher goroutine.
	stopCh chan struct{}
}
// aggrState is the state of a single aggregation output.
type aggrState interface {
	// pushSample registers a sample value.
	//
	// outputKey is the marshaled (see marshalLabelsFast) set of labels identifying the output series.
	// inputKey is the marshaled set of the remaining input labels; it is empty
	// when the aggregator groups only by time.
	pushSample(inputKey, outputKey string, value float64)

	// appendSeriesForFlush is called on every flush with a freshly reset ctx.
	// The series found in ctx.tss after the call are relabeled and pushed by the aggregator.
	appendSeriesForFlush(ctx *flushCtx)
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
//
// It is called from the flusher goroutines once per aggregation output on every flush.
// tss is backed by a per-flush buffer, which is reset and reused for the next output.
// NOTE(review): implementations should presumably not retain tss after returning - confirm with callers.
type PushFunc func(tss []prompbmarshal.TimeSeries)
// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
//
// The following checks are performed on cfg:
//
//   - cfg.Interval must be a valid duration, which isn't smaller than 1s;
//   - cfg.InputRelabelConfigs and cfg.OutputRelabelConfigs must be valid;
//   - cfg.By and cfg.Without cannot be set simultaneously;
//   - cfg.Outputs must contain at least one entry and every entry must be supported.
//
// On success a background goroutine is started, which flushes the aggregated data every interval.
//
// The returned aggregator must be stopped when no longer needed by calling MustStop().
func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
	// check cfg.Interval
	interval, err := time.ParseDuration(cfg.Interval)
	if err != nil {
		return nil, fmt.Errorf("cannot parse `interval: %q`: %w", cfg.Interval, err)
	}
	// 1s is the documented minimum, so it must be accepted.
	if interval < time.Second {
		return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval)
	}

	// initialize input_relabel_configs and output_relabel_configs
	inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs)
	if err != nil {
		return nil, fmt.Errorf("cannot parse input_relabel_configs: %w", err)
	}
	outputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.OutputRelabelConfigs)
	if err != nil {
		return nil, fmt.Errorf("cannot parse output_relabel_configs: %w", err)
	}

	// check by and without lists
	by := sortAndRemoveDuplicates(cfg.By)
	without := sortAndRemoveDuplicates(cfg.Without)
	if len(by) > 0 && len(without) > 0 {
		return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously", by, without)
	}
	aggregateOnlyByTime := (len(by) == 0 && len(without) == 0)
	if !aggregateOnlyByTime && len(without) == 0 {
		// Always keep the metric name in the output key when grouping by labels.
		by = addMissingUnderscoreName(by)
	}

	// initialize outputs list
	if len(cfg.Outputs) == 0 {
		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
			"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", supportedOutputs)
	}
	aggrStates := make([]aggrState, len(cfg.Outputs))
	for i, output := range cfg.Outputs {
		if strings.HasPrefix(output, "quantiles(") {
			// Parse the comma-separated list of phi values inside `quantiles(...)`.
			if !strings.HasSuffix(output, ")") {
				return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
			}
			argsStr := output[len("quantiles(") : len(output)-1]
			if len(argsStr) == 0 {
				return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
			}
			args := strings.Split(argsStr, ",")
			phis := make([]float64, len(args))
			for j, arg := range args {
				arg = strings.TrimSpace(arg)
				phi, err := strconv.ParseFloat(arg, 64)
				if err != nil {
					return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
				}
				if phi < 0 || phi > 1 {
					return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
				}
				phis[j] = phi
			}
			aggrStates[i] = newQuantilesAggrState(phis)
			continue
		}
		switch output {
		case "total":
			aggrStates[i] = newTotalAggrState(interval)
		case "increase":
			aggrStates[i] = newIncreaseAggrState(interval)
		case "count_series":
			aggrStates[i] = newCountSeriesAggrState()
		case "count_samples":
			aggrStates[i] = newCountSamplesAggrState()
		case "sum_samples":
			aggrStates[i] = newSumSamplesAggrState()
		case "last":
			aggrStates[i] = newLastAggrState()
		case "min":
			aggrStates[i] = newMinAggrState()
		case "max":
			aggrStates[i] = newMaxAggrState()
		case "avg":
			aggrStates[i] = newAvgAggrState()
		case "stddev":
			aggrStates[i] = newStddevAggrState()
		case "stdvar":
			aggrStates[i] = newStdvarAggrState()
		case "histogram_bucket":
			aggrStates[i] = newHistogramBucketAggrState(interval)
		default:
			return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
				"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", output, supportedOutputs)
		}
	}

	// initialize suffix to add to metric names after aggregation
	suffix := ":" + cfg.Interval
	if labels := removeUnderscoreName(by); len(labels) > 0 {
		suffix += fmt.Sprintf("_by_%s", strings.Join(labels, "_"))
	}
	if labels := removeUnderscoreName(without); len(labels) > 0 {
		suffix += fmt.Sprintf("_without_%s", strings.Join(labels, "_"))
	}
	suffix += "_"

	// initialize the aggregator
	a := &aggregator{
		match: cfg.Match,

		inputRelabeling:  inputRelabeling,
		outputRelabeling: outputRelabeling,

		by:                  by,
		without:             without,
		aggregateOnlyByTime: aggregateOnlyByTime,

		aggrStates: aggrStates,
		pushFunc:   pushFunc,

		suffix: suffix,

		stopCh: make(chan struct{}),
	}

	// Start the flusher only after all the validation has passed,
	// so no goroutine is left behind on error.
	a.wg.Add(1)
	go func() {
		// Register Done before running the flusher, so MustStop cannot hang
		// if the flusher exits abnormally.
		defer a.wg.Done()
		a.runFlusher(interval)
	}()

	return a, nil
}
// runFlusher flushes the aggregated data every interval until a.stopCh is closed.
func (a *aggregator) runFlusher(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Take a slot in the global flush semaphore before flushing.
			// This limits memory usage when a big number of aggregators
			// are flushed at the same time.
			flushConcurrencyCh <- struct{}{}
			a.flush()
			<-flushConcurrencyCh
		case <-a.stopCh:
			return
		}
	}
}
// flushConcurrencyCh is a semaphore shared by all the aggregators: runFlusher sends to it
// before calling flush and receives from it afterwards, so at most 2*cgroup.AvailableCPUs()
// flushes run concurrently.
var flushConcurrencyCh = make(chan struct{}, 2*cgroup.AvailableCPUs())
// flush collects the output series from every aggregate state, applies the output relabeling
// to them and passes the result to a.pushFunc.
//
// a.pushFunc is called once per aggregate state; the flush context is reused across the states.
func (a *aggregator) flush() {
	fctx := &flushCtx{suffix: a.suffix}

	for _, state := range a.aggrStates {
		fctx.reset()
		state.appendSeriesForFlush(fctx)
		out := fctx.tss

		if a.outputRelabeling != nil {
			// Filter the series in place, dropping the ones deleted by the relabeling.
			kept := out[:0]
			for i := range out {
				series := out[i]
				series.Labels = a.outputRelabeling.Apply(series.Labels, 0)
				if len(series.Labels) > 0 {
					kept = append(kept, series)
				}
			}
			out = kept
		}

		a.pushFunc(out)
	}
}
// MustStop signals the flusher goroutine to exit and waits until it is finished.
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
	stopCh := a.stopCh
	close(stopCh)

	a.wg.Wait()
}
// Push registers the samples of tss in the aggregate states of a.
//
// Every series is filtered by a.match and relabeled with a.inputRelabeling.
// The sorted labels are then split into the output key (the labels identifying the output series)
// and the input key (the remaining labels; empty when aggregating only by time).
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
	labels := promutils.GetLabels()
	tmpLabels := promutils.GetLabels()
	buf := bbPool.Get()

	for i := range tss {
		series := &tss[i]
		if !a.match.Match(series.Labels) {
			continue
		}

		// Relabel a copy of the labels, so the caller's series stays intact.
		labels.Labels = append(labels.Labels[:0], series.Labels...)
		labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
		if len(labels.Labels) == 0 {
			// The metric has been deleted by the relabeling
			continue
		}
		labels.Sort()

		var inputKey, outputKey string
		if a.aggregateOnlyByTime {
			buf.B = marshalLabelsFast(buf.B[:0], labels.Labels)
			outputKey = bytesutil.InternBytes(buf.B)
		} else {
			tmpLabels.Labels = removeUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
			buf.B = marshalLabelsFast(buf.B[:0], tmpLabels.Labels)
			outputKey = bytesutil.InternBytes(buf.B)

			tmpLabels.Labels = extractUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
			buf.B = marshalLabelsFast(buf.B[:0], tmpLabels.Labels)
			inputKey = bytesutil.InternBytes(buf.B)
		}

		for j := range series.Samples {
			a.pushSample(inputKey, outputKey, series.Samples[j].Value)
		}
	}

	bbPool.Put(buf)
	promutils.PutLabels(tmpLabels)
	promutils.PutLabels(labels)
}
// bbPool is a pool of scratch byte buffers used for marshaling labels in Push
// and for building metric names in addMetricSuffix.
var bbPool bytesutil.ByteBufferPool
// pushSample passes the sample to every aggregate state of a.
//
// NaN values are ignored.
func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {
	if !math.IsNaN(value) {
		for i := range a.aggrStates {
			a.aggrStates[i].pushSample(inputKey, outputKey, value)
		}
	}
}
// extractUnneededLabels appends to dst the labels, which do not take part in the grouping,
// and returns the result.
//
// If without is non-empty, then these are the labels listed in without.
// Otherwise these are the labels missing in by.
func extractUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
	useWithout := len(without) > 0
	for i := range labels {
		name := labels[i].Name
		var unneeded bool
		if useWithout {
			unneeded = hasInArray(name, without)
		} else {
			unneeded = !hasInArray(name, by)
		}
		if unneeded {
			dst = append(dst, labels[i])
		}
	}
	return dst
}
// removeUnneededLabels appends to dst the labels, which take part in the grouping,
// and returns the result.
//
// If without is non-empty, then these are the labels missing in without.
// Otherwise these are the labels listed in by.
func removeUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
	useWithout := len(without) > 0
	for i := range labels {
		name := labels[i].Name
		var needed bool
		if useWithout {
			needed = !hasInArray(name, without)
		} else {
			needed = hasInArray(name, by)
		}
		if needed {
			dst = append(dst, labels[i])
		}
	}
	return dst
}
// hasInArray reports whether a contains name.
func hasInArray(name string, a []string) bool {
	for i := 0; i < len(a); i++ {
		if a[i] == name {
			return true
		}
	}
	return false
}
// marshalLabelsFast appends the binary representation of labels to dst and returns the result.
//
// The representation consists of uint32 number of labels followed by
// (uint32 name length, name, uint32 value length, value) for every label.
// It can be parsed back with unmarshalLabelsFast.
func marshalLabelsFast(dst []byte, labels []prompbmarshal.Label) []byte {
	dst = encoding.MarshalUint32(dst, uint32(len(labels)))
	for i := range labels {
		name, value := labels[i].Name, labels[i].Value

		dst = encoding.MarshalUint32(dst, uint32(len(name)))
		dst = append(dst, name...)

		dst = encoding.MarshalUint32(dst, uint32(len(value)))
		dst = append(dst, value...)
	}
	return dst
}
// unmarshalLabelsFast parses labels marshaled by marshalLabelsFast from src,
// appends them to dst and returns the result.
//
// An error is returned if src is truncated or contains extra data after the last label.
// The returned slice contains the labels parsed before the error in this case.
func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal.Label, error) {
	if len(src) < 4 {
		return dst, fmt.Errorf("cannot unmarshal labels count from %d bytes; needs at least 4 bytes", len(src))
	}
	labelsCount := encoding.UnmarshalUint32(src)
	tail := src[4:]

	for ; labelsCount > 0; labelsCount-- {
		// Read the length-prefixed label name.
		if len(tail) < 4 {
			return dst, fmt.Errorf("cannot unmarshal label name length from %d bytes; needs at least 4 bytes", len(tail))
		}
		nameLen := encoding.UnmarshalUint32(tail)
		tail = tail[4:]
		if uint32(len(tail)) < nameLen {
			return dst, fmt.Errorf("cannot unmarshal label name from %d bytes; needs at least %d bytes", len(tail), nameLen)
		}
		name := bytesutil.InternBytes(tail[:nameLen])
		tail = tail[nameLen:]

		// Read the length-prefixed label value.
		if len(tail) < 4 {
			return dst, fmt.Errorf("cannot unmarshal label value length from %d bytes; needs at least 4 bytes", len(tail))
		}
		valueLen := encoding.UnmarshalUint32(tail)
		tail = tail[4:]
		if uint32(len(tail)) < valueLen {
			return dst, fmt.Errorf("cannot unmarshal label value from %d bytes; needs at least %d bytes", len(tail), valueLen)
		}
		value := bytesutil.InternBytes(tail[:valueLen])
		tail = tail[valueLen:]

		dst = append(dst, prompbmarshal.Label{Name: name, Value: value})
	}

	if len(tail) != 0 {
		return dst, fmt.Errorf("unexpected non-empty tail after unmarshaling labels; tail length is %d bytes", len(tail))
	}
	return dst, nil
}
// flushCtx holds reusable buffers for the series generated during a single flush.
type flushCtx struct {
	// suffix is the aggregator-level suffix, which is added to metric names
	// before the per-output suffix passed to appendSeries.
	suffix string

	// tss contains the series appended since the last reset.
	// Their Labels and Samples are sub-slices of labels and samples below.
	tss []prompbmarshal.TimeSeries

	// labels is the shared storage for labels of all the series in tss.
	labels []prompbmarshal.Label

	// samples is the shared storage for samples of all the series in tss.
	samples []prompbmarshal.Sample
}
// reset empties ctx for reuse while keeping the underlying buffers.
//
// ctx.suffix is left intact.
func (ctx *flushCtx) reset() {
	ctx.samples = ctx.samples[:0]

	ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss)

	// Clean the labels before truncating them.
	promrelabel.CleanLabels(ctx.labels)
	ctx.labels = ctx.labels[:0]
}
// appendSeries appends to ctx.tss a series with a single sample (timestamp, value).
//
// The labels of the series are unmarshaled from labelsMarshaled; the metric name is extended
// with ctx.suffix followed by suffix. It panics via logger.Panicf if labelsMarshaled cannot be unmarshaled.
func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int64, value float64) {
	labelsStart, samplesStart := len(ctx.labels), len(ctx.samples)

	unmarshaled, err := unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
	ctx.labels = unmarshaled
	if err != nil {
		logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
	}
	ctx.labels = addMetricSuffix(ctx.labels, labelsStart, ctx.suffix, suffix)

	sample := prompbmarshal.Sample{
		Timestamp: timestamp,
		Value:     value,
	}
	ctx.samples = append(ctx.samples, sample)

	// The series references the tails of the shared buffers added above.
	series := prompbmarshal.TimeSeries{
		Labels:  ctx.labels[labelsStart:],
		Samples: ctx.samples[samplesStart:],
	}
	ctx.tss = append(ctx.tss, series)
}
// appendSeriesWithExtraLabel appends to ctx.tss a series with a single sample (timestamp, value)
// and with the additional extraName=extraValue label placed after the unmarshaled labels.
//
// The labels of the series are unmarshaled from labelsMarshaled; the metric name is extended
// with ctx.suffix followed by suffix. It panics via logger.Panicf if labelsMarshaled cannot be unmarshaled.
func (ctx *flushCtx) appendSeriesWithExtraLabel(labelsMarshaled, suffix string, timestamp int64, value float64, extraName, extraValue string) {
	labelsStart, samplesStart := len(ctx.labels), len(ctx.samples)

	unmarshaled, err := unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
	ctx.labels = unmarshaled
	if err != nil {
		logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
	}
	ctx.labels = addMetricSuffix(ctx.labels, labelsStart, ctx.suffix, suffix)

	extraLabel := prompbmarshal.Label{
		Name:  extraName,
		Value: extraValue,
	}
	ctx.labels = append(ctx.labels, extraLabel)

	sample := prompbmarshal.Sample{
		Timestamp: timestamp,
		Value:     value,
	}
	ctx.samples = append(ctx.samples, sample)

	// The series references the tails of the shared buffers added above.
	series := prompbmarshal.TimeSeries{
		Labels:  ctx.labels[labelsStart:],
		Samples: ctx.samples[samplesStart:],
	}
	ctx.tss = append(ctx.tss, series)
}
// addMetricSuffix appends firstSuffix+lastSuffix to the value of the `__name__` label
// found in labels[offset:] and returns labels.
//
// Only the first `__name__` label is updated. If labels[offset:] has no `__name__` label,
// then a new `__name__` label with the firstSuffix+lastSuffix value is appended to labels.
//
// Labels before offset are never inspected or modified.
func addMetricSuffix(labels []prompbmarshal.Label, offset int, firstSuffix, lastSuffix string) []prompbmarshal.Label {
	src := labels[offset:]
	for i := range src {
		label := &src[i]
		if label.Name != "__name__" {
			continue
		}
		bb := bbPool.Get()
		bb.B = append(bb.B, label.Value...)
		bb.B = append(bb.B, firstSuffix...)
		bb.B = append(bb.B, lastSuffix...)
		label.Value = bytesutil.InternBytes(bb.B)
		bbPool.Put(bb)
		return labels
	}
	// The __name__ isn't found. Add it
	bb := bbPool.Get()
	bb.B = append(bb.B, firstSuffix...)
	bb.B = append(bb.B, lastSuffix...)
	labelValue := bytesutil.InternBytes(bb.B)
	// Return the scratch buffer to the pool, as the branch above does.
	bbPool.Put(bb)
	labels = append(labels, prompbmarshal.Label{
		Name:  "__name__",
		Value: labelValue,
	})
	return labels
}
// addMissingUnderscoreName returns a new list, which starts with `__name__`
// followed by the items of labels other than `__name__` in their original order.
//
// labels isn't modified.
func addMissingUnderscoreName(labels []string) []string {
	withName := make([]string, 0, len(labels)+1)
	withName = append(withName, "__name__")
	for i := range labels {
		if name := labels[i]; name != "__name__" {
			withName = append(withName, name)
		}
	}
	return withName
}
// removeUnderscoreName returns the items of labels other than `__name__` in their original order.
//
// nil is returned if there are no such items. labels isn't modified.
func removeUnderscoreName(labels []string) []string {
	var kept []string
	for i := range labels {
		if name := labels[i]; name != "__name__" {
			kept = append(kept, name)
		}
	}
	return kept
}
// sortAndRemoveDuplicates returns a sorted copy of a without duplicate items.
//
// nil is returned for empty a. The input slice isn't modified.
func sortAndRemoveDuplicates(a []string) []string {
	if len(a) == 0 {
		return nil
	}

	// Work on a copy in order to leave the caller's slice intact.
	sorted := make([]string, len(a))
	copy(sorted, a)
	sort.Strings(sorted)

	// Compact the sorted items in place: sorted[:n] holds the unique items seen so far.
	n := 1
	for i := 1; i < len(sorted); i++ {
		if sorted[i] != sorted[n-1] {
			sorted[n] = sorted[i]
			n++
		}
	}
	return sorted[:n]
}