Streaming aggregation branch clean

This commit is contained in:
Alexander Marshalov 2023-11-22 15:11:13 +01:00
parent 5bc3488538
commit 8725b5e049
No known key found for this signature in database
11 changed files with 138 additions and 881 deletions

View file

@ -6,7 +6,6 @@ import (
"net/url"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -41,8 +40,6 @@ var (
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url")
shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . "+
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#sharding-among-remote-storages")
shardByURLLabels = flag.String("remoteWrite.shardByURL.labels", "", "Comma-separated list of label names for sharding across all the -remoteWrite.url. All labels of timeseries are used by default. "+
"See also -remoteWrite.shardByURL and https://docs.victoriametrics.com/vmagent.html#sharding-among-remote-storages")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
"See also -remoteWrite.maxDiskUsagePerURL")
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
@ -95,8 +92,6 @@ var (
// Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
defaultAuthToken = &auth.Token{}
shardLabelsFilter map[string]struct{}
)
// MultitenancyEnabled returns true if -remoteWrite.multitenantURL is specified.
@ -176,12 +171,6 @@ func Init() {
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
}
if *shardByURLLabels != "" {
for _, label := range strings.Split(*shardByURLLabels, ",") {
shardLabelsFilter[strings.TrimSpace(label)] = struct{}{}
}
}
// Start config reloader.
configReloaderWG.Add(1)
go func() {
@ -430,7 +419,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha
// Shard the data among rwctxs
tssByURL := make([][]prompbmarshal.TimeSeries, len(rwctxs))
for _, ts := range tssBlock {
h := getLabelsHash(ts.Labels, shardLabelsFilter)
h := getLabelsHash(ts.Labels)
idx := h % uint64(len(tssByURL))
tssByURL[idx] = append(tssByURL[idx], ts)
}
@ -483,7 +472,7 @@ func limitSeriesCardinality(tss []prompbmarshal.TimeSeries) []prompbmarshal.Time
dst := make([]prompbmarshal.TimeSeries, 0, len(tss))
for i := range tss {
labels := tss[i].Labels
h := getLabelsHash(labels, nil)
h := getLabelsHash(labels)
if hourlySeriesLimiter != nil && !hourlySeriesLimiter.Add(h) {
hourlySeriesLimitRowsDropped.Add(len(tss[i].Samples))
logSkippedSeries(labels, "-remoteWrite.maxHourlySeries", hourlySeriesLimiter.MaxItems())
@ -507,16 +496,10 @@ var (
dailySeriesLimitRowsDropped = metrics.NewCounter(`vmagent_daily_series_limit_rows_dropped_total`)
)
func getLabelsHash(labels []prompbmarshal.Label, filterLabels map[string]struct{}) uint64 {
func getLabelsHash(labels []prompbmarshal.Label) uint64 {
bb := labelsHashBufPool.Get()
b := bb.B[:0]
for _, label := range labels {
if len(filterLabels) > 0 {
_, ok := filterLabels[label.Name]
if !ok {
continue
}
}
b = append(b, label.Name...)
b = append(b, label.Value...)
}

View file

@ -4,7 +4,6 @@ import (
"embed"
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"net/http"
"strings"
"sync/atomic"
@ -40,6 +39,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)
var (

View file

@ -1,12 +0,0 @@
package promql
import "testing"
func TestIsMetricSelectorWithRollup(t *testing.T) {
childQuery, _, _ := IsMetricSelectorWithRollup("metric_name{label='value'}[365d] or vector(0)")
if childQuery != "" {
t.Fatalf("AAAAA: %v", childQuery)
} else {
t.Fatalf("BBBBB")
}
}

View file

@ -160,7 +160,7 @@ func TestDerivValues(t *testing.T) {
testRowsEqual(t, values, timestamps, valuesExpected, timestamps)
}
func testRollupFuncWithValues(t *testing.T, funcName string, args []interface{}, vInput []float64, vTimestamps []int64, vExpected float64) {
func testRollupFunc(t *testing.T, funcName string, args []interface{}, vExpected float64) {
t.Helper()
nrf := getRollupFunc(funcName)
if nrf == nil {
@ -172,11 +172,9 @@ func testRollupFuncWithValues(t *testing.T, funcName string, args []interface{},
}
var rfa rollupFuncArg
rfa.prevValue = nan
rfa.realPrevValue = nan
rfa.realNextValue = nan
rfa.prevTimestamp = 0
rfa.values = append(rfa.values, vInput...)
rfa.timestamps = append(rfa.timestamps, vTimestamps...)
rfa.values = append(rfa.values, testValues...)
rfa.timestamps = append(rfa.timestamps, testTimestamps...)
rfa.window = rfa.timestamps[len(rfa.timestamps)-1] - rfa.timestamps[0]
if rollupFuncsRemoveCounterResets[funcName] {
removeCounterResets(rfa.values)
@ -196,10 +194,6 @@ func testRollupFuncWithValues(t *testing.T, funcName string, args []interface{},
}
}
func testRollupFunc(t *testing.T, funcName string, args []interface{}, vExpected float64) {
testRollupFuncWithValues(t, funcName, args, testValues, testTimestamps, vExpected)
}
func TestRollupDurationOverTime(t *testing.T) {
f := func(maxInterval, dExpected float64) {
t.Helper()
@ -1480,121 +1474,3 @@ func TestRollupDelta(t *testing.T) {
f(1, nan, nan, nil, 0)
f(100, nan, nan, nil, 0)
}
func TestIncrease(t *testing.T) {
f := func(funcName string, vInput []float64, vExpected float64) {
t.Helper()
var me metricsql.MetricExpr
args := []interface{}{&metricsql.RollupExpr{Expr: &me}}
testRollupFuncWithValues(t, funcName, args, vInput, []int64{1, 2}, vExpected)
}
f(
"increase",
[]float64{100, 100},
0,
)
f(
"increase",
[]float64{100, 90},
0,
)
f(
"increase",
[]float64{100, 88},
0,
)
f(
"increase",
[]float64{100, 87},
187,
)
f(
"increase",
[]float64{100, 187},
187,
)
f(
"increase",
[]float64{100, 87, 200, 287},
387,
)
f(
"increase",
[]float64{100, 187, 200, 287},
287,
)
f(
"increase",
[]float64{100, 87, 200, 87},
387,
)
f(
"increase",
[]float64{100, 87, 200, 187},
300,
)
f(
"increase",
[]float64{100, 87, 200, 177},
300,
)
f(
"increase",
[]float64{100, 13},
113,
)
f(
"increase",
[]float64{100, 9},
9,
)
f(
"increase",
[]float64{100, 1},
1,
)
f(
"increase",
[]float64{100, 0},
0,
)
f(
"increase",
[]float64{100, -1},
-1,
)
f(
"increase",
[]float64{100, -10},
90,
)
f(
"increase",
[]float64{100, -90},
10,
)
}

View file

@ -631,7 +631,6 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
# (it's like this series didn't exist until now).
# Increase this parameter if it is expected for matched metrics to be delayed or collected with irregular intervals exceeding the `interval` value.
# By default, is equal to x2 of the `interval` field.
# The parameter is only relevant for outputs: total, increase and histogram_bucket.
#
# staleness_interval: 2m

View file

@ -0,0 +1,128 @@
package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSamplesTotalAggrState calculates output=countSamplesTotal, e.g. the count of input samples.
type countSamplesTotalAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type countSamplesTotalStateValue struct {
mu sync.Mutex
n uint64
deleted bool
deleteDeadline uint64
}
func newCountSamplesTotalAggrState(interval time.Duration, stalenessInterval time.Duration) *countSamplesTotalAggrState {
return &countSamplesTotalAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *countSamplesTotalAggrState) pushSample(_, outputKey string, _ float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &countSamplesTotalStateValue{
n: 1,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*countSamplesTotalStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.n++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *countSamplesTotalAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*countSamplesTotalStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *countSamplesTotalAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*countSamplesTotalStateValue)
sv.mu.Lock()
n := sv.n
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, float64(n))
return true
})
as.lastPushTimestamp = currentTime
}
func (as *countSamplesTotalAggrState) getOutputName() string {
return "count_samples_total"
}
func (as *countSamplesTotalAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*countSamplesTotalStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.n,
})
return true
})
return result
}

View file

@ -1,176 +0,0 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// newincreaseAggrState calculates output=newincrease, e.g. the newincrease over input counters.
type newincreaseAggrState struct {
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type newincreaseStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newnewincreaseAggrState(interval time.Duration, stalenessInterval time.Duration) *newincreaseAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &newincreaseAggrState{
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
func (as *newincreaseAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &newincreaseStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*newincreaseStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
lv.firstValue = value
lv.value = value
lv.correction = 0
sv.lastValues[inputKey] = lv
}
// process counter reset
delta := value - lv.value
if delta < 0 {
if (-delta * 8) < lv.value {
lv.correction += lv.value - value
} else {
lv.correction += lv.value
}
}
// process increasing counter
correctedValue := value + lv.correction
correctedDelta := correctedValue - lv.firstValue
if ok && math.Abs(correctedValue) < 10*(math.Abs(correctedDelta)+1) {
correctedDelta = correctedValue
}
if ok || currentTime > as.ignoreInputDeadline {
sv.total = correctedDelta
}
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *newincreaseAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newincreaseStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *newincreaseAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newincreaseStateValue)
sv.mu.Lock()
newincrease := sv.total
sv.total = 0
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, newincrease)
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *newincreaseAggrState) getOutputName() string {
return "newincrease"
}
func (as *newincreaseAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*newincreaseStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

View file

@ -1,176 +0,0 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// newincreasePureAggrState calculates output=newincrease, e.g. the newincrease over input counters.
type newincreasePureAggrState struct {
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type newincreasePureStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newnewincreasePureAggrState(interval time.Duration, stalenessInterval time.Duration) *newincreasePureAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &newincreasePureAggrState{
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
func (as *newincreasePureAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &newincreasePureStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*newincreasePureStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
lv.firstValue = value
lv.value = value
lv.correction = 0
sv.lastValues[inputKey] = lv
}
// process counter reset
delta := value - lv.value
if delta < 0 {
if (-delta * 8) < lv.value {
lv.correction += lv.value - value
} else {
lv.correction += lv.value
}
}
// process increasing counter
correctedValue := value + lv.correction
correctedDelta := correctedValue - lv.firstValue
if ok && math.Abs(correctedValue) < 10*(math.Abs(correctedDelta)+1) {
correctedDelta = correctedValue
}
if ok || currentTime > as.ignoreInputDeadline {
sv.total = correctedDelta
}
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *newincreasePureAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newincreasePureStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *newincreasePureAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newincreasePureStateValue)
sv.mu.Lock()
newincrease := sv.total
sv.total = 0
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, newincrease)
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *newincreasePureAggrState) getOutputName() string {
return "newincrease_pure"
}
func (as *newincreasePureAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*newincreasePureStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

View file

@ -1,179 +0,0 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// newtotalAggrState calculates output=newtotal, e.g. the summary counter over input counters.
type newtotalAggrState struct {
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type newtotalStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newnewtotalAggrState(interval time.Duration, stalenessInterval time.Duration) *newtotalAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &newtotalAggrState{
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
func (as *newtotalAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &newtotalStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*newtotalStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
lv.firstValue = value
lv.value = value
lv.correction = 0
sv.lastValues[inputKey] = lv
}
// process counter reset
delta := value - lv.value
if delta < 0 {
if (-delta * 8) < lv.value {
lv.correction += lv.value - value
} else {
lv.correction += lv.value
}
}
// process increasing counter
correctedValue := value + lv.correction
correctedDelta := correctedValue - lv.firstValue
if ok && math.Abs(correctedValue) < 10*(math.Abs(correctedDelta)+1) {
correctedDelta = correctedValue
}
if ok || currentTime > as.ignoreInputDeadline {
sv.total = correctedDelta
}
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *newtotalAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newtotalStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *newtotalAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newtotalStateValue)
sv.mu.Lock()
total := sv.total
if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, total)
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *newtotalAggrState) getOutputName() string {
return "newtotal"
}
func (as *newtotalAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*newtotalStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

View file

@ -1,177 +0,0 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// newotalPureAggrState calculates output=newtotal, e.g. the summary counter over input counters.
type newotalPureAggrState struct {
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type newtotalPureStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newnewotalPureAggrState(interval time.Duration, stalenessInterval time.Duration) *newotalPureAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &newotalPureAggrState{
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
func (as *newotalPureAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &newtotalPureStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*newtotalPureStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
lv.firstValue = value
lv.value = value
lv.correction = 0
sv.lastValues[inputKey] = lv
}
// process counter reset
delta := value - lv.value
if delta < 0 {
if (-delta * 8) < lv.value {
lv.correction += lv.value - value
} else {
lv.correction += lv.value
}
}
// process increasing counter
correctedValue := value + lv.correction
correctedDelta := correctedValue - lv.firstValue
if ok && math.Abs(correctedValue) < 10*(math.Abs(correctedDelta)+1) {
correctedDelta = correctedValue
}
sv.total = correctedDelta
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *newotalPureAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newtotalPureStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *newotalPureAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newtotalPureStateValue)
sv.mu.Lock()
total := sv.total
if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, total)
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *newotalPureAggrState) getOutputName() string {
return "newtotal_pure"
}
func (as *newotalPureAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*newtotalPureStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

View file

@ -25,14 +25,11 @@ import (
var supportedOutputs = []string{
"total",
"total_pure",
"newtotal",
"newtotal_pure",
"increase",
"increase_pure",
"newincrease",
"newincrease_pure",
"count_series",
"count_samples",
"count_samples_total",
"sum_samples",
"sum_samples_total",
"last",
@ -355,22 +352,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
aggrStates[i] = newTotalAggrState(interval, stalenessInterval)
case "total_pure":
aggrStates[i] = newTotalPureAggrState(interval, stalenessInterval)
case "newtotal":
aggrStates[i] = newnewtotalAggrState(interval, stalenessInterval)
case "newtotal_pure":
aggrStates[i] = newnewotalPureAggrState(interval, stalenessInterval)
case "increase":
aggrStates[i] = newIncreaseAggrState(interval, stalenessInterval)
case "increase_pure":
aggrStates[i] = newIncreasePureAggrState(interval, stalenessInterval)
case "newincrease":
aggrStates[i] = newnewincreaseAggrState(interval, stalenessInterval)
case "newincrease_pure":
aggrStates[i] = newnewincreasePureAggrState(interval, stalenessInterval)
case "count_series":
aggrStates[i] = newCountSeriesAggrState(interval, stalenessInterval)
case "count_samples":
aggrStates[i] = newCountSamplesAggrState(interval, stalenessInterval)
case "count_samples_total":
aggrStates[i] = newCountSamplesTotalAggrState(interval, stalenessInterval)
case "sum_samples":
aggrStates[i] = newSumSamplesAggrState(interval, stalenessInterval)
case "sum_samples_total":