This commit is contained in:
Alexander Marshalov 2023-10-18 16:04:38 +02:00
parent ed1bef0e2d
commit 1cd6232537
No known key found for this signature in database
26 changed files with 1705 additions and 372 deletions

View file

@ -42,6 +42,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)
var (
@ -228,7 +229,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
{"metric-relabel-debug", "debug metric relabeling"},
{"api/v1/targets", "advanced information about discovered targets in JSON format"},
{"config", "-promscrape.config contents"},
//{"stream-agg", "streaming aggregation status"},
{"stream-agg", "streaming aggregation status"},
{"metrics", "available service metrics"},
{"flags", "command-line flags"},
{"-/reload", "reload configuration"},
@ -433,9 +434,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusOK)
return true
//case "/stream-agg":
// streamaggr.WriteHumanReadableState(w, r)
// return true
case "/stream-agg":
streamaggr.WriteHumanReadableState(w, r, remotewrite.GetAggregators())
return true
case "/ready":
if rdy := atomic.LoadInt32(&promscrape.PendingScrapeConfigs); rdy > 0 {
errMsg := fmt.Sprintf("waiting for scrapes to init, left: %d", rdy)

View file

@ -6,6 +6,7 @@ import (
"net/url"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -40,6 +41,8 @@ var (
"Pass multiple -remoteWrite.multitenantURL flags in order to replicate data to multiple remote storage systems. See also -remoteWrite.url")
shardByURL = flag.Bool("remoteWrite.shardByURL", false, "Whether to shard outgoing series across all the remote storage systems enumerated via -remoteWrite.url . "+
"By default the data is replicated across all the -remoteWrite.url . See https://docs.victoriametrics.com/vmagent.html#sharding-among-remote-storages")
shardByURLLabels = flag.String("remoteWrite.shardByURL.labels", "", "Comma-separated list of label names for sharding across all the -remoteWrite.url. All labels of timeseries are used by default. "+
"See also -remoteWrite.shardByURL and https://docs.victoriametrics.com/vmagent.html#sharding-among-remote-storages")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory where temporary data for remote write component is stored. "+
"See also -remoteWrite.maxDiskUsagePerURL")
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
@ -92,6 +95,8 @@ var (
// Data without tenant id is written to defaultAuthToken if -remoteWrite.multitenantURL is specified.
defaultAuthToken = &auth.Token{}
shardLabelsFilter map[string]struct{}
)
// MultitenancyEnabled returns true if -remoteWrite.multitenantURL is specified.
@ -171,6 +176,12 @@ func Init() {
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
}
if *shardByURLLabels != "" {
for _, label := range strings.Split(*shardByURLLabels, ",") {
shardLabelsFilter[strings.TrimSpace(label)] = struct{}{}
}
}
// Start config reloader.
configReloaderWG.Add(1)
go func() {
@ -419,7 +430,7 @@ func pushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarsha
// Shard the data among rwctxs
tssByURL := make([][]prompbmarshal.TimeSeries, len(rwctxs))
for _, ts := range tssBlock {
h := getLabelsHash(ts.Labels)
h := getLabelsHash(ts.Labels, shardLabelsFilter)
idx := h % uint64(len(tssByURL))
tssByURL[idx] = append(tssByURL[idx], ts)
}
@ -472,7 +483,7 @@ func limitSeriesCardinality(tss []prompbmarshal.TimeSeries) []prompbmarshal.Time
dst := make([]prompbmarshal.TimeSeries, 0, len(tss))
for i := range tss {
labels := tss[i].Labels
h := getLabelsHash(labels)
h := getLabelsHash(labels, nil)
if hourlySeriesLimiter != nil && !hourlySeriesLimiter.Add(h) {
hourlySeriesLimitRowsDropped.Add(len(tss[i].Samples))
logSkippedSeries(labels, "-remoteWrite.maxHourlySeries", hourlySeriesLimiter.MaxItems())
@ -496,10 +507,16 @@ var (
dailySeriesLimitRowsDropped = metrics.NewCounter(`vmagent_daily_series_limit_rows_dropped_total`)
)
func getLabelsHash(labels []prompbmarshal.Label) uint64 {
func getLabelsHash(labels []prompbmarshal.Label, filterLabels map[string]struct{}) uint64 {
bb := labelsHashBufPool.Get()
b := bb.B[:0]
for _, label := range labels {
if len(filterLabels) > 0 {
_, ok := filterLabels[label.Name]
if !ok {
continue
}
}
b = append(b, label.Name...)
b = append(b, label.Value...)
}
@ -802,3 +819,23 @@ func CheckStreamAggrConfigs() error {
}
return nil
}
func GetAggregators() map[string]*streamaggr.Aggregators {
var result = map[string]*streamaggr.Aggregators{}
if len(*remoteWriteMultitenantURLs) > 0 {
rwctxsMapLock.Lock()
for tenant, rwctxs := range rwctxsMap {
for rwNum, rw := range rwctxs {
result[fmt.Sprintf("rw %d for tenant %v:%v", rwNum, tenant.AccountID, tenant.ProjectID)] = rw.sas.Load()
}
}
rwctxsMapLock.Unlock()
} else {
for rwNum, rw := range rwctxsDefault {
result[fmt.Sprintf("remote write %d", rwNum)] = rw.sas.Load()
}
}
return result
}

View file

@ -2,27 +2,38 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// avgAggrState calculates output=avg, e.g. the average value over input samples.
type avgAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type avgStateValue struct {
mu sync.Mutex
sum float64
count int64
deleted bool
mu sync.Mutex
sum float64
count uint64
deleted bool
deleteDeadline uint64
}
func newAvgAggrState() *avgAggrState {
return &avgAggrState{}
func newAvgAggrState(interval time.Duration, stalenessInterval time.Duration) *avgAggrState {
return &avgAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *avgAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -45,6 +56,7 @@ again:
if !deleted {
sv.sum += value
sv.count++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -54,23 +66,48 @@ again:
}
}
func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *avgAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*avgStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*avgStateValue)
sv.mu.Lock()
avg := sv.sum / float64(sv.count)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
var avg float64
if sv.count > 0 {
avg = sv.sum / float64(sv.count)
}
sv.sum = 0
sv.count = 0
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, avg)
return true
})
as.lastPushTimestamp = currentTime
}
func (as *avgAggrState) getOutputName() string {
@ -87,8 +124,11 @@ func (as *avgAggrState) getStateRepresentation(suffix string) []aggrStateReprese
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.sum / float64(value.count),
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum / float64(value.count),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.count,
})
return true
})

View file

