VictoriaMetrics/lib/streamaggr/streamaggr.go
Aliaksandr Valialkin 5fe7ff24c2
lib/streamaggr: limit the the number of concurrent flushes of the aggregate data to the exact number of available CPUs
This should reduce the maximum memory usage during concurrent flushes of the aggregate data
2023-01-07 00:18:51 -08:00

666 lines
19 KiB
Go

package streamaggr
import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"gopkg.in/yaml.v2"
)
var supportedOutputs = []string{
"total",
"increase",
"count_series",
"count_samples",
"sum_samples",
"last",
"min",
"max",
"avg",
"stddev",
"stdvar",
"histogram_bucket",
"quantiles(phi1, ..., phiN)",
}
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
data, err := fs.ReadFileOrHTTP(path)
if err != nil {
return nil, fmt.Errorf("cannot load aggregators: %w", err)
}
as, err := NewAggregatorsFromData(data, pushFunc)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
}
return as, nil
}
// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func NewAggregatorsFromData(data []byte, pushFunc PushFunc) (*Aggregators, error) {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, err
}
return NewAggregators(cfgs, pushFunc)
}
// Config is a configuration for a single stream aggregation.
type Config struct {
// Match is a label selector for filtering time series for the given selector.
//
// If the match isn't set, then all the input time series are processed.
Match *promrelabel.IfExpression `yaml:"match,omitempty"`
// Interval is the interval between aggregations.
Interval string `yaml:"interval"`
// Outputs is a list of output aggregate functions to produce.
//
// The following names are allowed:
//
// - total - aggregates input counters
// - increase - counts the increase over input counters
// - count_series - counts the input series
// - count_samples - counts the input samples
// - sum_samples - sums the input samples
// - last - the last biggest sample value
// - min - the minimum sample value
// - max - the maximum sample value
// - avg - the average value across all the samples
// - stddev - standard deviation across all the samples
// - stdvar - standard variance across all the samples
// - histogram_bucket - creates VictoriaMetrics histogram for input samples
// - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1]
//
// The output time series will have the following names:
//
// input_name:aggr_<interval>_<output>
//
Outputs []string `yaml:"outputs"`
// By is an optional list of labels for grouping input series.
//
// See also Without.
//
// If neither By nor Without are set, then the Outputs are calculated
// individually per each input time series.
By []string `yaml:"by,omitempty"`
// Without is an optional list of labels, which must be excluded when grouping input series.
//
// See also By.
//
// If neither By nor Without are set, then the Outputs are calculated
// individually per each input time series.
Without []string `yaml:"without,omitempty"`
// InputRelabelConfigs is an optional relabeling rules, which are applied on the input
// before aggregation.
InputRelabelConfigs []promrelabel.RelabelConfig `yaml:"input_relabel_configs,omitempty"`
// OutputRelabelConfigs is an optional relabeling rules, which are applied
// on the aggregated output before being sent to remote storage.
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
}
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
type Aggregators struct {
as []*aggregator
}
// NewAggregators creates Aggregators from the given cfgs.
//
// pushFunc is called when the aggregated data must be flushed.
//
// MustStop must be called on the returned Aggregators when they are no longer needed.
func NewAggregators(cfgs []*Config, pushFunc PushFunc) (*Aggregators, error) {
if len(cfgs) == 0 {
return nil, nil
}
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
a, err := newAggregator(cfg, pushFunc)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
}
as[i] = a
}
return &Aggregators{
as: as,
}, nil
}
// MustStop stops a.
func (a *Aggregators) MustStop() {
if a == nil {
return
}
for _, aggr := range a.as {
aggr.MustStop()
}
}
// Push pushes tss to a.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
if a == nil {
return
}
for _, aggr := range a.as {
aggr.Push(tss)
}
}
// aggregator aggregates input series according to the config passed to NewAggregator
type aggregator struct {
match *promrelabel.IfExpression
inputRelabeling *promrelabel.ParsedConfigs
outputRelabeling *promrelabel.ParsedConfigs
by []string
without []string
aggregateOnlyByTime bool
// aggrStates contains aggregate states for the given outputs
aggrStates []aggrState
pushFunc PushFunc
// suffix contains a suffix, which should be added to aggregate metric names
//
// It contains the interval, lables in (by, without), plus output name.
// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
// for `interval: 1m`, `by: [job]`
suffix string
wg sync.WaitGroup
stopCh chan struct{}
}
type aggrState interface {
pushSample(inputKey, outputKey string, value float64)
appendSeriesForFlush(ctx *flushCtx)
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
type PushFunc func(tss []prompbmarshal.TimeSeries)
// newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
//
// The returned aggregator must be stopped when no longer needed by calling MustStop().
func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
// check cfg.Interval
interval, err := time.ParseDuration(cfg.Interval)
if err != nil {
return nil, fmt.Errorf("cannot parse `interval: %q`: %w", cfg.Interval, err)
}
if interval <= time.Second {
return nil, fmt.Errorf("the minimum supported aggregation interval is 1s; got %s", interval)
}
// initialize input_relabel_configs and output_relabel_configs
inputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.InputRelabelConfigs)
if err != nil {
return nil, fmt.Errorf("cannot parse input_relabel_configs: %w", err)
}
outputRelabeling, err := promrelabel.ParseRelabelConfigs(cfg.OutputRelabelConfigs)
if err != nil {
return nil, fmt.Errorf("cannot parse output_relabel_configs: %w", err)
}
// check by and without lists
by := sortAndRemoveDuplicates(cfg.By)
without := sortAndRemoveDuplicates(cfg.Without)
if len(by) > 0 && len(without) > 0 {
return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously", by, without)
}
aggregateOnlyByTime := (len(by) == 0 && len(without) == 0)
if !aggregateOnlyByTime && len(without) == 0 {
by = addMissingUnderscoreName(by)
}
// initialize outputs list
if len(cfg.Outputs) == 0 {
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", supportedOutputs)
}
aggrStates := make([]aggrState, len(cfg.Outputs))
for i, output := range cfg.Outputs {
if strings.HasPrefix(output, "quantiles(") {
if !strings.HasSuffix(output, ")") {
return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
}
argsStr := output[len("quantiles(") : len(output)-1]
if len(argsStr) == 0 {
return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
}
args := strings.Split(argsStr, ",")
phis := make([]float64, len(args))
for j, arg := range args {
arg = strings.TrimSpace(arg)
phi, err := strconv.ParseFloat(arg, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
}
if phi < 0 || phi > 1 {
return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
}
phis[j] = phi
}
aggrStates[i] = newQuantilesAggrState(phis)
continue
}
switch output {
case "total":
aggrStates[i] = newTotalAggrState(interval)
case "increase":
aggrStates[i] = newIncreaseAggrState(interval)
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
case "count_samples":
aggrStates[i] = newCountSamplesAggrState()
case "sum_samples":
aggrStates[i] = newSumSamplesAggrState()
case "last":
aggrStates[i] = newLastAggrState()
case "min":
aggrStates[i] = newMinAggrState()
case "max":
aggrStates[i] = newMaxAggrState()
case "avg":
aggrStates[i] = newAvgAggrState()
case "stddev":
aggrStates[i] = newStddevAggrState()
case "stdvar":
aggrStates[i] = newStdvarAggrState()
case "histogram_bucket":
aggrStates[i] = newHistogramBucketAggrState(interval)
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", output, supportedOutputs)
}
}
// initialize suffix to add to metric names after aggregation
suffix := ":" + cfg.Interval
if labels := removeUnderscoreName(by); len(labels) > 0 {
suffix += fmt.Sprintf("_by_%s", strings.Join(labels, "_"))
}
if labels := removeUnderscoreName(without); len(labels) > 0 {
suffix += fmt.Sprintf("_without_%s", strings.Join(labels, "_"))
}
suffix += "_"
// initialize the aggregator
a := &aggregator{
match: cfg.Match,
inputRelabeling: inputRelabeling,
outputRelabeling: outputRelabeling,
by: by,
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
aggrStates: aggrStates,
pushFunc: pushFunc,
suffix: suffix,
stopCh: make(chan struct{}),
}
a.wg.Add(1)
go func() {
a.runFlusher(interval)
defer a.wg.Done()
}()
return a, nil
}
func (a *aggregator) runFlusher(interval time.Duration) {
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-a.stopCh:
return
case <-t.C:
}
// Globally limit the concurrency for metrics' flush
// in order to limit memory usage when big number of aggregators
// are flushed at the same time.
flushConcurrencyCh <- struct{}{}
a.flush()
<-flushConcurrencyCh
}
}
var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
func (a *aggregator) flush() {
ctx := &flushCtx{
suffix: a.suffix,
}
for _, as := range a.aggrStates {
ctx.reset()
as.appendSeriesForFlush(ctx)
tss := ctx.tss
// Apply output relabeling
if a.outputRelabeling != nil {
dst := tss[:0]
for _, ts := range tss {
ts.Labels = a.outputRelabeling.Apply(ts.Labels, 0)
if len(ts.Labels) == 0 {
// The metric has been deleted by the relabeling
continue
}
dst = append(dst, ts)
}
tss = dst
}
// Push the output metrics.
a.pushFunc(tss)
}
}
// MustStop stops the aggregator.
//
// The aggregator stops pushing the aggregated metrics after this call.
func (a *aggregator) MustStop() {
close(a.stopCh)
a.wg.Wait()
}
// Push pushes series to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
labels := promutils.GetLabels()
tmpLabels := promutils.GetLabels()
bb := bbPool.Get()
for _, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
}
labels.Labels = append(labels.Labels[:0], ts.Labels...)
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
if len(labels.Labels) == 0 {
// The metric has been deleted by the relabeling
continue
}
labels.Sort()
if a.aggregateOnlyByTime {
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
} else {
tmpLabels.Labels = removeUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels)
}
outputKey := bytesutil.InternBytes(bb.B)
inputKey := ""
if !a.aggregateOnlyByTime {
tmpLabels.Labels = extractUnneededLabels(tmpLabels.Labels[:0], labels.Labels, a.by, a.without)
bb.B = marshalLabelsFast(bb.B[:0], tmpLabels.Labels)
inputKey = bytesutil.InternBytes(bb.B)
}
for _, sample := range ts.Samples {
a.pushSample(inputKey, outputKey, sample.Value)
}
}
bbPool.Put(bb)
promutils.PutLabels(tmpLabels)
promutils.PutLabels(labels)
}
var bbPool bytesutil.ByteBufferPool
func (a *aggregator) pushSample(inputKey, outputKey string, value float64) {
if math.IsNaN(value) {
// Skip nan samples
return
}
for _, as := range a.aggrStates {
as.pushSample(inputKey, outputKey, value)
}
}
func extractUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
if len(without) > 0 {
for _, label := range labels {
if hasInArray(label.Name, without) {
dst = append(dst, label)
}
}
} else {
for _, label := range labels {
if !hasInArray(label.Name, by) {
dst = append(dst, label)
}
}
}
return dst
}
func removeUnneededLabels(dst, labels []prompbmarshal.Label, by, without []string) []prompbmarshal.Label {
if len(without) > 0 {
for _, label := range labels {
if !hasInArray(label.Name, without) {
dst = append(dst, label)
}
}
} else {
for _, label := range labels {
if hasInArray(label.Name, by) {
dst = append(dst, label)
}
}
}
return dst
}
func hasInArray(name string, a []string) bool {
for _, s := range a {
if name == s {
return true
}
}
return false
}
func marshalLabelsFast(dst []byte, labels []prompbmarshal.Label) []byte {
dst = encoding.MarshalUint32(dst, uint32(len(labels)))
for _, label := range labels {
dst = encoding.MarshalUint32(dst, uint32(len(label.Name)))
dst = append(dst, label.Name...)
dst = encoding.MarshalUint32(dst, uint32(len(label.Value)))
dst = append(dst, label.Value...)
}
return dst
}
func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal.Label, error) {
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal labels count from %d bytes; needs at least 4 bytes", len(src))
}
n := encoding.UnmarshalUint32(src)
src = src[4:]
for i := uint32(0); i < n; i++ {
// Unmarshal label name
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal label name length from %d bytes; needs at least 4 bytes", len(src))
}
labelNameLen := encoding.UnmarshalUint32(src)
src = src[4:]
if uint32(len(src)) < labelNameLen {
return dst, fmt.Errorf("cannot unmarshal label name from %d bytes; needs at least %d bytes", len(src), labelNameLen)
}
labelName := bytesutil.InternBytes(src[:labelNameLen])
src = src[labelNameLen:]
// Unmarshal label value
if len(src) < 4 {
return dst, fmt.Errorf("cannot unmarshal label value length from %d bytes; needs at least 4 bytes", len(src))
}
labelValueLen := encoding.UnmarshalUint32(src)
src = src[4:]
if uint32(len(src)) < labelValueLen {
return dst, fmt.Errorf("cannot unmarshal label value from %d bytes; needs at least %d bytes", len(src), labelValueLen)
}
labelValue := bytesutil.InternBytes(src[:labelValueLen])
src = src[labelValueLen:]
dst = append(dst, prompbmarshal.Label{
Name: labelName,
Value: labelValue,
})
}
if len(src) > 0 {
return dst, fmt.Errorf("unexpected non-empty tail after unmarshaling labels; tail length is %d bytes", len(src))
}
return dst, nil
}
type flushCtx struct {
suffix string
tss []prompbmarshal.TimeSeries
labels []prompbmarshal.Label
samples []prompbmarshal.Sample
}
func (ctx *flushCtx) reset() {
ctx.tss = prompbmarshal.ResetTimeSeries(ctx.tss)
promrelabel.CleanLabels(ctx.labels)
ctx.labels = ctx.labels[:0]
ctx.samples = ctx.samples[:0]
}
func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int64, value float64) {
var err error
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
if err != nil {
logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
}
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
ctx.samples = append(ctx.samples, prompbmarshal.Sample{
Timestamp: timestamp,
Value: value,
})
ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{
Labels: ctx.labels[labelsLen:],
Samples: ctx.samples[samplesLen:],
})
}
func (ctx *flushCtx) appendSeriesWithExtraLabel(labelsMarshaled, suffix string, timestamp int64, value float64, extraName, extraValue string) {
var err error
labelsLen := len(ctx.labels)
samplesLen := len(ctx.samples)
ctx.labels, err = unmarshalLabelsFast(ctx.labels, bytesutil.ToUnsafeBytes(labelsMarshaled))
if err != nil {
logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
}
ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
ctx.labels = append(ctx.labels, prompbmarshal.Label{
Name: extraName,
Value: extraValue,
})
ctx.samples = append(ctx.samples, prompbmarshal.Sample{
Timestamp: timestamp,
Value: value,
})
ctx.tss = append(ctx.tss, prompbmarshal.TimeSeries{
Labels: ctx.labels[labelsLen:],
Samples: ctx.samples[samplesLen:],
})
}
func addMetricSuffix(labels []prompbmarshal.Label, offset int, firstSuffix, lastSuffix string) []prompbmarshal.Label {
src := labels[offset:]
for i := range src {
label := &src[i]
if label.Name != "__name__" {
continue
}
bb := bbPool.Get()
bb.B = append(bb.B, label.Value...)
bb.B = append(bb.B, firstSuffix...)
bb.B = append(bb.B, lastSuffix...)
label.Value = bytesutil.InternBytes(bb.B)
bbPool.Put(bb)
return labels
}
// The __name__ isn't found. Add it
bb := bbPool.Get()
bb.B = append(bb.B, firstSuffix...)
bb.B = append(bb.B, lastSuffix...)
labelValue := bytesutil.InternBytes(bb.B)
labels = append(labels, prompbmarshal.Label{
Name: "__name__",
Value: labelValue,
})
return labels
}
func addMissingUnderscoreName(labels []string) []string {
result := []string{"__name__"}
for _, s := range labels {
if s == "__name__" {
continue
}
result = append(result, s)
}
return result
}
func removeUnderscoreName(labels []string) []string {
var result []string
for _, s := range labels {
if s == "__name__" {
continue
}
result = append(result, s)
}
return result
}
func sortAndRemoveDuplicates(a []string) []string {
if len(a) == 0 {
return nil
}
a = append([]string{}, a...)
sort.Strings(a)
dst := a[:1]
for _, v := range a[1:] {
if v != dst[len(dst)-1] {
dst = append(dst, v)
}
}
return dst
}