mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
lib/streamaggr: skip de-duplication for series, which do not match the configured aggregation rules
Previously, all incoming samples were de-duplicated, even if their series didn't match the aggregation rule filters. This could result in increased CPU usage. Now de-duplication isn't applied to samples for series which do not match the aggregation rule filters. Such samples are just ignored.
This commit is contained in:
parent
b3dbc7e8bc
commit
01a2859f43
1 changed files with 30 additions and 16 deletions
|
@ -449,7 +449,7 @@ func (a *aggregator) dedupFlush() {
|
||||||
skipAggrSuffix: true,
|
skipAggrSuffix: true,
|
||||||
}
|
}
|
||||||
a.dedupAggr.appendSeriesForFlush(ctx)
|
a.dedupAggr.appendSeriesForFlush(ctx)
|
||||||
a.push(ctx.tss)
|
a.push(ctx.tss, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *aggregator) flush() {
|
func (a *aggregator) flush() {
|
||||||
|
@ -500,7 +500,7 @@ func (a *aggregator) MustStop() {
|
||||||
// Push pushes tss to a.
|
// Push pushes tss to a.
|
||||||
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
|
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
|
||||||
if a.dedupAggr == nil {
|
if a.dedupAggr == nil {
|
||||||
a.push(tss)
|
a.push(tss, true)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,25 +509,11 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
|
||||||
pushSample := a.dedupAggr.pushSample
|
pushSample := a.dedupAggr.pushSample
|
||||||
inputKey := ""
|
inputKey := ""
|
||||||
bb := bbPool.Get()
|
bb := bbPool.Get()
|
||||||
for _, ts := range tss {
|
|
||||||
bb.B = marshalLabelsFast(bb.B[:0], ts.Labels)
|
|
||||||
outputKey := bytesutil.InternBytes(bb.B)
|
|
||||||
for _, sample := range ts.Samples {
|
|
||||||
pushSample(inputKey, outputKey, sample.Value)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bbPool.Put(bb)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
|
|
||||||
labels := promutils.GetLabels()
|
labels := promutils.GetLabels()
|
||||||
tmpLabels := promutils.GetLabels()
|
|
||||||
bb := bbPool.Get()
|
|
||||||
for _, ts := range tss {
|
for _, ts := range tss {
|
||||||
if !a.match.Match(ts.Labels) {
|
if !a.match.Match(ts.Labels) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
labels.Labels = append(labels.Labels[:0], ts.Labels...)
|
labels.Labels = append(labels.Labels[:0], ts.Labels...)
|
||||||
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
|
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
|
||||||
if len(labels.Labels) == 0 {
|
if len(labels.Labels) == 0 {
|
||||||
|
@ -536,6 +522,34 @@ func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
|
||||||
}
|
}
|
||||||
labels.Sort()
|
labels.Sort()
|
||||||
|
|
||||||
|
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
|
||||||
|
outputKey := bytesutil.InternBytes(bb.B)
|
||||||
|
for _, sample := range ts.Samples {
|
||||||
|
pushSample(inputKey, outputKey, sample.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
promutils.PutLabels(labels)
|
||||||
|
bbPool.Put(bb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *aggregator) push(tss []prompbmarshal.TimeSeries, applyFilters bool) {
|
||||||
|
labels := promutils.GetLabels()
|
||||||
|
tmpLabels := promutils.GetLabels()
|
||||||
|
bb := bbPool.Get()
|
||||||
|
for _, ts := range tss {
|
||||||
|
if applyFilters && !a.match.Match(ts.Labels) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
labels.Labels = append(labels.Labels[:0], ts.Labels...)
|
||||||
|
if applyFilters {
|
||||||
|
labels.Labels = a.inputRelabeling.Apply(labels.Labels, 0)
|
||||||
|
if len(labels.Labels) == 0 {
|
||||||
|
// The metric has been deleted by the relabeling
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
labels.Sort()
|
||||||
|
}
|
||||||
|
|
||||||
if a.aggregateOnlyByTime {
|
if a.aggregateOnlyByTime {
|
||||||
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
|
bb.B = marshalLabelsFast(bb.B[:0], labels.Labels)
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in a new issue