@ -2,26 +2,37 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSamplesAggrState calculates output=countSamples, e.g. the count of input samples.
type countSamplesAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type countSamplesStateValue struct {
mu sync.Mutex
n uint64
deleted bool
mu sync.Mutex
n uint64
deleted bool
deleteDeadline uint64
}
func newCountSamplesAggrState() *countSamplesAggrState {
return &countSamplesAggrState{}
func newCountSamplesAggrState(interval time.Duration, stalenessInterval time.Duration) *countSamplesAggrState {
return &countSamplesAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *countSamplesAggrState) pushSample(_, outputKey string, _ float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -42,6 +53,7 @@ again:
deleted := sv.deleted
if !deleted {
sv.n++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -51,23 +63,44 @@ again:
}
}
func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *countSamplesAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
n := sv.n
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.n = 0
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, float64(n))
return true
})
as.lastPushTimestamp = currentTime
}
func (as *countSamplesAggrState) getOutputName() string {
@ -84,8 +117,11 @@ func (as *countSamplesAggrState) getStateRepresentation(suffix string) []aggrSta
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: float64(value.n),
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.n,
})
return true
})

View file

@ -2,27 +2,39 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// countSeriesAggrState calculates output=count_series, e.g. the number of unique series.
type countSeriesAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type countSeriesStateValue struct {
mu sync.Mutex
countedSeries map[string]struct{}
n uint64
deleted bool
mu sync.Mutex
countedSeries map[string]struct{}
n uint64
samplesCount uint64
deleted bool
deleteDeadline uint64
}
func newCountSeriesAggrState() *countSeriesAggrState {
return &countSeriesAggrState{}
func newCountSeriesAggrState(interval time.Duration, stalenessInterval time.Duration) *countSeriesAggrState {
return &countSeriesAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, _ float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -48,7 +60,9 @@ again:
if _, ok := sv.countedSeries[inputKey]; !ok {
sv.countedSeries[inputKey] = struct{}{}
sv.n++
sv.deleteDeadline = deleteDeadline
}
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@ -58,23 +72,49 @@ again:
}
}
func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *countSeriesAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
n := sv.n
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.n = 0
// todo: use builtin function clear after switching to go 1.21
for csk := range sv.countedSeries {
delete(sv.countedSeries, csk)
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, float64(n))
return true
})
as.lastPushTimestamp = currentTime
}
func (as *countSeriesAggrState) getOutputName() string {
@ -91,8 +131,11 @@ func (as *countSeriesAggrState) getStateRepresentation(suffix string) []aggrStat
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: float64(value.n),
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

View file

@ -12,22 +12,24 @@ import (
// histogramBucketAggrState calculates output=histogramBucket, e.g. VictoriaMetrics histogram over input samples.
type histogramBucketAggrState struct {
m sync.Map
stalenessSecs uint64
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type histogramBucketStateValue struct {
mu sync.Mutex
h metrics.Histogram
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newHistogramBucketAggrState(stalenessInterval time.Duration) *histogramBucketAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
func newHistogramBucketAggrState(interval time.Duration, stalenessInterval time.Duration) *histogramBucketAggrState {
return &histogramBucketAggrState{
stalenessSecs: stalenessSecs,
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
@ -51,6 +53,7 @@ again:
deleted := sv.deleted
if !deleted {
sv.h.Update(value)
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
@ -100,6 +103,8 @@ func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) {
sv.mu.Unlock()
return true
})
as.lastPushTimestamp = currentTime
}
func (as *histogramBucketAggrState) getOutputName() string {
@ -121,7 +126,10 @@ func (as *histogramBucketAggrState) getStateRepresentation(suffix string) []aggr
Name: vmrange,
Value: vmrange,
}),
value: float64(count),
currentValue: float64(count),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
})
return true

View file

@ -9,16 +9,18 @@ import (
// increaseAggrState calculates output=increase, e.g. the increase over input counters.
type increaseAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type increaseStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
@ -28,8 +30,9 @@ func newIncreaseAggrState(interval time.Duration, stalenessInterval time.Duratio
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &increaseAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
@ -69,6 +72,7 @@ again:
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@ -126,6 +130,8 @@ func (as *increaseAggrState) appendSeriesForFlush(ctx *flushCtx) {
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *increaseAggrState) getOutputName() string {
@ -142,8 +148,11 @@ func (as *increaseAggrState) getStateRepresentation(suffix string) []aggrStateRe
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.total,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

View file

@ -0,0 +1,153 @@
package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// increasePureAggrState calculates output=increase_pure, e.g. the increasePure over input counters.
type increasePureAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type increasePureStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newIncreasePureAggrState(interval time.Duration, stalenessInterval time.Duration) *increasePureAggrState {
return &increasePureAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *increasePureAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &increasePureStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*increasePureStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
sv.lastValues[inputKey] = lv
}
d := value
if ok && lv.value <= value {
d = value - lv.value
}
sv.total += d
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *increasePureAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*increasePureStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *increasePureAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*increasePureStateValue)
sv.mu.Lock()
increasePure := sv.total
sv.total = 0
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, increasePure)
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *increasePureAggrState) getOutputName() string {
return "increase_pure"
}
func (as *increasePureAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*increasePureStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

View file

@ -2,26 +2,38 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// lastAggrState calculates output=last, e.g. the last value over input samples.
type lastAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type lastStateValue struct {
mu sync.Mutex
last float64
deleted bool
mu sync.Mutex
last float64
samplesCount uint64
deleted bool
deleteDeadline uint64
}
func newLastAggrState() *lastAggrState {
return &lastAggrState{}
func newLastAggrState(interval time.Duration, stalenessInterval time.Duration) *lastAggrState {
return &lastAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *lastAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -42,6 +54,8 @@ again:
deleted := sv.deleted
if !deleted {
sv.last = value
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -51,23 +65,43 @@ again:
}
}
func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *lastAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*lastStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*lastStateValue)
sv.mu.Lock()
last := sv.last
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, last)
return true
})
as.lastPushTimestamp = currentTime
}
func (as *lastAggrState) getOutputName() string {
@ -84,8 +118,11 @@ func (as *lastAggrState) getStateRepresentation(suffix string) []aggrStateRepres
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.last,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.last,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

View file

@ -2,26 +2,38 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
type maxAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type maxStateValue struct {
mu sync.Mutex
max float64
deleted bool
mu sync.Mutex
max float64
samplesCount uint64
deleted bool
deleteDeadline uint64
}
func newMaxAggrState() *maxAggrState {
return &maxAggrState{}
func newMaxAggrState(interval time.Duration, stalenessInterval time.Duration) *maxAggrState {
return &maxAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *maxAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -44,6 +56,8 @@ again:
if value > sv.max {
sv.max = value
}
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -53,25 +67,46 @@ again:
}
}
func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *maxAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*maxStateValue)
sv.mu.Lock()
max := sv.max
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, max)
if deleted {
m.Delete(k)
}
return true
})
}
func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*maxStateValue)
sv.mu.Lock()
value := sv.max
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, value)
return true
})
as.lastPushTimestamp = currentTime
}
func (as *maxAggrState) getOutputName() string {
return "max"
}
@ -86,8 +121,11 @@ func (as *maxAggrState) getStateRepresentation(suffix string) []aggrStateReprese
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.max,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.max,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

View file

