lib/streamaggr: made input key compression optional

Andrii Chubatiuk 2024-10-25 07:29:33 +03:00
parent ec0abe736a
commit fe9113b5b2
3 changed files with 49 additions and 39 deletions
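
In short: before this commit, `compressLabels` wrote `uvarint(len(inputKey)) | inputKey | outputKey`; after it, the layout is `uvarint(len(outputKey)) | outputKey | optional inputKey`, and the compressed input labels are appended only when at least one configured output (or deduplication, when `dedupInterval > 0`) actually needs per-input-series state. Putting the output key first also lets `getOutputKey` slice it off directly instead of skipping over the input key. The following is a minimal, self-contained sketch of the new layout, not the repository's code: `marshalLabels` is a hypothetical stand-in for `lc.Compress`, and label pairs are plain strings for brevity.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// marshalLabels is a hypothetical stand-in for the repository's lc.Compress:
// it appends a serialized form of the labels to dst. The real compressor
// interns label pairs; any reversible encoding works for this sketch.
func marshalLabels(dst []byte, labels []string) []byte {
	for _, l := range labels {
		dst = append(dst, l...)
		dst = append(dst, ';')
	}
	return dst
}

// compressKey mirrors the new compressLabels layout:
// uvarint(len(outputKey)) | outputKey | optional inputKey.
func compressKey(dst []byte, inputLabels, outputLabels []string, needInputKey bool) []byte {
	outputKey := marshalLabels(nil, outputLabels)
	dst = binary.AppendUvarint(dst, uint64(len(outputKey)))
	dst = append(dst, outputKey...)
	if needInputKey {
		// Input labels are stored only when some output needs per-input state.
		dst = marshalLabels(dst, inputLabels)
	}
	return dst
}

// splitKey mirrors getInputOutputKey: the output key now comes first,
// so the output key can be sliced off without touching the input key.
func splitKey(key []byte) (inputKey, outputKey []byte) {
	outputKeyLen, n := binary.Uvarint(key)
	if n <= 0 {
		panic("BUG: cannot unmarshal outputKeyLen from uvarint")
	}
	key = key[n:]
	return key[outputKeyLen:], key[:outputKeyLen]
}

func main() {
	key := compressKey(nil, []string{"instance=a"}, []string{"job=app"}, true)
	in, out := splitKey(key)
	fmt.Printf("input=%q output=%q\n", in, out) // input="instance=a;" output="job=app;"

	key = compressKey(nil, []string{"instance=a"}, []string{"job=app"}, false)
	in, out = splitKey(key)
	fmt.Printf("input=%q output=%q\n", in, out) // input="" output="job=app;"
}
```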

View file

@@ -18,6 +18,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 ## tip

 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): `-rule` cmd-line flag now supports multi-document YAML files. This could be useful when rules are retrieved via HTTP URL where multiple rule files were merged together in one response. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6753). Thanks to @Irene-123 for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6995).
+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): disable input labels compression for outputs that do not require it. This should reduce memory usage and help with labels compressor cleanup.
 * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert): properly set `group_name` and `file` fields for recording rules in `/api/v1/rules`.

View file

@@ -65,7 +65,7 @@ func newBenchSamples(count int) []pushSample {
 			Name:  "app",
 			Value: fmt.Sprintf("instance-%d", i),
 		})
-		keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:])
+		keyBuf = compressLabels(keyBuf[:0], labels[:labelsLen], labels[labelsLen:], true)
 		sample.key = string(keyBuf)
 		sample.value = float64(i)
 	}

View file

@@ -371,6 +371,7 @@ type aggregator struct {
 	inputRelabeling  *promrelabel.ParsedConfigs
 	outputRelabeling *promrelabel.ParsedConfigs
+	needInputKey     bool
 	keepMetricNames  bool
 	ignoreOldSamples bool
@@ -554,13 +555,17 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
 			"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
 	}
+	needInputKey := false
 	aggrOutputs := make([]aggrOutput, len(cfg.Outputs))
 	outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
 	for i, output := range cfg.Outputs {
-		as, err := newAggrState(output, outputsSeen, stalenessInterval)
+		as, requireInputKey, err := newAggrState(output, outputsSeen, stalenessInterval)
 		if err != nil {
 			return nil, err
 		}
+		if !needInputKey && requireInputKey {
+			needInputKey = requireInputKey
+		}
 		aggrOutputs[i] = aggrOutput{
 			as: as,
@@ -586,6 +591,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 		inputRelabeling:  inputRelabeling,
 		outputRelabeling: outputRelabeling,
+		needInputKey:     needInputKey,
 		keepMetricNames:  keepMetricNames,
 		ignoreOldSamples: ignoreOldSamples,
@@ -614,6 +620,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 	}
 	if dedupInterval > 0 {
+		a.needInputKey = true
 		a.da = newDedupAggr()
 		_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
@@ -645,20 +652,20 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
 	return a, nil
 }

-func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) {
+func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, bool, error) {
 	// check for duplicated output
 	if _, ok := outputsSeen[output]; ok {
-		return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
+		return nil, false, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
 	}
 	outputsSeen[output] = struct{}{}
 	if strings.HasPrefix(output, "quantiles(") {
 		if !strings.HasSuffix(output, ")") {
-			return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
+			return nil, false, fmt.Errorf("missing closing brace for `quantiles()` output")
 		}
 		argsStr := output[len("quantiles(") : len(output)-1]
 		if len(argsStr) == 0 {
-			return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
+			return nil, false, fmt.Errorf("`quantiles()` must contain at least one phi")
 		}
 		args := strings.Split(argsStr, ",")
 		phis := make([]float64, len(args))
@@ -666,57 +673,57 @@ func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInter
 			arg = strings.TrimSpace(arg)
 			phi, err := strconv.ParseFloat(arg, 64)
 			if err != nil {
-				return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
+				return nil, false, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
 			}
 			if phi < 0 || phi > 1 {
-				return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
+				return nil, false, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
 			}
 			phis[i] = phi
 		}
 		if _, ok := outputsSeen["quantiles"]; ok {
-			return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
+			return nil, false, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
 		}
 		outputsSeen["quantiles"] = struct{}{}
-		return newQuantilesAggrState(phis), nil
+		return newQuantilesAggrState(phis), false, nil
 	}
 	switch output {
 	case "avg":
-		return newAvgAggrState(), nil
+		return newAvgAggrState(), false, nil
 	case "count_samples":
-		return newCountSamplesAggrState(), nil
+		return newCountSamplesAggrState(), false, nil
 	case "count_series":
-		return newCountSeriesAggrState(), nil
+		return newCountSeriesAggrState(), true, nil
 	case "histogram_bucket":
-		return newHistogramBucketAggrState(stalenessInterval), nil
+		return newHistogramBucketAggrState(stalenessInterval), false, nil
 	case "increase":
-		return newTotalAggrState(stalenessInterval, true, true), nil
+		return newTotalAggrState(stalenessInterval, true, true), true, nil
 	case "increase_prometheus":
-		return newTotalAggrState(stalenessInterval, true, false), nil
+		return newTotalAggrState(stalenessInterval, true, false), true, nil
 	case "last":
-		return newLastAggrState(), nil
+		return newLastAggrState(), false, nil
 	case "max":
-		return newMaxAggrState(), nil
+		return newMaxAggrState(), false, nil
 	case "min":
-		return newMinAggrState(), nil
+		return newMinAggrState(), false, nil
 	case "rate_avg":
-		return newRateAggrState(stalenessInterval, true), nil
+		return newRateAggrState(stalenessInterval, true), true, nil
 	case "rate_sum":
-		return newRateAggrState(stalenessInterval, false), nil
+		return newRateAggrState(stalenessInterval, false), true, nil
 	case "stddev":
-		return newStddevAggrState(), nil
+		return newStddevAggrState(), false, nil
 	case "stdvar":
-		return newStdvarAggrState(), nil
+		return newStdvarAggrState(), false, nil
 	case "sum_samples":
-		return newSumSamplesAggrState(), nil
+		return newSumSamplesAggrState(), false, nil
 	case "total":
-		return newTotalAggrState(stalenessInterval, false, true), nil
+		return newTotalAggrState(stalenessInterval, false, true), true, nil
 	case "total_prometheus":
-		return newTotalAggrState(stalenessInterval, false, false), nil
+		return newTotalAggrState(stalenessInterval, false, false), true, nil
 	case "unique_samples":
-		return newUniqueSamplesAggrState(), nil
+		return newUniqueSamplesAggrState(), false, nil
 	default:
-		return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
+		return nil, false, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
 	}
 }
@@ -930,7 +937,7 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 		}
 		bufLen := len(buf)
-		buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels)
+		buf = compressLabels(buf, inputLabels.Labels, outputLabels.Labels, a.needInputKey)
 		// key remains valid only by the end of this function and can't be reused after
 		// do not intern key because number of unique keys could be too high
 		key := bytesutil.ToUnsafeString(buf[bufLen:])
@@ -970,13 +977,15 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
 	}
 }

-func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label) []byte {
+func compressLabels(dst []byte, inputLabels, outputLabels []prompbmarshal.Label, needInputKey bool) []byte {
 	bb := bbPool.Get()
-	bb.B = lc.Compress(bb.B, inputLabels)
+	bb.B = lc.Compress(bb.B, outputLabels)
 	dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
 	dst = append(dst, bb.B...)
 	bbPool.Put(bb)
-	dst = lc.Compress(dst, outputLabels)
+	if needInputKey {
+		dst = lc.Compress(dst, inputLabels)
+	}
 	return dst
 }
@@ -986,24 +995,24 @@ func decompressLabels(dst []prompbmarshal.Label, key string) []prompbmarshal.Lab
 func getOutputKey(key string) string {
 	src := bytesutil.ToUnsafeBytes(key)
-	inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
+	outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
 	if nSize <= 0 {
-		logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
+		logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
 	}
 	src = src[nSize:]
-	outputKey := src[inputKeyLen:]
+	outputKey := src[:outputKeyLen]
 	return bytesutil.ToUnsafeString(outputKey)
 }

 func getInputOutputKey(key string) (string, string) {
 	src := bytesutil.ToUnsafeBytes(key)
-	inputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
+	outputKeyLen, nSize := encoding.UnmarshalVarUint64(src)
 	if nSize <= 0 {
-		logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint")
+		logger.Panicf("BUG: cannot unmarshal outputKeyLen from uvarint")
 	}
 	src = src[nSize:]
-	inputKey := src[:inputKeyLen]
-	outputKey := src[inputKeyLen:]
+	outputKey := src[:outputKeyLen]
+	inputKey := src[outputKeyLen:]
 	return bytesutil.ToUnsafeString(inputKey), bytesutil.ToUnsafeString(outputKey)
 }