From fe9113b5b2cd9d7ed57aa7cec6643fb218d0cfac Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Fri, 25 Oct 2024 07:29:33 +0300 Subject: [PATCH] lib/streamaggr: made input key compression optional --- docs/changelog/CHANGELOG.md | 1 + lib/streamaggr/dedup_timing_test.go | 2 +- lib/streamaggr/streamaggr.go | 85 ++++++++++++++++------------- 3 files changed, 49 insertions(+), 39 deletions(-) diff --git a/docs/changelog/CHANGELOG.md b/docs/changelog/CHANGELOG.md index b5e0fb0c1..b522a693b 100644 --- a/docs/changelog/CHANGELOG.md +++ b/docs/changelog/CHANGELOG.md @@ -18,6 +18,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). ## tip * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be useful when rules are retrieved via HTTP URL where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995). +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): disable input labels compression for outputs that do not require it. This should improve memory usage and will help with labels compressor cleanup. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert): properly set `group_name` and `file` fields for recording rules in `/api/v1/rules`. 
diff --git a/lib/streamaggr/dedup_timing_test.go b/lib/streamaggr/dedup_timing_test.go index 2b6bab25c..edf1dac1a 100644 --- a/lib/streamaggr/dedup_timing_test.go +++ b/lib/streamaggr/dedup_timing_test.go @@ -65,7 +65,7 @@ func newBenchSamples(count int) []pushSample { Name: "app", Value: fmt.Sprintf("instance-%d", i), }) - keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:]) + keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:], true) sample.key = string(keyBuf) sample.value = float64(i) } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 6188f8b80..c76eb8970 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -371,6 +371,7 @@ type aggregator struct { inputRelabeling *promrelabel.ParsedConfigs outputRelabeling *promrelabel.ParsedConfigs + needInputKey bool keepMetricNames bool ignoreOldSamples bool @@ -554,13 +555,17 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+ "see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs) } + needInputKey := false aggrOutputs := make([]aggrOutput, len(cfg.Outputs)) outputsSeen := make(map[string]struct{}, len(cfg.Outputs)) for i, output := range cfg.Outputs { - as, err := newAggrState(output, outputsSeen, stalenessInterval) + as, requireInputKey, err := newAggrState(output, outputsSeen, stalenessInterval) if err != nil { return nil, err } + if !needInputKey && requireInputKey { + needInputKey = requireInputKey + } aggrOutputs[i] = aggrOutput{ as: as, @@ -586,6 +591,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, inputRelabeling: inputRelabeling, outputRelabeling: outputRelabeling, + needInputKey: needInputKey, keepMetricNames: keepMetricNames, ignoreOldSamples: ignoreOldSamples, @@ -614,6 +620,7 @@ func newAggregator(cfg *Config, path string, 
pushFunc PushFunc, ms *metrics.Set, } if dedupInterval > 0 { + a.needInputKey = true a.da = newDedupAggr() _ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 { @@ -645,20 +652,20 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, return a, nil } -func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) { +func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, bool, error) { // check for duplicated output if _, ok := outputsSeen[output]; ok { - return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output) + return nil, false, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output) } outputsSeen[output] = struct{}{} if strings.HasPrefix(output, "quantiles(") { if !strings.HasSuffix(output, ")") { - return nil, fmt.Errorf("missing closing brace for `quantiles()` output") + return nil, false, fmt.Errorf("missing closing brace for `quantiles()` output") } argsStr := output[len("quantiles(") : len(output)-1] if len(argsStr) == 0 { - return nil, fmt.Errorf("`quantiles()` must contain at least one phi") + return nil, false, fmt.Errorf("`quantiles()` must contain at least one phi") } args := strings.Split(argsStr, ",") phis := make([]float64, len(args)) @@ -666,57 +673,57 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter arg = strings.TrimSpace(arg) phi, err := strconv.ParseFloat(arg, 64) if err != nil { - return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err) + return nil, false, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err) } if phi < 0 || phi > 1 { - return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi) + return nil, false, fmt.Errorf("phi inside quantiles(%s) must be in the 
range [0..1]; got %v", argsStr, phi) } phis[i] = phi } if _, ok := outputsSeen["quantiles"]; ok { - return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`") + return nil, false, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`") } outputsSeen["quantiles"] = struct{}{} - return newQuantilesAggrState(phis), nil + return newQuantilesAggrState(phis), false, nil } switch output { case "avg": - return newAvgAggrState(), nil + return newAvgAggrState(), false, nil case "count_samples": - return newCountSamplesAggrState(), nil + return newCountSamplesAggrState(), false, nil case "count_series": - return newCountSeriesAggrState(), nil + return newCountSeriesAggrState(), true, nil case "histogram_bucket": - return newHistogramBucketAggrState(stalenessInterval), nil + return newHistogramBucketAggrState(stalenessInterval), false, nil case "increase": - return newTotalAggrState(stalenessInterval, true, true), nil + return newTotalAggrState(stalenessInterval, true, true), true, nil case "increase_prometheus": - return newTotalAggrState(stalenessInterval, true, false), nil + return newTotalAggrState(stalenessInterval, true, false), true, nil case "last": - return newLastAggrState(), nil + return newLastAggrState(), false, nil case "max": - return newMaxAggrState(), nil + return newMaxAggrState(), false, nil case "min": - return newMinAggrState(), nil + return newMinAggrState(), false, nil case "rate_avg": - return newRateAggrState(stalenessInterval, true), nil + return newRateAggrState(stalenessInterval, true), true, nil case "rate_sum": - return newRateAggrState(stalenessInterval, false), nil + return newRateAggrState(stalenessInterval, false), true, nil case "stddev": - return newStddevAggrState(), nil + return newStddevAggrState(), false, nil case "stdvar": - return newStdvarAggrState(), nil + return 
newStdvarAggrState(), false, nil case "sum_samples": - return newSumSamplesAggrState(), nil + return newSumSamplesAggrState(), false, nil case "total": - return newTotalAggrState(stalenessInterval, false, true), nil + return newTotalAggrState(stalenessInterval, false, true), true, nil case "total_prometheus": - return newTotalAggrState(stalenessInterval, false, false), nil + return newTotalAggrState(stalenessInterval, false, false), true, nil case "unique_samples": - return newUniqueSamplesAggrState(), nil + return newUniqueSamplesAggrState(), false, nil default: - return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs) + return nil, false, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs) } } @@ -930,7 +937,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { } bufLen := len(buf) - buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels) + buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels, a.needInputKey) // key remains valid only by the end of this function and can't be reused after // do not intern key because number of unique keys could be too high key := bytesutil.ToUnsafeString(buf[bufLen:]) @@ -970,13 +977,15 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) { } } -func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte { +func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label, needInputKey bool) []byte { bb := bbPool.Get() - bb.B = lc.Compress(bb.B, inputLabels) + bb.B = lc.Compress(bb.B, outputLabels) dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B))) dst = append(dst, bb.B...) 
bbPool.Put(bb) - dst = lc.Compress(dst, outputLabels) + if needInputKey { + dst = lc.Compress(dst, inputLabels) + } return dst } @@ -986,24 +995,24 @@ func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Lab func getOutputKey(key string) string { src := bytesutil.ToUnsafeBytes(key) - inputKeyLen, nSize := encoding.UnmarshalVarUint64(src) + outputKeyLen, nSize := encoding.UnmarshalVarUint64(src) if nSize <= 0 { - logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint") + logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint") } src = src[nSize:] - outputKey := src[inputKeyLen:] + outputKey := src[:outputKeyLen] return bytesutil.ToUnsafeString(outputKey) } func getInputOutputKey(key string) (string, string) { src := bytesutil.ToUnsafeBytes(key) - inputKeyLen, nSize := encoding.UnmarshalVarUint64(src) + outputKeyLen, nSize := encoding.UnmarshalVarUint64(src) if nSize <= 0 { - logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint") + logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint") } src = src[nSize:] - inputKey := src[:inputKeyLen] - outputKey := src[inputKeyLen:] + outputKey := src[:outputKeyLen] + inputKey := src[outputKeyLen:] return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey) }