@ -2,26 +2,38 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// minAggrState calculates output=min, e.g. the minimum value over input samples.
type minAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type minStateValue struct {
mu sync.Mutex
min float64
deleted bool
mu sync.Mutex
min float64
samplesCount uint64
deleted bool
deleteDeadline uint64
}
func newMinAggrState() *minAggrState {
return &minAggrState{}
func newMinAggrState(interval time.Duration, stalenessInterval time.Duration) *minAggrState {
return &minAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *minAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -43,6 +55,8 @@ again:
if !deleted {
if value < sv.min {
sv.min = value
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
}
sv.mu.Unlock()
@ -53,25 +67,45 @@ again:
}
}
func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *minAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*minStateValue)
sv.mu.Lock()
min := sv.min
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, min)
if deleted {
m.Delete(k)
}
return true
})
}
func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*minStateValue)
sv.mu.Lock()
m := sv.min
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, m)
return true
})
as.lastPushTimestamp = currentTime
}
func (as *minAggrState) getOutputName() string {
return "min"
}
@ -86,8 +120,11 @@ func (as *minAggrState) getStateRepresentation(suffix string) []aggrStateReprese
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.min,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.min,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

View file

@ -10,17 +10,18 @@ import (
// newincreaseAggrState calculates output=newincrease, e.g. the newincrease over input counters.
type newincreaseAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type newincreaseStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
first float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
@ -30,8 +31,9 @@ func newnewincreaseAggrState(interval time.Duration, stalenessInterval time.Dura
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &newincreaseAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
@ -87,6 +89,7 @@ again:
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@ -144,6 +147,7 @@ func (as *newincreaseAggrState) appendSeriesForFlush(ctx *flushCtx) {
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *newincreaseAggrState) getOutputName() string {
@ -160,8 +164,11 @@ func (as *newincreaseAggrState) getStateRepresentation(suffix string) []aggrStat
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.total,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

View file

@ -0,0 +1,176 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// newincreasePureAggrState calculates output=newincrease, e.g. the newincrease over input counters.
type newincreasePureAggrState struct {
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type newincreasePureStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newnewincreasePureAggrState(interval time.Duration, stalenessInterval time.Duration) *newincreasePureAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &newincreasePureAggrState{
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
func (as *newincreasePureAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &newincreasePureStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*newincreasePureStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
lv.firstValue = value
lv.value = value
lv.correction = 0
sv.lastValues[inputKey] = lv
}
// process counter reset
delta := value - lv.value
if delta < 0 {
if (-delta * 8) < lv.value {
lv.correction += lv.value - value
} else {
lv.correction += lv.value
}
}
// process increasing counter
correctedValue := value + lv.correction
correctedDelta := correctedValue - lv.firstValue
if ok && math.Abs(correctedValue) < 10*(math.Abs(correctedDelta)+1) {
correctedDelta = correctedValue
}
if ok || currentTime > as.ignoreInputDeadline {
sv.total = correctedDelta
}
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *newincreasePureAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newincreasePureStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *newincreasePureAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newincreasePureStateValue)
sv.mu.Lock()
newincrease := sv.total
sv.total = 0
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, newincrease)
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *newincreasePureAggrState) getOutputName() string {
return "newincrease_pure"
}
func (as *newincreasePureAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*newincreasePureStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

View file

@ -10,16 +10,18 @@ import (
// newtotalAggrState calculates output=newtotal, e.g. the summary counter over input counters.
type newtotalAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type newtotalStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
@ -29,8 +31,9 @@ func newnewtotalAggrState(interval time.Duration, stalenessInterval time.Duratio
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &newtotalAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
@ -86,6 +89,7 @@ again:
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@ -146,6 +150,7 @@ func (as *newtotalAggrState) appendSeriesForFlush(ctx *flushCtx) {
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *newtotalAggrState) getOutputName() string {
@ -162,8 +167,11 @@ func (as *newtotalAggrState) getStateRepresentation(suffix string) []aggrStateRe
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.total,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

View file

@ -0,0 +1,177 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// newotalPureAggrState calculates output=newtotal, e.g. the summary counter over input counters.
type newotalPureAggrState struct {
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type newtotalPureStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newnewotalPureAggrState(interval time.Duration, stalenessInterval time.Duration) *newotalPureAggrState {
currentTime := fasttime.UnixTimestamp()
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &newotalPureAggrState{
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
func (as *newotalPureAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &newtotalPureStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*newtotalPureStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
lv.firstValue = value
lv.value = value
lv.correction = 0
sv.lastValues[inputKey] = lv
}
// process counter reset
delta := value - lv.value
if delta < 0 {
if (-delta * 8) < lv.value {
lv.correction += lv.value - value
} else {
lv.correction += lv.value
}
}
// process increasing counter
correctedValue := value + lv.correction
correctedDelta := correctedValue - lv.firstValue
if ok && math.Abs(correctedValue) < 10*(math.Abs(correctedDelta)+1) {
correctedDelta = correctedValue
}
sv.total = correctedDelta
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *newotalPureAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newtotalPureStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *newotalPureAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*newtotalPureStateValue)
sv.mu.Lock()
total := sv.total
if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, total)
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *newotalPureAggrState) getOutputName() string {
return "newtotal_pure"
}
func (as *newotalPureAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*newtotalPureStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

View file

@ -4,6 +4,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"strconv"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -12,24 +13,33 @@ import (
// quantilesAggrState calculates output=quantiles, e.g. the the given quantiles over the input samples.
type quantilesAggrState struct {
m sync.Map
phis []float64
m sync.Map
phis []float64
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type quantilesStateValue struct {
mu sync.Mutex
h *histogram.Fast
deleted bool
mu sync.Mutex
h *histogram.Fast
samplesCount uint64
deleted bool
deleteDeadline uint64
}
func newQuantilesAggrState(phis []float64) *quantilesAggrState {
func newQuantilesAggrState(interval time.Duration, stalenessInterval time.Duration, phis []float64) *quantilesAggrState {
return &quantilesAggrState{
phis: phis,
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
phis: phis,
}
}
func (as *quantilesAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -50,6 +60,8 @@ again:
deleted := sv.deleted
if !deleted {
sv.h.Update(value)
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -59,22 +71,42 @@ again:
}
}
func (as *quantilesAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*quantilesStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
histogram.PutFast(sv.h)
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
phis := as.phis
var quantiles []float64
var b []byte
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*quantilesStateValue)
sv.mu.Lock()
quantiles = sv.h.Quantiles(quantiles[:0], phis)
histogram.PutFast(sv.h)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.h.Reset()
sv.mu.Unlock()
key := k.(string)
@ -85,6 +117,7 @@ func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *quantilesAggrState) getOutputName() string {
@ -108,7 +141,10 @@ func (as *quantilesAggrState) getStateRepresentation(suffix string) []aggrStateR
Name: "quantile",
Value: bytesutil.InternBytes(b),
}),
value: quantile,
currentValue: quantile,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
}
return true

View file

@ -1,6 +1,8 @@
{% import (
"fmt"
"sort"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/htmlcomponents"
) %}
@ -164,7 +166,10 @@
<thead>
<tr>
<th scope="col">Metric</th>
<th scope="col">Value</th>
<th scope="col">Current value</th>
<th scope="col">Samples count</th>
<th scope="col">Last push time</th>
<th scope="col">Next push time</th>
</tr>
</thead>
<tbody>
@ -174,7 +179,24 @@
<code>{%s asr.metric %}</code>
</td>
<td class="text-end">
{%f asr.value %}
{%f asr.currentValue %}
</td>
<td class="text-end">
{%s fmt.Sprintf("%v", asr.samplesCount) %}
</td>
<td>
{% if asr.lastPushTimestamp == 0 %}
{%s time.Unix(int64(agg.initialTime), 0).String() %}
{% else %}
{%s time.Unix(int64(asr.lastPushTimestamp), 0).String() %}
{% endif %}
</td>
<td>
{% if asr.lastPushTimestamp == 0 %}
{%s time.Unix(int64(asr.nextPushTimestamp + agg.initialTime), 0).Format(time.RFC3339) %}
{% else %}
{%s time.Unix(int64(asr.nextPushTimestamp), 0).Format(time.RFC3339) %}
{% endif %}
</td>
</tr>
{% endfor %}

View file

@ -6,262 +6,264 @@ package streamaggr
//line lib/streamaggr/state.qtpl:1
import (
"fmt"
"sort"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/htmlcomponents"
)
//line lib/streamaggr/state.qtpl:10
//line lib/streamaggr/state.qtpl:12
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line lib/streamaggr/state.qtpl:10
//line lib/streamaggr/state.qtpl:12
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line lib/streamaggr/state.qtpl:10
//line lib/streamaggr/state.qtpl:12
func StreamStreamAggHTML(qw422016 *qt422016.Writer, rws map[string]*Aggregators, rwActive string) {
//line lib/streamaggr/state.qtpl:10
//line lib/streamaggr/state.qtpl:12
qw422016.N().S(`<!DOCTYPE html><html lang="en"><head>`)
//line lib/streamaggr/state.qtpl:14
//line lib/streamaggr/state.qtpl:16
htmlcomponents.StreamCommonHeader(qw422016)
//line lib/streamaggr/state.qtpl:14
//line lib/streamaggr/state.qtpl:16
qw422016.N().S(`<title>Stream aggregation</title></head><body>`)
//line lib/streamaggr/state.qtpl:18
//line lib/streamaggr/state.qtpl:20
htmlcomponents.StreamNavbar(qw422016)
//line lib/streamaggr/state.qtpl:18
//line lib/streamaggr/state.qtpl:20
qw422016.N().S(`<div class="container-fluid"><div class="row"><main class="col-12"><h1>Aggregations</h1><hr /><ul class="nav nav-tabs" id="rw-tab" role="tablist">`)
//line lib/streamaggr/state.qtpl:25
//line lib/streamaggr/state.qtpl:27
for rwKey, _ := range rws {
//line lib/streamaggr/state.qtpl:25
//line lib/streamaggr/state.qtpl:27
qw422016.N().S(`<li class="nav-item" role="presentation"><button class="nav-link`)
//line lib/streamaggr/state.qtpl:27
//line lib/streamaggr/state.qtpl:29
if rwKey == rwActive {
//line lib/streamaggr/state.qtpl:27
//line lib/streamaggr/state.qtpl:29
qw422016.N().S(` `)
//line lib/streamaggr/state.qtpl:27
//line lib/streamaggr/state.qtpl:29
qw422016.N().S(`active`)
//line lib/streamaggr/state.qtpl:27
//line lib/streamaggr/state.qtpl:29
}
//line lib/streamaggr/state.qtpl:27
//line lib/streamaggr/state.qtpl:29
qw422016.N().S(`" type="button" role="tab"onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:28
//line lib/streamaggr/state.qtpl:30
qw422016.E().S(rwKey)
//line lib/streamaggr/state.qtpl:28
//line lib/streamaggr/state.qtpl:30
qw422016.N().S(`'">`)
//line lib/streamaggr/state.qtpl:29
//line lib/streamaggr/state.qtpl:31
qw422016.E().S(rwKey)
//line lib/streamaggr/state.qtpl:29
//line lib/streamaggr/state.qtpl:31
qw422016.N().S(`</button></li>`)
//line lib/streamaggr/state.qtpl:32
//line lib/streamaggr/state.qtpl:34
}
//line lib/streamaggr/state.qtpl:32
//line lib/streamaggr/state.qtpl:34
qw422016.N().S(`</ul><div class="tab-content"><div class="tab-pane active" role="tabpanel"><div id="aggregations" class="table-responsive"><table class="table table-striped table-hover table-bordered table-sm"><thead><tr><th scope="col" style="width: 5%">Num</th><th scope="col" style="width: 35%">Match</th><th scope="col" style="width: 10%">By</th><th scope="col" style="width: 10%">Without</a><th scope="col" style="width: 40%">Outputs</a></tr></thead><tbody>`)
//line lib/streamaggr/state.qtpl:48
//line lib/streamaggr/state.qtpl:50
aggs := rws[rwActive]
//line lib/streamaggr/state.qtpl:49
//line lib/streamaggr/state.qtpl:51
for an, agg := range aggs.as {
//line lib/streamaggr/state.qtpl:49
//line lib/streamaggr/state.qtpl:51
qw422016.N().S(`<tr><td>`)
//line lib/streamaggr/state.qtpl:51
//line lib/streamaggr/state.qtpl:53
qw422016.N().D(an)
//line lib/streamaggr/state.qtpl:51
//line lib/streamaggr/state.qtpl:53
qw422016.N().S(`</td><td><code>`)
//line lib/streamaggr/state.qtpl:53
//line lib/streamaggr/state.qtpl:55
qw422016.E().S(agg.match.String())
//line lib/streamaggr/state.qtpl:53
//line lib/streamaggr/state.qtpl:55
qw422016.N().S(`</code></td><td class="labels">`)
//line lib/streamaggr/state.qtpl:56
//line lib/streamaggr/state.qtpl:58
for abn, ab := range agg.by {
//line lib/streamaggr/state.qtpl:57
//line lib/streamaggr/state.qtpl:59
if abn > 0 {
//line lib/streamaggr/state.qtpl:57
//line lib/streamaggr/state.qtpl:59
qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:59
}
//line lib/streamaggr/state.qtpl:59
qw422016.N().S(`<span class="badge bg-secondary">`)
//line lib/streamaggr/state.qtpl:61
}
//line lib/streamaggr/state.qtpl:61
qw422016.N().S(`<span class="badge bg-secondary">`)
//line lib/streamaggr/state.qtpl:63
qw422016.E().S(ab)
//line lib/streamaggr/state.qtpl:61
//line lib/streamaggr/state.qtpl:63
qw422016.N().S(`</span>`)
//line lib/streamaggr/state.qtpl:63
//line lib/streamaggr/state.qtpl:65
}
//line lib/streamaggr/state.qtpl:63
//line lib/streamaggr/state.qtpl:65
qw422016.N().S(`</td><td class="labels">`)
//line lib/streamaggr/state.qtpl:66
//line lib/streamaggr/state.qtpl:68
for awn, aw := range agg.without {
//line lib/streamaggr/state.qtpl:67
//line lib/streamaggr/state.qtpl:69
if awn > 0 {
//line lib/streamaggr/state.qtpl:67
//line lib/streamaggr/state.qtpl:69
qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:69
//line lib/streamaggr/state.qtpl:71
}
//line lib/streamaggr/state.qtpl:69
//line lib/streamaggr/state.qtpl:71
qw422016.N().S(`<span class="badge bg-secondary">`)
//line lib/streamaggr/state.qtpl:71
//line lib/streamaggr/state.qtpl:73
qw422016.E().S(aw)
//line lib/streamaggr/state.qtpl:71
//line lib/streamaggr/state.qtpl:73
qw422016.N().S(`</span>`)
//line lib/streamaggr/state.qtpl:73
//line lib/streamaggr/state.qtpl:75
}
//line lib/streamaggr/state.qtpl:73
//line lib/streamaggr/state.qtpl:75
qw422016.N().S(`</td><td class="labels">`)
//line lib/streamaggr/state.qtpl:76
//line lib/streamaggr/state.qtpl:78
for asn, as := range agg.aggrStates {
//line lib/streamaggr/state.qtpl:77
//line lib/streamaggr/state.qtpl:79
if asn > 0 {
//line lib/streamaggr/state.qtpl:77
//line lib/streamaggr/state.qtpl:79
qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:79
//line lib/streamaggr/state.qtpl:81
}
//line lib/streamaggr/state.qtpl:79
//line lib/streamaggr/state.qtpl:81
qw422016.N().S(`<a href="?rw=`)
//line lib/streamaggr/state.qtpl:80
//line lib/streamaggr/state.qtpl:82
qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:80
//line lib/streamaggr/state.qtpl:82
qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:80
//line lib/streamaggr/state.qtpl:82
qw422016.N().D(an)
//line lib/streamaggr/state.qtpl:80
//line lib/streamaggr/state.qtpl:82
qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:80
//line lib/streamaggr/state.qtpl:82
qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:80
//line lib/streamaggr/state.qtpl:82
qw422016.N().S(`">`)
//line lib/streamaggr/state.qtpl:81
//line lib/streamaggr/state.qtpl:83
qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:81
//line lib/streamaggr/state.qtpl:83
qw422016.N().S(`</a>`)
//line lib/streamaggr/state.qtpl:83
//line lib/streamaggr/state.qtpl:85
}
//line lib/streamaggr/state.qtpl:83
//line lib/streamaggr/state.qtpl:85
qw422016.N().S(`</td></tr>`)
//line lib/streamaggr/state.qtpl:86
//line lib/streamaggr/state.qtpl:88
}
//line lib/streamaggr/state.qtpl:86
//line lib/streamaggr/state.qtpl:88
qw422016.N().S(`</tbody></table></div></div></div></main></div></div></body></html>`)
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
}
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
func WriteStreamAggHTML(qq422016 qtio422016.Writer, rws map[string]*Aggregators, rwActive string) {
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
qw422016 := qt422016.AcquireWriter(qq422016)
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
StreamStreamAggHTML(qw422016, rws, rwActive)
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
qt422016.ReleaseWriter(qw422016)
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
}
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
func StreamAggHTML(rws map[string]*Aggregators, rwActive string) string {
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
qb422016 := qt422016.AcquireByteBuffer()
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
WriteStreamAggHTML(qb422016, rws, rwActive)
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
qs422016 := string(qb422016.B)
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
qt422016.ReleaseByteBuffer(qb422016)
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
return qs422016
//line lib/streamaggr/state.qtpl:97
//line lib/streamaggr/state.qtpl:99
}
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:101
func StreamStreamAggOutputStateHTML(qw422016 *qt422016.Writer, rwActive string, aggNum int, agg *aggregator, as aggrState, limit int) {
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:101
qw422016.N().S(`<!DOCTYPE html><html lang="en"><head>`)
//line lib/streamaggr/state.qtpl:103
//line lib/streamaggr/state.qtpl:105
htmlcomponents.StreamCommonHeader(qw422016)
//line lib/streamaggr/state.qtpl:103
//line lib/streamaggr/state.qtpl:105
qw422016.N().S(`<title>Stream aggregation</title></head><body>`)
//line lib/streamaggr/state.qtpl:107
//line lib/streamaggr/state.qtpl:109
htmlcomponents.StreamNavbar(qw422016)
//line lib/streamaggr/state.qtpl:107
//line lib/streamaggr/state.qtpl:109
qw422016.N().S(`<div class="container-fluid"><div class="row"><main class="col-12"><h1>Aggregation state</h1><h4> [ <a href="?rw=`)
//line lib/streamaggr/state.qtpl:112
//line lib/streamaggr/state.qtpl:114
qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:112
//line lib/streamaggr/state.qtpl:114
qw422016.N().S(`">back to aggregations</a> ] </h3><hr /><h6><div class="row"><div class="col-xxl-1">Remote write:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:117
//line lib/streamaggr/state.qtpl:119
qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:117
//line lib/streamaggr/state.qtpl:119
qw422016.N().S(`</code><div class="w-100"></div><div class="col-xxl-1">Aggregation num:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:121
//line lib/streamaggr/state.qtpl:123
qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:121
//line lib/streamaggr/state.qtpl:123
qw422016.N().S(`</code><div class="w-100"></div><div class="col-xxl-1">Match:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:125
//line lib/streamaggr/state.qtpl:127
qw422016.E().S(agg.match.String())
//line lib/streamaggr/state.qtpl:125
//line lib/streamaggr/state.qtpl:127
qw422016.N().S(`</code><div class="w-100"></div>`)
//line lib/streamaggr/state.qtpl:128
//line lib/streamaggr/state.qtpl:130
if len(agg.by) > 0 {
//line lib/streamaggr/state.qtpl:128
//line lib/streamaggr/state.qtpl:130
qw422016.N().S(`<div class="col-xxl-1">By:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:130
qw422016.E().S(strings.Join(agg.by, ", "))
//line lib/streamaggr/state.qtpl:130
qw422016.N().S(`</code><div class="w-100"></div>`)
//line lib/streamaggr/state.qtpl:132
}
//line lib/streamaggr/state.qtpl:133
if len(agg.without) > 0 {
//line lib/streamaggr/state.qtpl:133
qw422016.N().S(`<div class="col-xxl-1">Without:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:135
qw422016.E().S(strings.Join(agg.without, ", "))
//line lib/streamaggr/state.qtpl:135
qw422016.E().S(strings.Join(agg.by, ", "))
//line lib/streamaggr/state.qtpl:132
qw422016.N().S(`</code><div class="w-100"></div>`)
//line lib/streamaggr/state.qtpl:137
//line lib/streamaggr/state.qtpl:134
}
//line lib/streamaggr/state.qtpl:135
if len(agg.without) > 0 {
//line lib/streamaggr/state.qtpl:135
qw422016.N().S(`<div class="col-xxl-1">Without:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:137
qw422016.E().S(strings.Join(agg.without, ", "))
//line lib/streamaggr/state.qtpl:137
qw422016.N().S(`</code><div class="w-100"></div>`)
//line lib/streamaggr/state.qtpl:139
}
//line lib/streamaggr/state.qtpl:139
qw422016.N().S(`</div></h6><hr /><ul class="nav nav-tabs" id="rw-tab" role="tablist">`)
//line lib/streamaggr/state.qtpl:142
//line lib/streamaggr/state.qtpl:144
for _, a := range agg.aggrStates {
//line lib/streamaggr/state.qtpl:142
//line lib/streamaggr/state.qtpl:144
qw422016.N().S(`<li class="nav-item" role="presentation"><button class="nav-link`)
//line lib/streamaggr/state.qtpl:144
//line lib/streamaggr/state.qtpl:146
if a.getOutputName() == as.getOutputName() {
//line lib/streamaggr/state.qtpl:144
//line lib/streamaggr/state.qtpl:146
qw422016.N().S(` `)
//line lib/streamaggr/state.qtpl:144
//line lib/streamaggr/state.qtpl:146
qw422016.N().S(`active`)
//line lib/streamaggr/state.qtpl:144
//line lib/streamaggr/state.qtpl:146
}
//line lib/streamaggr/state.qtpl:144
//line lib/streamaggr/state.qtpl:146
qw422016.N().S(`" type="button" role="tab"onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:145
//line lib/streamaggr/state.qtpl:147
qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:145
//line lib/streamaggr/state.qtpl:147
qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:145
//line lib/streamaggr/state.qtpl:147
qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:145
//line lib/streamaggr/state.qtpl:147
qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:145
//line lib/streamaggr/state.qtpl:147
qw422016.E().S(a.getOutputName())
//line lib/streamaggr/state.qtpl:145
//line lib/streamaggr/state.qtpl:147
qw422016.N().S(`'">`)
//line lib/streamaggr/state.qtpl:146
//line lib/streamaggr/state.qtpl:148
qw422016.E().S(a.getOutputName())
//line lib/streamaggr/state.qtpl:146
//line lib/streamaggr/state.qtpl:148
qw422016.N().S(`</button></li>`)
//line lib/streamaggr/state.qtpl:149
//line lib/streamaggr/state.qtpl:151
}
//line lib/streamaggr/state.qtpl:149
//line lib/streamaggr/state.qtpl:151
qw422016.N().S(`</ul><div class="tab-content"><div class="tab-pane active" role="tabpanel"><div id="aggregation-state" class="table-responsive"><table class="table table-striped table-hover table-bordered table-sm">`)
//line lib/streamaggr/state.qtpl:156
//line lib/streamaggr/state.qtpl:158
sr := as.getStateRepresentation(agg.suffix)
sort.Slice(sr, func(i, j int) bool {
return sr[i].metric < sr[j].metric
@ -270,49 +272,77 @@ func StreamStreamAggOutputStateHTML(qw422016 *qt422016.Writer, rwActive string,
sr = sr[:limit]
}
//line lib/streamaggr/state.qtpl:163
qw422016.N().S(`<thead><tr><th scope="col">Metric</th><th scope="col">Value</th></tr></thead><tbody>`)
//line lib/streamaggr/state.qtpl:171
//line lib/streamaggr/state.qtpl:165
qw422016.N().S(`<thead><tr><th scope="col">Metric</th><th scope="col">Current value</th><th scope="col">Samples count</th><th scope="col">Last push time</th><th scope="col">Next push time</th></tr></thead><tbody>`)
//line lib/streamaggr/state.qtpl:176
for _, asr := range sr {
//line lib/streamaggr/state.qtpl:171
//line lib/streamaggr/state.qtpl:176
qw422016.N().S(`<tr><td><code>`)
//line lib/streamaggr/state.qtpl:174
//line lib/streamaggr/state.qtpl:179
qw422016.E().S(asr.metric)
//line lib/streamaggr/state.qtpl:174
//line lib/streamaggr/state.qtpl:179
qw422016.N().S(`</code></td><td class="text-end">`)
//line lib/streamaggr/state.qtpl:177
qw422016.N().F(asr.value)
//line lib/streamaggr/state.qtpl:177
//line lib/streamaggr/state.qtpl:182
qw422016.N().F(asr.currentValue)
//line lib/streamaggr/state.qtpl:182
qw422016.N().S(`</td><td class="text-end">`)
//line lib/streamaggr/state.qtpl:185
qw422016.E().S(fmt.Sprintf("%v", asr.samplesCount))
//line lib/streamaggr/state.qtpl:185
qw422016.N().S(`</td><td>`)
//line lib/streamaggr/state.qtpl:188
if asr.lastPushTimestamp == 0 {
//line lib/streamaggr/state.qtpl:189
qw422016.E().S(time.Unix(int64(agg.initialTime), 0).String())
//line lib/streamaggr/state.qtpl:190
} else {
//line lib/streamaggr/state.qtpl:191
qw422016.E().S(time.Unix(int64(asr.lastPushTimestamp), 0).String())
//line lib/streamaggr/state.qtpl:192
}
//line lib/streamaggr/state.qtpl:192
qw422016.N().S(`</td><td>`)
//line lib/streamaggr/state.qtpl:195
if asr.lastPushTimestamp == 0 {
//line lib/streamaggr/state.qtpl:196
qw422016.E().S(time.Unix(int64(asr.nextPushTimestamp+agg.initialTime), 0).Format(time.RFC3339))
//line lib/streamaggr/state.qtpl:197
} else {
//line lib/streamaggr/state.qtpl:198
qw422016.E().S(time.Unix(int64(asr.nextPushTimestamp), 0).Format(time.RFC3339))
//line lib/streamaggr/state.qtpl:199
}
//line lib/streamaggr/state.qtpl:199
qw422016.N().S(`</td></tr>`)
//line lib/streamaggr/state.qtpl:180
//line lib/streamaggr/state.qtpl:202
}
//line lib/streamaggr/state.qtpl:180
//line lib/streamaggr/state.qtpl:202
qw422016.N().S(`</tbody></table></div></div></div></main></div></div></body></html>`)
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
}
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
func WriteStreamAggOutputStateHTML(qq422016 qtio422016.Writer, rwActive string, aggNum int, agg *aggregator, as aggrState, limit int) {
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
qw422016 := qt422016.AcquireWriter(qq422016)
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
StreamStreamAggOutputStateHTML(qw422016, rwActive, aggNum, agg, as, limit)
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
qt422016.ReleaseWriter(qw422016)
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
}
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
func StreamAggOutputStateHTML(rwActive string, aggNum int, agg *aggregator, as aggrState, limit int) string {
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
qb422016 := qt422016.AcquireByteBuffer()
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
WriteStreamAggOutputStateHTML(qb422016, rwActive, aggNum, agg, as, limit)
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
qs422016 := string(qb422016.B)
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
qt422016.ReleaseByteBuffer(qb422016)
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
return qs422016
//line lib/streamaggr/state.qtpl:191
//line lib/streamaggr/state.qtpl:213
}

View file

@ -3,28 +3,39 @@ package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stddevAggrState calculates output=stddev, e.g. the average value over input samples.
type stddevAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type stddevStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
deleteDeadline uint64
}
func newStddevAggrState() *stddevAggrState {
return &stddevAggrState{}
func newStddevAggrState(interval time.Duration, stalenessInterval time.Duration) *stddevAggrState {
return &stddevAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *stddevAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -45,6 +56,7 @@ again:
avg := sv.avg + (value-sv.avg)/sv.count
sv.q += (value - sv.avg) * (value - avg)
sv.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -54,23 +66,50 @@ again:
}
}
func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *stddevAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*stddevStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*stddevStateValue)
sv.mu.Lock()
stddev := math.Sqrt(sv.q / sv.count)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
var stddev float64
if sv.count > 0 {
stddev = math.Sqrt(sv.q / sv.count)
}
sv.count = 0
sv.q = 0
sv.avg = 0
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, stddev)
return true
})
as.lastPushTimestamp = currentTime
}
func (as *stddevAggrState) getOutputName() string {
@ -87,8 +126,11 @@ func (as *stddevAggrState) getStateRepresentation(suffix string) []aggrStateRepr
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: math.Sqrt(value.q / value.count),
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: math.Sqrt(value.q / value.count),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: uint64(value.count),
})
return true
})

View file

@ -2,28 +2,39 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// stdvarAggrState calculates output=stdvar, e.g. the average value over input samples.
type stdvarAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type stdvarStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
deleteDeadline uint64
}
func newStdvarAggrState() *stdvarAggrState {
return &stdvarAggrState{}
func newStdvarAggrState(interval time.Duration, stalenessInterval time.Duration) *stdvarAggrState {
return &stdvarAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *stdvarAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -44,6 +55,7 @@ again:
avg := sv.avg + (value-sv.avg)/sv.count
sv.q += (value - sv.avg) * (value - avg)
sv.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -53,23 +65,49 @@ again:
}
}
func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *stdvarAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*stdvarStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*stdvarStateValue)
sv.mu.Lock()
stdvar := sv.q / sv.count
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
var stdvar float64
if sv.count > 0 {
stdvar = sv.q / sv.count
}
sv.q = 0
sv.avg = 0
sv.count = 0
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, stdvar)
return true
})
as.lastPushTimestamp = currentTime
}
func (as *stdvarAggrState) getOutputName() string {
@ -86,8 +124,11 @@ func (as *stdvarAggrState) getStateRepresentation(suffix string) []aggrStateRepr
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.q / value.count,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.q / value.count,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: uint64(value.count),
})
return true
})

