VictoriaMetrics/lib/streamaggr/deduplicator_test.go
Andrii Chubatiuk 861852f262
lib/streamaggr: added stale samples metric, added metrics labels (#6462)
### Describe Your Changes

- added stale metrics counters for input and output samples
- added labels for aggregator metrics =>
`name="{rwctx}:{aggrId}:{aggrSuffix}"`
   - rwctx - global or number starting from 1
   - aggrid - aggregator id starting from 1
   - aggrSuffix - <interval>_(by|without)_label1_label2_labeln
   e.g: `name="global:1:1m_without_instance_pod"`

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/contributing/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
2024-07-01 14:56:17 +02:00

49 lines
1.8 KiB
Go

package streamaggr
import (
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
)
func TestDeduplicator(t *testing.T) {
var tssResult []prompbmarshal.TimeSeries
var tssResultLock sync.Mutex
pushFunc := func(tss []prompbmarshal.TimeSeries) {
tssResultLock.Lock()
tssResult = appendClonedTimeseries(tssResult, tss)
tssResultLock.Unlock()
}
tss := mustParsePromMetrics(`
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 123
bar{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 34.54
x 8943 1000
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -34.34
x 90984 900
x 433 1000
asfjkldsf{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 12322
foo{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} 894
baz_aaa_aaa_fdd{instance="x",job="aaa",pod="sdfd-dfdfdfs",node="aosijjewrerfd",namespace="asdff",container="ohohffd"} -2.3
`)
d := NewDeduplicator(pushFunc, time.Hour, []string{"node", "instance"}, "global")
for i := 0; i < 10; i++ {
d.Push(tss)
}
d.flush(pushFunc, time.Hour)
d.MustStop()
result := timeSeriessToString(tssResult)
resultExpected := `asfjkldsf{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 12322
bar{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 34.54
baz_aaa_aaa_fdd{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} -2.3
foo{container="ohohffd",job="aaa",namespace="asdff",pod="sdfd-dfdfdfs"} 894
x 8943
`
if result != resultExpected {
t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected)
}
}