View file

@ -3,6 +3,7 @@ package streamaggr
import (
"encoding/json"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"math"
"sort"
"strconv"
@ -23,12 +24,17 @@ import (
var supportedOutputs = []string{
"total",
"total_pure",
"newtotal",
"newtotal_pure",
"increase",
"increase_pure",
"newincrease",
"newincrease_pure",
"count_series",
"count_samples",
"sum_samples",
"sum_samples_total",
"last",
"min",
"max",
@ -244,8 +250,9 @@ type aggregator struct {
// for `interval: 1m`, `by: [job]`
suffix string
wg sync.WaitGroup
stopCh chan struct{}
initialTime uint64
wg sync.WaitGroup
stopCh chan struct{}
}
type aggrState interface {
@ -256,8 +263,11 @@ type aggrState interface {
}
type aggrStateRepresentation struct {
metric string
value float64
metric string
lastPushTimestamp uint64
nextPushTimestamp uint64
currentValue float64
samplesCount uint64
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
@ -340,38 +350,48 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
}
phis[j] = phi
}
aggrStates[i] = newQuantilesAggrState(phis)
aggrStates[i] = newQuantilesAggrState(interval, stalenessInterval, phis)
continue
}
switch output {
case "total":
aggrStates[i] = newTotalAggrState(interval, stalenessInterval)
case "total_pure":
aggrStates[i] = newTotalPureAggrState(interval, stalenessInterval)
case "newtotal":
aggrStates[i] = newnewtotalAggrState(interval, stalenessInterval)
case "newtotal_pure":
aggrStates[i] = newnewotalPureAggrState(interval, stalenessInterval)
case "increase":
aggrStates[i] = newIncreaseAggrState(interval, stalenessInterval)
case "increase_pure":
aggrStates[i] = newIncreasePureAggrState(interval, stalenessInterval)
case "newincrease":
aggrStates[i] = newnewincreaseAggrState(interval, stalenessInterval)
case "newincrease_pure":
aggrStates[i] = newnewincreasePureAggrState(interval, stalenessInterval)
case "count_series":
aggrStates[i] = newCountSeriesAggrState()
aggrStates[i] = newCountSeriesAggrState(interval, stalenessInterval)
case "count_samples":
aggrStates[i] = newCountSamplesAggrState()
aggrStates[i] = newCountSamplesAggrState(interval, stalenessInterval)
case "sum_samples":
aggrStates[i] = newSumSamplesAggrState()
aggrStates[i] = newSumSamplesAggrState(interval, stalenessInterval)
case "sum_samples_total":
aggrStates[i] = newSumSamplesTotalAggrState(interval, stalenessInterval)
case "last":
aggrStates[i] = newLastAggrState()
aggrStates[i] = newLastAggrState(interval, stalenessInterval)
case "min":
aggrStates[i] = newMinAggrState()
aggrStates[i] = newMinAggrState(interval, stalenessInterval)
case "max":
aggrStates[i] = newMaxAggrState()
aggrStates[i] = newMaxAggrState(interval, stalenessInterval)
case "avg":
aggrStates[i] = newAvgAggrState()
aggrStates[i] = newAvgAggrState(interval, stalenessInterval)
case "stddev":
aggrStates[i] = newStddevAggrState()
aggrStates[i] = newStddevAggrState(interval, stalenessInterval)
case "stdvar":
aggrStates[i] = newStdvarAggrState()
aggrStates[i] = newStdvarAggrState(interval, stalenessInterval)
case "histogram_bucket":
aggrStates[i] = newHistogramBucketAggrState(stalenessInterval)
aggrStates[i] = newHistogramBucketAggrState(interval, stalenessInterval)
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; "+
"see https://docs.victoriametrics.com/vmagent.html#stream-aggregation", output, supportedOutputs)
@ -390,7 +410,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
var dedupAggr *lastAggrState
if dedupInterval > 0 {
dedupAggr = newLastAggrState()
dedupAggr = newLastAggrState(interval, stalenessInterval)
}
// initialize the aggregator
@ -410,7 +430,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
suffix: suffix,
stopCh: make(chan struct{}),
initialTime: fasttime.UnixTimestamp(),
stopCh: make(chan struct{}),
}
if dedupAggr != nil {

View file

@ -2,26 +2,38 @@ package streamaggr
import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// sumSamplesAggrState calculates output=sum_samples, e.g. the sum over input samples.
type sumSamplesAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type sumSamplesStateValue struct {
mu sync.Mutex
sum float64
deleted bool
mu sync.Mutex
sum float64
samplesCount uint64
deleted bool
deleteDeadline uint64
}
func newSumSamplesAggrState() *sumSamplesAggrState {
return &sumSamplesAggrState{}
func newSumSamplesAggrState(interval time.Duration, stalenessInterval time.Duration) *sumSamplesAggrState {
return &sumSamplesAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *sumSamplesAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -42,6 +54,8 @@ again:
deleted := sv.deleted
if !deleted {
sv.sum += value
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -51,23 +65,44 @@ again:
}
}
func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTimeMsec := int64(fasttime.UnixTimestamp()) * 1000
func (as *sumSamplesAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
sum := sv.sum
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.sum = 0
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, sum)
return true
})
as.lastPushTimestamp = currentTime
}
func (as *sumSamplesAggrState) getOutputName() string {
@ -84,8 +119,11 @@ func (as *sumSamplesAggrState) getStateRepresentation(suffix string) []aggrState
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.sum,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

View file

@ -0,0 +1,135 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// sumSamplesTotalAggrState calculates output=sum_samples, e.g. the sum over input samples.
type sumSamplesTotalAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type sumSamplesTotalStateValue struct {
mu sync.Mutex
sum float64
samplesCount uint64
deleted bool
deleteDeadline uint64
}
func newSumSamplesTotalAggrState(interval time.Duration, stalenessInterval time.Duration) *sumSamplesTotalAggrState {
return &sumSamplesTotalAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *sumSamplesTotalAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &sumSamplesTotalStateValue{
sum: value,
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if !loaded {
// The new entry has been successfully created.
return
}
// Use the entry created by a concurrent goroutine.
v = vNew
}
sv := v.(*sumSamplesTotalStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
sv.sum += value
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *sumSamplesTotalAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*sumSamplesTotalStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *sumSamplesTotalAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*sumSamplesTotalStateValue)
sv.mu.Lock()
sum := sv.sum
if math.Abs(sv.sum) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.sum = 0
}
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, sum)
return true
})
as.lastPushTimestamp = currentTime
}
func (as *sumSamplesTotalAggrState) getOutputName() string {
return "sum_samples_total"
}
func (as *sumSamplesTotalAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*sumSamplesTotalStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

View file

@ -10,16 +10,18 @@ import (
// totalAggrState calculates output=total, e.g. the summary counter over input counters.
type totalAggrState struct {
m sync.Map
m sync.Map
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type totalStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
@ -36,8 +38,9 @@ func newTotalAggrState(interval time.Duration, stalenessInterval time.Duration)
intervalSecs := roundDurationToSecs(interval)
stalenessSecs := roundDurationToSecs(stalenessInterval)
return &totalAggrState{
ignoreInputDeadline: currentTime + intervalSecs,
intervalSecs: intervalSecs,
stalenessSecs: stalenessSecs,
ignoreInputDeadline: currentTime + intervalSecs,
}
}
@ -77,6 +80,7 @@ again:
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
@ -137,6 +141,7 @@ func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) {
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *totalAggrState) getOutputName() string {
@ -153,8 +158,11 @@ func (as *totalAggrState) getStateRepresentation(suffix string) []aggrStateRepre
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
value: value.total,
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})

156
lib/streamaggr/totalpure.go Normal file
View file

@ -0,0 +1,156 @@
package streamaggr
import (
"math"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
// totalPureAggrState calculates output=total_pure, e.g. the summary counter over input counters.
type totalPureAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
}
type totalPureStateValue struct {
mu sync.Mutex
lastValues map[string]*lastValueState
total float64
samplesCount uint64
deleteDeadline uint64
deleted bool
}
func newTotalPureAggrState(interval time.Duration, stalenessInterval time.Duration) *totalPureAggrState {
return &totalPureAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
}
}
func (as *totalPureAggrState) pushSample(inputKey, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
// The entry is missing in the map. Try creating it.
v = &totalPureStateValue{
lastValues: make(map[string]*lastValueState),
}
vNew, loaded := as.m.LoadOrStore(outputKey, v)
if loaded {
// Use the entry created by a concurrent goroutine.
v = vNew
}
}
sv := v.(*totalPureStateValue)
sv.mu.Lock()
deleted := sv.deleted
if !deleted {
lv, ok := sv.lastValues[inputKey]
if !ok {
lv = &lastValueState{}
sv.lastValues[inputKey] = lv
}
d := value
if ok && lv.value <= value {
d = value - lv.value
}
sv.total += d
lv.value = value
lv.deleteDeadline = deleteDeadline
sv.deleteDeadline = deleteDeadline
sv.samplesCount++
}
sv.mu.Unlock()
if deleted {
// The entry has been deleted by the concurrent call to appendSeriesForFlush
// Try obtaining and updating the entry again.
goto again
}
}
func (as *totalPureAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*totalPureStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
} else {
// Delete outdated entries in sv.lastValues
m := sv.lastValues
for k1, v1 := range m {
if currentTime > v1.deleteDeadline {
delete(m, k1)
}
}
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *totalPureAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*totalPureStateValue)
sv.mu.Lock()
totalPure := sv.total
if math.Abs(sv.total) >= (1 << 53) {
// It is time to reset the entry, since it starts losing float64 precision
sv.total = 0
}
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, totalPure)
}
return true
})
as.lastPushTimestamp = currentTime
}
func (as *totalPureAggrState) getOutputName() string {
return "total_pure"
}
func (as *totalPureAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
as.m.Range(func(k, v any) bool {
value := v.(*totalPureStateValue)
value.mu.Lock()
defer value.mu.Unlock()
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
})
return true
})
return result
}

19
sa.yaml
View file

@ -1,14 +1,14 @@
- match:
- '{__name__=~".*total"}'
- '{__name__=~".*entries"}'
- '{__name__=~".*count"}'
interval: 1m
by: [instance]
- match: 'extension.error'
interval: 5m
staleness_interval: 24h
without: [instance]
outputs:
- total
- newtotal
- increase
- newtotal
- newincrease
- total_pure
- increase_pure
- avg
- count_samples
- count_series
@ -20,7 +20,4 @@
- stddev
- stdvar
- sum_samples
- match: 'extension.error'
interval: 1h
without: [instance]
outputs: [total, increase, newtotal, newincrease]
- sum_samples_total