Prepare for PR - refactoring

This commit is contained in:
Alexander Marshalov 2023-12-11 19:53:28 +01:00
parent 8725b5e049
commit 715bf3af82
22 changed files with 755 additions and 842 deletions


@ -631,6 +631,7 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
# (it's like this series didn't exist until now).
# Increase this parameter if matched metrics are expected to arrive with delays or at irregular intervals exceeding the `interval` value.
# By default, it is equal to 2x the `interval` field.
# This parameter is only relevant for the following outputs: total, increase and histogram_bucket.
#
# staleness_interval: 2m
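
For reference, a minimal aggregation config entry that sets this option explicitly might look like the following sketch (the match expression and durations are illustrative, not taken from this diff):

- match: 'http_requests_total'
  interval: 1m
  staleness_interval: 2m
  outputs: [total]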


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -11,29 +12,23 @@ import (
type avgAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type avgStateValue struct {
mu sync.Mutex
sum float64
count uint64
deleted bool
deleteDeadline uint64
mu sync.Mutex
sum float64
count uint64
deleted bool
}
func newAvgAggrState(interval time.Duration, stalenessInterval time.Duration) *avgAggrState {
func newAvgAggrState(interval time.Duration) *avgAggrState {
return &avgAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *avgAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -56,7 +51,6 @@ again:
if !deleted {
sv.sum += value
sv.count++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -66,56 +60,37 @@ again:
}
}
func (as *avgAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*avgStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *avgAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*avgStateValue)
sv.mu.Lock()
var avg float64
if sv.count > 0 {
avg = sv.sum / float64(sv.count)
}
sv.sum = 0
sv.count = 0
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, avg)
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *avgAggrState) getOutputName() string {
return "avg"
}
func (as *avgAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *avgAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*avgStateValue)
value.mu.Lock()
@ -123,14 +98,16 @@ func (as *avgAggrState) getStateRepresentation(suffix string) []aggrStateReprese
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum / float64(value.count),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.count,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum / float64(value.count),
samplesCount: value.count,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}
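
The aggrStateRepresentation type itself is not part of this hunk. Judging only by the fields accessed above, the shared representation structs presumably look roughly like the following sketch (inferred from usage here, not copied from the actual definitions elsewhere in the changeset):

// Inferred sketch: the field names come from the usage in getStateRepresentation above;
// the real definitions live in another file of this commit.
type aggrStateRepresentation struct {
	intervalSecs      uint64
	lastPushTimestamp uint64
	metrics           []aggrStateRepresentationMetric
}

type aggrStateRepresentationMetric struct {
	metric       string
	currentValue float64
	samplesCount uint64
}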


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -12,27 +13,22 @@ type countSamplesAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type countSamplesStateValue struct {
mu sync.Mutex
n uint64
deleted bool
deleteDeadline uint64
mu sync.Mutex
n uint64
deleted bool
}
func newCountSamplesAggrState(interval time.Duration, stalenessInterval time.Duration) *countSamplesAggrState {
func newCountSamplesAggrState(interval time.Duration) *countSamplesAggrState {
return &countSamplesAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *countSamplesAggrState) pushSample(_, outputKey string, _ float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -53,7 +49,6 @@ again:
deleted := sv.deleted
if !deleted {
sv.n++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -63,52 +58,33 @@ again:
}
}
func (as *countSamplesAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *countSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*countSamplesStateValue)
sv.mu.Lock()
n := sv.n
sv.n = 0
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, float64(n))
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *countSamplesAggrState) getOutputName() string {
return "count_samples"
}
func (as *countSamplesAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *countSamplesAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*countSamplesStateValue)
value.mu.Lock()
@ -116,14 +92,16 @@ func (as *countSamplesAggrState) getStateRepresentation(suffix string) []aggrSta
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.n,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
samplesCount: value.n,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -12,7 +13,7 @@ type countSamplesTotalAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type countSamplesTotalStateValue struct {
@ -99,15 +100,15 @@ func (as *countSamplesTotalAggrState) appendSeriesForFlush(ctx *flushCtx) {
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, float64(n))
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *countSamplesTotalAggrState) getOutputName() string {
return "count_samples_total"
}
func (as *countSamplesTotalAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *countSamplesTotalAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*countSamplesTotalStateValue)
value.mu.Lock()
@ -115,14 +116,16 @@ func (as *countSamplesTotalAggrState) getStateRepresentation(suffix string) []ag
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.n,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
samplesCount: value.n,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -11,30 +12,24 @@ import (
type countSeriesAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type countSeriesStateValue struct {
mu sync.Mutex
countedSeries map[string]struct{}
n uint64
samplesCount uint64
deleted bool
deleteDeadline uint64
mu sync.Mutex
countedSeries map[string]struct{}
n uint64
samplesCount uint64
deleted bool
}
func newCountSeriesAggrState(interval time.Duration, stalenessInterval time.Duration) *countSeriesAggrState {
func newCountSeriesAggrState(interval time.Duration) *countSeriesAggrState {
return &countSeriesAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *countSeriesAggrState) pushSample(inputKey, outputKey string, _ float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -60,7 +55,6 @@ again:
if _, ok := sv.countedSeries[inputKey]; !ok {
sv.countedSeries[inputKey] = struct{}{}
sv.n++
sv.deleteDeadline = deleteDeadline
}
sv.samplesCount++
}
@ -72,57 +66,35 @@ again:
}
}
func (as *countSeriesAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *countSeriesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*countSeriesStateValue)
sv.mu.Lock()
n := sv.n
sv.n = 0
// todo: use builtin function clear after switching to go 1.21
for csk := range sv.countedSeries {
delete(sv.countedSeries, csk)
}
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, float64(n))
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *countSeriesAggrState) getOutputName() string {
return "count_series"
}
func (as *countSeriesAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *countSeriesAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*countSeriesStateValue)
value.mu.Lock()
@ -130,14 +102,16 @@ func (as *countSeriesAggrState) getStateRepresentation(suffix string) []aggrStat
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: float64(value.n),
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -1,12 +1,13 @@
package streamaggr
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"math"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
)
@ -15,7 +16,7 @@ type histogramBucketAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type histogramBucketStateValue struct {
@ -104,15 +105,15 @@ func (as *histogramBucketAggrState) appendSeriesForFlush(ctx *flushCtx) {
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *histogramBucketAggrState) getOutputName() string {
return "count_series"
return "histogram_bucket"
}
func (as *histogramBucketAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *histogramBucketAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
rmetrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*histogramBucketStateValue)
value.mu.Lock()
@ -121,20 +122,22 @@ func (as *histogramBucketAggrState) getStateRepresentation(suffix string) []aggr
return true
}
value.h.VisitNonZeroBuckets(func(vmrange string, count uint64) {
result = append(result, aggrStateRepresentation{
rmetrics = append(rmetrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName(), prompbmarshal.Label{
Name: vmrange,
Name: "vmrange",
Value: vmrange,
}),
currentValue: float64(count),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
currentValue: float64(count),
samplesCount: value.samplesCount,
})
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: rmetrics,
}
}
func roundDurationToSecs(d time.Duration) uint64 {


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -13,7 +14,7 @@ type increaseAggrState struct {
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type increaseStateValue struct {
@ -131,15 +132,15 @@ func (as *increaseAggrState) appendSeriesForFlush(ctx *flushCtx) {
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *increaseAggrState) getOutputName() string {
return "increase"
}
func (as *increaseAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *increaseAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*increaseStateValue)
value.mu.Lock()
@ -147,14 +148,16 @@ func (as *increaseAggrState) getStateRepresentation(suffix string) []aggrStateRe
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -12,7 +13,7 @@ type increasePureAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type increasePureStateValue struct {
@ -124,15 +125,15 @@ func (as *increasePureAggrState) appendSeriesForFlush(ctx *flushCtx) {
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *increasePureAggrState) getOutputName() string {
return "increase_pure"
}
func (as *increasePureAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *increasePureAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*increasePureStateValue)
value.mu.Lock()
@ -140,14 +141,16 @@ func (as *increasePureAggrState) getStateRepresentation(suffix string) []aggrSta
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -11,29 +12,23 @@ import (
type lastAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type lastStateValue struct {
mu sync.Mutex
last float64
samplesCount uint64
deleted bool
deleteDeadline uint64
mu sync.Mutex
last float64
samplesCount uint64
deleted bool
}
func newLastAggrState(interval time.Duration, stalenessInterval time.Duration) *lastAggrState {
func newLastAggrState(interval time.Duration) *lastAggrState {
return &lastAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *lastAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -55,7 +50,6 @@ again:
if !deleted {
sv.last = value
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -65,51 +59,33 @@ again:
}
}
func (as *lastAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*lastStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *lastAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*lastStateValue)
sv.mu.Lock()
last := sv.last
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, last)
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *lastAggrState) getOutputName() string {
return "last"
}
func (as *lastAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *lastAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*lastStateValue)
value.mu.Lock()
@ -117,14 +93,16 @@ func (as *lastAggrState) getStateRepresentation(suffix string) []aggrStateRepres
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.last,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.last,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -11,29 +12,23 @@ import (
type maxAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type maxStateValue struct {
mu sync.Mutex
max float64
samplesCount uint64
deleted bool
deleteDeadline uint64
mu sync.Mutex
max float64
samplesCount uint64
deleted bool
}
func newMaxAggrState(interval time.Duration, stalenessInterval time.Duration) *maxAggrState {
func newMaxAggrState(interval time.Duration) *maxAggrState {
return &maxAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *maxAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -57,7 +52,6 @@ again:
sv.max = value
}
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -67,52 +61,34 @@ again:
}
}
func (as *maxAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*maxStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *maxAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*maxStateValue)
sv.mu.Lock()
value := sv.max
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, value)
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *maxAggrState) getOutputName() string {
return "max"
}
func (as *maxAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *maxAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*maxStateValue)
value.mu.Lock()
@ -120,14 +96,16 @@ func (as *maxAggrState) getStateRepresentation(suffix string) []aggrStateReprese
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.max,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.max,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -11,29 +12,23 @@ import (
type minAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type minStateValue struct {
mu sync.Mutex
min float64
samplesCount uint64
deleted bool
deleteDeadline uint64
mu sync.Mutex
min float64
samplesCount uint64
deleted bool
}
func newMinAggrState(interval time.Duration, stalenessInterval time.Duration) *minAggrState {
func newMinAggrState(interval time.Duration) *minAggrState {
return &minAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *minAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -56,7 +51,6 @@ again:
if value < sv.min {
sv.min = value
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
}
sv.mu.Unlock()
@ -67,51 +61,33 @@ again:
}
}
func (as *minAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*minStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *minAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*minStateValue)
sv.mu.Lock()
m := sv.min
value := sv.min
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, m)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, value)
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *minAggrState) getOutputName() string {
return "min"
}
func (as *minAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *minAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*minStateValue)
value.mu.Lock()
@ -119,14 +95,16 @@ func (as *minAggrState) getStateRepresentation(suffix string) []aggrStateReprese
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.min,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.min,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -4,6 +4,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -16,30 +17,24 @@ type quantilesAggrState struct {
m sync.Map
phis []float64
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type quantilesStateValue struct {
mu sync.Mutex
h *histogram.Fast
samplesCount uint64
deleted bool
deleteDeadline uint64
mu sync.Mutex
h *histogram.Fast
samplesCount uint64
deleted bool
}
func newQuantilesAggrState(interval time.Duration, stalenessInterval time.Duration, phis []float64) *quantilesAggrState {
func newQuantilesAggrState(interval time.Duration, phis []float64) *quantilesAggrState {
return &quantilesAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
phis: phis,
intervalSecs: roundDurationToSecs(interval),
phis: phis,
}
}
func (as *quantilesAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -61,7 +56,6 @@ again:
if !deleted {
sv.h.Update(value)
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -71,42 +65,24 @@ again:
}
}
func (as *quantilesAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*quantilesStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
histogram.PutFast(sv.h)
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
phis := as.phis
var quantiles []float64
var b []byte
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*quantilesStateValue)
sv.mu.Lock()
quantiles = sv.h.Quantiles(quantiles[:0], phis)
sv.h.Reset()
histogram.PutFast(sv.h)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
@ -117,15 +93,15 @@ func (as *quantilesAggrState) appendSeriesForFlush(ctx *flushCtx) {
}
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *quantilesAggrState) getOutputName() string {
return "quantiles"
}
func (as *quantilesAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *quantilesAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
var b []byte
as.m.Range(func(k, v any) bool {
value := v.(*quantilesStateValue)
@ -136,18 +112,20 @@ func (as *quantilesAggrState) getStateRepresentation(suffix string) []aggrStateR
}
for i, quantile := range value.h.Quantiles(make([]float64, 0), as.phis) {
b = strconv.AppendFloat(b[:0], as.phis[i], 'g', -1, 64)
result = append(result, aggrStateRepresentation{
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName(), prompbmarshal.Label{
Name: "quantile",
Value: bytesutil.InternBytes(b),
}),
currentValue: quantile,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
currentValue: quantile,
samplesCount: value.samplesCount,
})
}
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}


@ -3,6 +3,7 @@ package streamaggr
import (
"fmt"
"net/http"
"net/url"
"strconv"
)
@ -70,5 +71,11 @@ func WriteHumanReadableState(w http.ResponseWriter, r *http.Request, rws map[str
}
}
WriteStreamAggOutputStateHTML(w, rwActive, aggNum, agg, as, limitNum)
filter, err := url.QueryUnescape(r.FormValue("filter"))
if err != nil {
_, _ = fmt.Fprintf(w, "incorrect parameter 'filter': %v", err)
w.WriteHeader(http.StatusBadRequest)
return
}
WriteStreamAggOutputStateHTML(w, rwActive, aggNum, agg, as, limitNum, filter)
}
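
With this change the per-output state page accepts a `filter` query parameter (applied in the template below as a case-insensitive substring match against the rendered metric) in addition to the existing `rw`, `agg`, `output` and `limit` parameters. An illustrative query string (the parameter values are made up) could look like:

?rw=<remote-write-name>&agg=0&output=avg&limit=1000&filter=http_requests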


@ -7,6 +7,10 @@
"github.com/VictoriaMetrics/VictoriaMetrics/lib/htmlcomponents"
) %}
{% code
const DEFAULT_LIMIT = 1000
%}
{% stripspace %}
{% func StreamAggHTML(rws map[string]*Aggregators, rwActive string) %}
@ -79,7 +83,7 @@
{% if asn > 0 %}
<span>, </span>
{% endif %}
<a href="?rw={%s rwActive %}&agg={%d an %}&output={%s as.getOutputName() %}">
<a href="?rw={%s rwActive %}&agg={%d an %}&output={%s as.getOutputName() %}&limit={%d DEFAULT_LIMIT %}">
{%s as.getOutputName() %}
</a>
{% endfor %}
@ -98,7 +102,7 @@
</html>
{% endfunc %}
{% func StreamAggOutputStateHTML(rwActive string, aggNum int, agg *aggregator, as aggrState, limit int) %}
{% func StreamAggOutputStateHTML(rwActive string, aggNum int, agg *aggregator, as aggrState, limit int, filter string) %}
<!DOCTYPE html>
<html lang="en">
<head>
@ -110,33 +114,85 @@
<div class="container-fluid">
<div class="row">
<main class="col-12">
{% code
sr := as.getStateRepresentation(agg.suffix)
if filter != "" {
filter = strings.ToLower(filter)
metrics := sr.metrics[:0]
for _, m := range sr.metrics {
if strings.Contains(strings.ToLower(m.metric), filter) {
metrics = append(metrics, m)
}
}
sr.metrics = metrics
}
sort.Slice(sr.metrics, func(i, j int) bool {
return sr.metrics[i].metric < sr.metrics[j].metric
})
if len(sr.metrics) > limit {
sr.metrics = sr.metrics[:limit]
}
%}
<h1>Aggregation state</h1>
<h4> [ <a href="?rw={%s rwActive %}">back to aggregations</a> ] </h3>
<hr />
<h6>
<div class="row">
<div class="col-xxl-1">Remote write:</div>
<code class="col w-100">{%s rwActive %}</code>
<div class="w-100"></div>
<div class="row container-sm">
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="remote-write" style="width: 200px">Remote write:</span>
<input type="text" class="form-control" aria-label="Remote write" aria-describedby="remote-write" value="{%s rwActive %}" readonly />
</div>
<div class="col-xxl-1">Aggregation num:</div>
<code class="col w-100">{%d aggNum %}</code>
<div class="w-100"></div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="agg-num" style="width: 200px">Aggregation num:</span>
<input type="number" class="form-control" aria-label="Aggregation num" aria-describedby="agg-num" value="{%d aggNum %}" readonly />
</div>
<div class="col-xxl-1">Match:</div>
<code class="col w-100">{%s agg.match.String() %}</code>
<div class="w-100"></div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="match" style="width: 200px">Match:</span>
<input type="string" class="form-control" aria-label="Match" aria-describedby="match" value="{%s agg.match.String() %}" readonly />
</div>
{% if len(agg.by) > 0 %}
<div class="col-xxl-1">By:</div>
<code class="col w-100">{%s strings.Join(agg.by, ", ") %}</code>
<div class="w-100"></div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="by" style="width: 200px">By:</span>
<input type="string" class="form-control" aria-label="By" aria-describedby="by" value="{%s strings.Join(agg.by, ", ") %}" readonly />
</div>
{% endif %}
{% if len(agg.without) > 0 %}
<div class="col-xxl-1">Without:</div>
<code class="col w-100">{%s strings.Join(agg.without, ", ") %}</code>
<div class="w-100"></div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="without" style="width: 200px">Without:</span>
<input type="string" class="form-control" aria-label="Without" aria-describedby="without" value="{%s strings.Join(agg.without, ", ") %}" readonly />
</div>
{% endif %}
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="interval" style="width: 200px">Interval (seconds):</span>
<input type="number" class="form-control" aria-label="Interval (seconds)" aria-describedby="interval" value="{%v sr.intervalSecs %}" readonly />
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="last-push-time" style="width: 200px">Last push time:</span>
<input type="string" class="form-control" aria-label="Last push time" aria-describedby="last-push-time" value="{% if sr.lastPushTimestamp == 0 %}-{% else %}{%s time.Unix(int64(sr.lastPushTimestamp), 0).Format(time.RFC3339) %}{% endif %}" readonly />
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="next-push-time" style="width: 200px">Next push time:</span>
<input type="string" class="form-control" aria-label="Next push time" aria-describedby="next-push-time" value="{% if sr.lastPushTimestamp == 0 %}{%s time.Unix(int64(agg.initialTime + sr.intervalSecs), 0).Format(time.RFC3339) %}{% else %}{%s time.Unix(int64(sr.lastPushTimestamp + sr.intervalSecs), 0).Format(time.RFC3339) %}{% endif %}" readonly />
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="limit-label" style="width: 200px">Items on the page:</span>
<input id="limit" type="number" class="form-control" aria-label="Limit" aria-describedby="limit-label" value="{%d limit %}" />
<button type="button" class="btn btn-outline-secondary" onclick="location.href='?rw={%s rwActive %}&agg={%d aggNum %}&output={%s as.getOutputName() %}&limit='+document.querySelector(`#limit`).value+'&filter='+encodeURIComponent(document.querySelector(`#filter`).value)">apply</button>
</div>
<div class="input-group input-group-sm mb-1">
<span class="input-group-text" id="filter-label" style="width: 200px">Filter:</span>
<input id="filter" type="text" class="form-control" aria-label="Filter" aria-describedby="filter-label" value="{%s filter %}" />
<button type="button" class="btn btn-outline-secondary" onclick="location.href='?rw={%s rwActive %}&agg={%d aggNum %}&output={%s as.getOutputName() %}&limit={%d limit %}&filter='+encodeURIComponent(document.querySelector(`#filter`).value)">apply</button>
</div>
</div>
</h6>
<hr />
@ -144,7 +200,7 @@
{% for _, a := range agg.aggrStates %}
<li class="nav-item" role="presentation">
<button class="nav-link{%if a.getOutputName()==as.getOutputName() %}{% space %}active{%endif%}" type="button" role="tab"
onclick="location.href='?rw={%s rwActive %}&agg={%d aggNum %}&output={%s a.getOutputName() %}'">
onclick="location.href='?rw={%s rwActive %}&agg={%d aggNum %}&output={%s a.getOutputName() %}&limit={%d limit %}'">
{%s a.getOutputName() %}
</button>
</li>
@ -154,26 +210,15 @@
<div class="tab-pane active" role="tabpanel">
<div id="aggregation-state" class="table-responsive">
<table class="table table-striped table-hover table-bordered table-sm">
{% code
sr := as.getStateRepresentation(agg.suffix)
sort.Slice(sr, func(i, j int) bool {
return sr[i].metric < sr[j].metric
})
if len(sr) > limit {
sr = sr[:limit]
}
%}
<thead>
<tr>
<th scope="col">Metric</th>
<th scope="col">Current value</th>
<th scope="col">Samples count</th>
<th scope="col">Last push time</th>
<th scope="col">Next push time</th>
</tr>
</thead>
<tbody>
{% for _, asr := range sr %}
{% for _, asr := range sr.metrics %}
<tr>
<td>
<code>{%s asr.metric %}</code>
@ -184,20 +229,6 @@
<td class="text-end">
{%s fmt.Sprintf("%v", asr.samplesCount) %}
</td>
<td>
{% if asr.lastPushTimestamp == 0 %}
{%s time.Unix(int64(agg.initialTime), 0).String() %}
{% else %}
{%s time.Unix(int64(asr.lastPushTimestamp), 0).String() %}
{% endif %}
</td>
<td>
{% if asr.lastPushTimestamp == 0 %}
{%s time.Unix(int64(asr.nextPushTimestamp + agg.initialTime), 0).Format(time.RFC3339) %}
{% else %}
{%s time.Unix(int64(asr.nextPushTimestamp), 0).Format(time.RFC3339) %}
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>


@ -14,335 +14,420 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/htmlcomponents"
)
//line lib/streamaggr/state.qtpl:12
//line lib/streamaggr/state.qtpl:10
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line lib/streamaggr/state.qtpl:12
//line lib/streamaggr/state.qtpl:10
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line lib/streamaggr/state.qtpl:12
//line lib/streamaggr/state.qtpl:11
const DEFAULT_LIMIT = 1000
//line lib/streamaggr/state.qtpl:16
func StreamStreamAggHTML(qw422016 *qt422016.Writer, rws map[string]*Aggregators, rwActive string) {
//line lib/streamaggr/state.qtpl:12
//line lib/streamaggr/state.qtpl:16
qw422016.N().S(`<!DOCTYPE html><html lang="en"><head>`)
//line lib/streamaggr/state.qtpl:16
//line lib/streamaggr/state.qtpl:20
htmlcomponents.StreamCommonHeader(qw422016)
//line lib/streamaggr/state.qtpl:16
//line lib/streamaggr/state.qtpl:20
qw422016.N().S(`<title>Stream aggregation</title></head><body>`)
//line lib/streamaggr/state.qtpl:20
//line lib/streamaggr/state.qtpl:24
htmlcomponents.StreamNavbar(qw422016)
//line lib/streamaggr/state.qtpl:20
//line lib/streamaggr/state.qtpl:24
qw422016.N().S(`<div class="container-fluid"><div class="row"><main class="col-12"><h1>Aggregations</h1><hr /><ul class="nav nav-tabs" id="rw-tab" role="tablist">`)
//line lib/streamaggr/state.qtpl:27
//line lib/streamaggr/state.qtpl:31
for rwKey, _ := range rws {
//line lib/streamaggr/state.qtpl:27
//line lib/streamaggr/state.qtpl:31
qw422016.N().S(`<li class="nav-item" role="presentation"><button class="nav-link`)
//line lib/streamaggr/state.qtpl:29
//line lib/streamaggr/state.qtpl:33
if rwKey == rwActive {
//line lib/streamaggr/state.qtpl:29
//line lib/streamaggr/state.qtpl:33
qw422016.N().S(` `)
//line lib/streamaggr/state.qtpl:29
//line lib/streamaggr/state.qtpl:33
qw422016.N().S(`active`)
//line lib/streamaggr/state.qtpl:29
//line lib/streamaggr/state.qtpl:33
}
//line lib/streamaggr/state.qtpl:29
//line lib/streamaggr/state.qtpl:33
qw422016.N().S(`" type="button" role="tab"onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:30
//line lib/streamaggr/state.qtpl:34
qw422016.E().S(rwKey)
//line lib/streamaggr/state.qtpl:30
//line lib/streamaggr/state.qtpl:34
qw422016.N().S(`'">`)
//line lib/streamaggr/state.qtpl:31
//line lib/streamaggr/state.qtpl:35
qw422016.E().S(rwKey)
//line lib/streamaggr/state.qtpl:31
//line lib/streamaggr/state.qtpl:35
qw422016.N().S(`</button></li>`)
//line lib/streamaggr/state.qtpl:34
//line lib/streamaggr/state.qtpl:38
}
//line lib/streamaggr/state.qtpl:34
//line lib/streamaggr/state.qtpl:38
qw422016.N().S(`</ul><div class="tab-content"><div class="tab-pane active" role="tabpanel"><div id="aggregations" class="table-responsive"><table class="table table-striped table-hover table-bordered table-sm"><thead><tr><th scope="col" style="width: 5%">Num</th><th scope="col" style="width: 35%">Match</th><th scope="col" style="width: 10%">By</th><th scope="col" style="width: 10%">Without</a><th scope="col" style="width: 40%">Outputs</a></tr></thead><tbody>`)
//line lib/streamaggr/state.qtpl:50
//line lib/streamaggr/state.qtpl:54
aggs := rws[rwActive]
//line lib/streamaggr/state.qtpl:51
//line lib/streamaggr/state.qtpl:55
for an, agg := range aggs.as {
//line lib/streamaggr/state.qtpl:51
//line lib/streamaggr/state.qtpl:55
qw422016.N().S(`<tr><td>`)
//line lib/streamaggr/state.qtpl:53
//line lib/streamaggr/state.qtpl:57
qw422016.N().D(an)
//line lib/streamaggr/state.qtpl:53
//line lib/streamaggr/state.qtpl:57
qw422016.N().S(`</td><td><code>`)
//line lib/streamaggr/state.qtpl:55
//line lib/streamaggr/state.qtpl:59
qw422016.E().S(agg.match.String())
//line lib/streamaggr/state.qtpl:55
//line lib/streamaggr/state.qtpl:59
qw422016.N().S(`</code></td><td class="labels">`)
//line lib/streamaggr/state.qtpl:58
//line lib/streamaggr/state.qtpl:62
for abn, ab := range agg.by {
//line lib/streamaggr/state.qtpl:59
//line lib/streamaggr/state.qtpl:63
if abn > 0 {
//line lib/streamaggr/state.qtpl:59
qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:61
}
//line lib/streamaggr/state.qtpl:61
qw422016.N().S(`<span class="badge bg-secondary">`)
//line lib/streamaggr/state.qtpl:63
qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:65
}
//line lib/streamaggr/state.qtpl:65
qw422016.N().S(`<span class="badge bg-secondary">`)
//line lib/streamaggr/state.qtpl:67
qw422016.E().S(ab)
//line lib/streamaggr/state.qtpl:63
//line lib/streamaggr/state.qtpl:67
qw422016.N().S(`</span>`)
//line lib/streamaggr/state.qtpl:65
//line lib/streamaggr/state.qtpl:69
}
//line lib/streamaggr/state.qtpl:65
//line lib/streamaggr/state.qtpl:69
qw422016.N().S(`</td><td class="labels">`)
//line lib/streamaggr/state.qtpl:68
//line lib/streamaggr/state.qtpl:72
for awn, aw := range agg.without {
//line lib/streamaggr/state.qtpl:69
//line lib/streamaggr/state.qtpl:73
if awn > 0 {
//line lib/streamaggr/state.qtpl:69
//line lib/streamaggr/state.qtpl:73
qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:71
//line lib/streamaggr/state.qtpl:75
}
//line lib/streamaggr/state.qtpl:71
//line lib/streamaggr/state.qtpl:75
qw422016.N().S(`<span class="badge bg-secondary">`)
//line lib/streamaggr/state.qtpl:73
//line lib/streamaggr/state.qtpl:77
qw422016.E().S(aw)
//line lib/streamaggr/state.qtpl:73
//line lib/streamaggr/state.qtpl:77
qw422016.N().S(`</span>`)
//line lib/streamaggr/state.qtpl:75
//line lib/streamaggr/state.qtpl:79
}
//line lib/streamaggr/state.qtpl:75
//line lib/streamaggr/state.qtpl:79
qw422016.N().S(`</td><td class="labels">`)
//line lib/streamaggr/state.qtpl:78
//line lib/streamaggr/state.qtpl:82
for asn, as := range agg.aggrStates {
//line lib/streamaggr/state.qtpl:79
//line lib/streamaggr/state.qtpl:83
if asn > 0 {
//line lib/streamaggr/state.qtpl:79
//line lib/streamaggr/state.qtpl:83
qw422016.N().S(`<span>, </span>`)
//line lib/streamaggr/state.qtpl:81
//line lib/streamaggr/state.qtpl:85
}
//line lib/streamaggr/state.qtpl:81
//line lib/streamaggr/state.qtpl:85
qw422016.N().S(`<a href="?rw=`)
//line lib/streamaggr/state.qtpl:82
//line lib/streamaggr/state.qtpl:86
qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:82
//line lib/streamaggr/state.qtpl:86
qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:82
//line lib/streamaggr/state.qtpl:86
qw422016.N().D(an)
//line lib/streamaggr/state.qtpl:82
//line lib/streamaggr/state.qtpl:86
qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:82
//line lib/streamaggr/state.qtpl:86
qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:82
//line lib/streamaggr/state.qtpl:86
qw422016.N().S(`&limit=`)
//line lib/streamaggr/state.qtpl:86
qw422016.N().D(DEFAULT_LIMIT)
//line lib/streamaggr/state.qtpl:86
qw422016.N().S(`">`)
//line lib/streamaggr/state.qtpl:83
//line lib/streamaggr/state.qtpl:87
qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:83
//line lib/streamaggr/state.qtpl:87
qw422016.N().S(`</a>`)
//line lib/streamaggr/state.qtpl:85
//line lib/streamaggr/state.qtpl:89
}
//line lib/streamaggr/state.qtpl:85
//line lib/streamaggr/state.qtpl:89
qw422016.N().S(`</td></tr>`)
//line lib/streamaggr/state.qtpl:88
//line lib/streamaggr/state.qtpl:92
}
//line lib/streamaggr/state.qtpl:88
//line lib/streamaggr/state.qtpl:92
qw422016.N().S(`</tbody></table></div></div></div></main></div></div></body></html>`)
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
}
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
func WriteStreamAggHTML(qq422016 qtio422016.Writer, rws map[string]*Aggregators, rwActive string) {
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
qw422016 := qt422016.AcquireWriter(qq422016)
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
StreamStreamAggHTML(qw422016, rws, rwActive)
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
qt422016.ReleaseWriter(qw422016)
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
}
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
func StreamAggHTML(rws map[string]*Aggregators, rwActive string) string {
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
qb422016 := qt422016.AcquireByteBuffer()
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
WriteStreamAggHTML(qb422016, rws, rwActive)
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
qs422016 := string(qb422016.B)
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
qt422016.ReleaseByteBuffer(qb422016)
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
return qs422016
//line lib/streamaggr/state.qtpl:99
//line lib/streamaggr/state.qtpl:103
}
//line lib/streamaggr/state.qtpl:101
func StreamStreamAggOutputStateHTML(qw422016 *qt422016.Writer, rwActive string, aggNum int, agg *aggregator, as aggrState, limit int) {
//line lib/streamaggr/state.qtpl:101
//line lib/streamaggr/state.qtpl:105
func StreamStreamAggOutputStateHTML(qw422016 *qt422016.Writer, rwActive string, aggNum int, agg *aggregator, as aggrState, limit int, filter string) {
//line lib/streamaggr/state.qtpl:105
qw422016.N().S(`<!DOCTYPE html><html lang="en"><head>`)
//line lib/streamaggr/state.qtpl:105
//line lib/streamaggr/state.qtpl:109
htmlcomponents.StreamCommonHeader(qw422016)
//line lib/streamaggr/state.qtpl:105
//line lib/streamaggr/state.qtpl:109
qw422016.N().S(`<title>Stream aggregation</title></head><body>`)
//line lib/streamaggr/state.qtpl:109
//line lib/streamaggr/state.qtpl:113
htmlcomponents.StreamNavbar(qw422016)
//line lib/streamaggr/state.qtpl:109
qw422016.N().S(`<div class="container-fluid"><div class="row"><main class="col-12"><h1>Aggregation state</h1><h4> [ <a href="?rw=`)
//line lib/streamaggr/state.qtpl:114
qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:114
qw422016.N().S(`">back to aggregations</a> ] </h3><hr /><h6><div class="row"><div class="col-xxl-1">Remote write:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:119
qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:119
qw422016.N().S(`</code><div class="w-100"></div><div class="col-xxl-1">Aggregation num:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:123
qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:123
qw422016.N().S(`</code><div class="w-100"></div><div class="col-xxl-1">Match:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:127
qw422016.E().S(agg.match.String())
//line lib/streamaggr/state.qtpl:127
qw422016.N().S(`</code><div class="w-100"></div>`)
//line lib/streamaggr/state.qtpl:130
if len(agg.by) > 0 {
//line lib/streamaggr/state.qtpl:130
qw422016.N().S(`<div class="col-xxl-1">By:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:132
qw422016.E().S(strings.Join(agg.by, ", "))
//line lib/streamaggr/state.qtpl:132
qw422016.N().S(`</code><div class="w-100"></div>`)
//line lib/streamaggr/state.qtpl:134
}
//line lib/streamaggr/state.qtpl:135
if len(agg.without) > 0 {
//line lib/streamaggr/state.qtpl:135
qw422016.N().S(`<div class="col-xxl-1">Without:</div><code class="col w-100">`)
//line lib/streamaggr/state.qtpl:137
qw422016.E().S(strings.Join(agg.without, ", "))
//line lib/streamaggr/state.qtpl:137
qw422016.N().S(`</code><div class="w-100"></div>`)
//line lib/streamaggr/state.qtpl:139
}
//line lib/streamaggr/state.qtpl:139
qw422016.N().S(`</div></h6><hr /><ul class="nav nav-tabs" id="rw-tab" role="tablist">`)
//line lib/streamaggr/state.qtpl:144
for _, a := range agg.aggrStates {
//line lib/streamaggr/state.qtpl:144
qw422016.N().S(`<li class="nav-item" role="presentation"><button class="nav-link`)
//line lib/streamaggr/state.qtpl:146
if a.getOutputName() == as.getOutputName() {
//line lib/streamaggr/state.qtpl:146
qw422016.N().S(` `)
//line lib/streamaggr/state.qtpl:146
qw422016.N().S(`active`)
//line lib/streamaggr/state.qtpl:146
}
//line lib/streamaggr/state.qtpl:146
qw422016.N().S(`" type="button" role="tab"onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:147
qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:147
qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:147
qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:147
qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:113
	qw422016.N().S(`<div class="container-fluid"><div class="row"><main class="col-12">`)
//line lib/streamaggr/state.qtpl:118
	sr := as.getStateRepresentation(agg.suffix)
	if filter != "" {
		filter = strings.ToLower(filter)
		metrics := sr.metrics[:0]
		for _, m := range sr.metrics {
			if strings.Contains(strings.ToLower(m.metric), filter) {
				metrics = append(metrics, m)
			}
		}
		sr.metrics = metrics
	}
	sort.Slice(sr.metrics, func(i, j int) bool {
		return sr.metrics[i].metric < sr.metrics[j].metric
	})
	if len(sr.metrics) > limit {
		sr.metrics = sr.metrics[:limit]
	}
//line lib/streamaggr/state.qtpl:135
	qw422016.N().S(`<h1>Aggregation state</h1><h4> [ <a href="?rw=`)
//line lib/streamaggr/state.qtpl:138
	qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:138
	qw422016.N().S(`">back to aggregations</a> ] </h3><hr /><h6><div class="row container-sm"><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="remote-write" style="width: 200px">Remote write:</span><input type="text" class="form-control" aria-label="Remote write" aria-describedby="remote-write" value="`)
//line lib/streamaggr/state.qtpl:144
	qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:144
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="agg-num" style="width: 200px">Aggregation num:</span><input type="number" class="form-control" aria-label="Aggregation num" aria-describedby="agg-num" value="`)
//line lib/streamaggr/state.qtpl:149
	qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:149
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="match" style="width: 200px">Match:</span><input type="string" class="form-control" aria-label="Match" aria-describedby="match" value="`)
//line lib/streamaggr/state.qtpl:154
	qw422016.E().S(agg.match.String())
//line lib/streamaggr/state.qtpl:154
	qw422016.N().S(`" readonly /></div>`)
//line lib/streamaggr/state.qtpl:157
	if len(agg.by) > 0 {
//line lib/streamaggr/state.qtpl:157
		qw422016.N().S(`<div class="input-group input-group-sm mb-1"><span class="input-group-text" id="by" style="width: 200px">By:</span><input type="string" class="form-control" aria-label="By" aria-describedby="by" value="`)
//line lib/streamaggr/state.qtpl:160
		qw422016.E().S(strings.Join(agg.by, ", "))
//line lib/streamaggr/state.qtpl:160
		qw422016.N().S(`" readonly /></div>`)
//line lib/streamaggr/state.qtpl:162
	}
//line lib/streamaggr/state.qtpl:163
	if len(agg.without) > 0 {
//line lib/streamaggr/state.qtpl:163
		qw422016.N().S(`<div class="input-group input-group-sm mb-1"><span class="input-group-text" id="without" style="width: 200px">Without:</span><input type="string" class="form-control" aria-label="Without" aria-describedby="without" value="`)
//line lib/streamaggr/state.qtpl:166
		qw422016.E().S(strings.Join(agg.without, ", "))
//line lib/streamaggr/state.qtpl:166
		qw422016.N().S(`" readonly /></div>`)
//line lib/streamaggr/state.qtpl:168
	}
//line lib/streamaggr/state.qtpl:168
	qw422016.N().S(`<div class="input-group input-group-sm mb-1"><span class="input-group-text" id="interval" style="width: 200px">Interval (seconds):</span><input type="number" class="form-control" aria-label="Interval (seconds)" aria-describedby="interval" value="`)
//line lib/streamaggr/state.qtpl:172
	qw422016.E().V(sr.intervalSecs)
//line lib/streamaggr/state.qtpl:172
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="last-push-time" style="width: 200px">Last push time:</span><input type="string" class="form-control" aria-label="Last push time" aria-describedby="last-push-time" value="`)
//line lib/streamaggr/state.qtpl:177
	if sr.lastPushTimestamp == 0 {
//line lib/streamaggr/state.qtpl:177
		qw422016.N().S(`-`)
//line lib/streamaggr/state.qtpl:177
	} else {
//line lib/streamaggr/state.qtpl:177
		qw422016.E().S(time.Unix(int64(sr.lastPushTimestamp), 0).Format(time.RFC3339))
//line lib/streamaggr/state.qtpl:177
	}
//line lib/streamaggr/state.qtpl:177
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="next-push-time" style="width: 200px">Next push time:</span><input type="string" class="form-control" aria-label="Next push time" aria-describedby="next-push-time" value="`)
//line lib/streamaggr/state.qtpl:182
	if sr.lastPushTimestamp == 0 {
//line lib/streamaggr/state.qtpl:182
		qw422016.E().S(time.Unix(int64(agg.initialTime+sr.intervalSecs), 0).Format(time.RFC3339))
//line lib/streamaggr/state.qtpl:182
	} else {
//line lib/streamaggr/state.qtpl:182
		qw422016.E().S(time.Unix(int64(sr.lastPushTimestamp+sr.intervalSecs), 0).Format(time.RFC3339))
//line lib/streamaggr/state.qtpl:182
	}
//line lib/streamaggr/state.qtpl:182
	qw422016.N().S(`" readonly /></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="limit-label" style="width: 200px">Items on the page:</span><input id="limit" type="number" class="form-control" aria-label="Limit" aria-describedby="limit-label" value="`)
//line lib/streamaggr/state.qtpl:187
	qw422016.N().D(limit)
//line lib/streamaggr/state.qtpl:187
	qw422016.N().S(`" /><button type="button" class="btn btn-outline-secondary" onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:188
	qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:188
	qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`&limit='+document.querySelector(`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`#limit`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`).value+'&filter='+encodeURIComponent(document.querySelector(`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`#filter`)
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:188
	qw422016.N().S(`).value)">apply</button></div><div class="input-group input-group-sm mb-1"><span class="input-group-text" id="filter-label" style="width: 200px">Filter:</span><input id="filter" type="text" class="form-control" aria-label="Filter" aria-describedby="filter-label" value="`)
//line lib/streamaggr/state.qtpl:193
	qw422016.E().S(filter)
//line lib/streamaggr/state.qtpl:193
	qw422016.N().S(`" /><button type="button" class="btn btn-outline-secondary" onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:194
	qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:194
	qw422016.E().S(as.getOutputName())
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`&limit=`)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().D(limit)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`&filter='+encodeURIComponent(document.querySelector(`)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`#filter`)
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S("`")
//line lib/streamaggr/state.qtpl:194
	qw422016.N().S(`).value)">apply</button></div></div></h6><hr /><ul class="nav nav-tabs" id="rw-tab" role="tablist">`)
//line lib/streamaggr/state.qtpl:200
	for _, a := range agg.aggrStates {
//line lib/streamaggr/state.qtpl:200
		qw422016.N().S(`<li class="nav-item" role="presentation"><button class="nav-link`)
//line lib/streamaggr/state.qtpl:202
		if a.getOutputName() == as.getOutputName() {
//line lib/streamaggr/state.qtpl:202
			qw422016.N().S(` `)
//line lib/streamaggr/state.qtpl:202
			qw422016.N().S(`active`)
//line lib/streamaggr/state.qtpl:202
		}
//line lib/streamaggr/state.qtpl:202
		qw422016.N().S(`" type="button" role="tab"onclick="location.href='?rw=`)
//line lib/streamaggr/state.qtpl:203
		qw422016.E().S(rwActive)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().S(`&agg=`)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().D(aggNum)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().S(`&output=`)
//line lib/streamaggr/state.qtpl:203
		qw422016.E().S(a.getOutputName())
//line lib/streamaggr/state.qtpl:203
		qw422016.N().S(`&limit=`)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().D(limit)
//line lib/streamaggr/state.qtpl:203
		qw422016.N().S(`'">`)
//line lib/streamaggr/state.qtpl:204
		qw422016.E().S(a.getOutputName())
//line lib/streamaggr/state.qtpl:204
		qw422016.N().S(`</button></li>`)
//line lib/streamaggr/state.qtpl:207
	}
//line lib/streamaggr/state.qtpl:207
	qw422016.N().S(`</ul><div class="tab-content"><div class="tab-pane active" role="tabpanel"><div id="aggregation-state" class="table-responsive"><table class="table table-striped table-hover table-bordered table-sm"><thead><tr><th scope="col">Metric</th><th scope="col">Current value</th><th scope="col">Samples count</th></tr></thead><tbody>`)
//line lib/streamaggr/state.qtpl:221
	for _, asr := range sr.metrics {
//line lib/streamaggr/state.qtpl:221
		qw422016.N().S(`<tr><td><code>`)
//line lib/streamaggr/state.qtpl:224
		qw422016.E().S(asr.metric)
//line lib/streamaggr/state.qtpl:224
		qw422016.N().S(`</code></td><td class="text-end">`)
//line lib/streamaggr/state.qtpl:227
		qw422016.N().F(asr.currentValue)
//line lib/streamaggr/state.qtpl:227
		qw422016.N().S(`</td><td class="text-end">`)
//line lib/streamaggr/state.qtpl:230
		qw422016.E().S(fmt.Sprintf("%v", asr.samplesCount))
//line lib/streamaggr/state.qtpl:230
		qw422016.N().S(`</td></tr>`)
//line lib/streamaggr/state.qtpl:233
	}
//line lib/streamaggr/state.qtpl:233
	qw422016.N().S(`</tbody></table></div></div></div></main></div></div></body></html>`)
//line lib/streamaggr/state.qtpl:244
}

//line lib/streamaggr/state.qtpl:244
func WriteStreamAggOutputStateHTML(qq422016 qtio422016.Writer, rwActive string, aggNum int, agg *aggregator, as aggrState, limit int, filter string) {
//line lib/streamaggr/state.qtpl:244
	qw422016 := qt422016.AcquireWriter(qq422016)
//line lib/streamaggr/state.qtpl:244
	StreamStreamAggOutputStateHTML(qw422016, rwActive, aggNum, agg, as, limit, filter)
//line lib/streamaggr/state.qtpl:244
	qt422016.ReleaseWriter(qw422016)
//line lib/streamaggr/state.qtpl:244
}

//line lib/streamaggr/state.qtpl:244
func StreamAggOutputStateHTML(rwActive string, aggNum int, agg *aggregator, as aggrState, limit int, filter string) string {
//line lib/streamaggr/state.qtpl:244
	qb422016 := qt422016.AcquireByteBuffer()
//line lib/streamaggr/state.qtpl:244
	WriteStreamAggOutputStateHTML(qb422016, rwActive, aggNum, agg, as, limit, filter)
//line lib/streamaggr/state.qtpl:244
	qs422016 := string(qb422016.B)
//line lib/streamaggr/state.qtpl:244
	qt422016.ReleaseByteBuffer(qb422016)
//line lib/streamaggr/state.qtpl:244
	return qs422016
//line lib/streamaggr/state.qtpl:244
}
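
For orientation, the "Last push time" / "Next push time" fields in the regenerated page reduce to a small piece of arithmetic over lastPushTimestamp, intervalSecs and the aggregator start time. The sketch below mirrors that template logic only; the helper function and the name initialTime (standing in for agg.initialTime) are illustrative, not part of the package:

```go
package main

import (
	"fmt"
	"time"
)

// nextPushTime mirrors the template above: before the first flush
// (lastPushTimestamp == 0) the next push is estimated from the aggregator
// start time, afterwards from the last flush time plus the interval.
func nextPushTime(lastPushTimestamp, initialTime, intervalSecs uint64) time.Time {
	base := lastPushTimestamp
	if lastPushTimestamp == 0 {
		base = initialTime
	}
	return time.Unix(int64(base+intervalSecs), 0)
}

func main() {
	initial := uint64(time.Now().Unix())
	fmt.Println(nextPushTime(0, initial, 60).Format(time.RFC3339))       // before the first flush
	fmt.Println(nextPushTime(initial, initial, 60).Format(time.RFC3339)) // after a flush at initial
}
```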

View file

@ -3,6 +3,7 @@ package streamaggr
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -12,30 +13,24 @@ import (
type stddevAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type stddevStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
deleteDeadline uint64
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
}
func newStddevAggrState(interval time.Duration, stalenessInterval time.Duration) *stddevAggrState {
func newStddevAggrState(interval time.Duration) *stddevAggrState {
return &stddevAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *stddevAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -56,7 +51,6 @@ again:
avg := sv.avg + (value-sv.avg)/sv.count
sv.q += (value - sv.avg) * (value - avg)
sv.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -66,58 +60,34 @@ again:
}
}
func (as *stddevAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*stddevStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *stddevAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*stddevStateValue)
sv.mu.Lock()
var stddev float64
if sv.count > 0 {
stddev = math.Sqrt(sv.q / sv.count)
}
sv.count = 0
sv.q = 0
sv.avg = 0
stddev := math.Sqrt(sv.q / sv.count)
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, stddev)
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *stddevAggrState) getOutputName() string {
return "stddev"
}
func (as *stddevAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *stddevAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*stddevStateValue)
value.mu.Lock()
@ -125,14 +95,16 @@ func (as *stddevAggrState) getStateRepresentation(suffix string) []aggrStateRepr
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: math.Sqrt(value.q / value.count),
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: uint64(value.count),
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: math.Sqrt(value.q / value.count),
samplesCount: uint64(value.count),
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}
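
The stddev (and stdvar) state keeps a running mean and a sum of squared deviations `q`, updated with Welford's online algorithm as in the pushSample hunk above. A minimal self-contained version of that update, kept deliberately close to the diff:

```go
package main

import (
	"fmt"
	"math"
)

// welford accumulates count, mean and q (sum of squared deviations) with the
// same update as the pushSample hunk above, then reports stdvar/stddev.
type welford struct {
	count float64
	avg   float64
	q     float64
}

func (w *welford) push(value float64) {
	w.count++
	avg := w.avg + (value-w.avg)/w.count
	w.q += (value - w.avg) * (value - avg)
	w.avg = avg
}

func (w *welford) stdvar() float64 { return w.q / w.count } // population variance, as in the diff
func (w *welford) stddev() float64 { return math.Sqrt(w.stdvar()) }

func main() {
	var w welford
	for _, v := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		w.push(v)
	}
	fmt.Println(w.stddev(), w.stdvar()) // 2 and 4 for this classic sample
}
```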

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -11,30 +12,24 @@ import (
type stdvarAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type stdvarStateValue struct {
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
deleteDeadline uint64
mu sync.Mutex
count float64
avg float64
q float64
deleted bool
}
func newStdvarAggrState(interval time.Duration, stalenessInterval time.Duration) *stdvarAggrState {
func newStdvarAggrState(interval time.Duration) *stdvarAggrState {
return &stdvarAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *stdvarAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -55,7 +50,6 @@ again:
avg := sv.avg + (value-sv.avg)/sv.count
sv.q += (value - sv.avg) * (value - avg)
sv.avg = avg
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -65,57 +59,33 @@ again:
}
}
func (as *stdvarAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*stdvarStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *stdvarAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*stdvarStateValue)
sv.mu.Lock()
var stdvar float64
if sv.count > 0 {
stdvar = sv.q / sv.count
}
sv.q = 0
sv.avg = 0
sv.count = 0
stdvar := sv.q / sv.count
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, stdvar)
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *stdvarAggrState) getOutputName() string {
return "stdvar"
}
func (as *stdvarAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *stdvarAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*stdvarStateValue)
value.mu.Lock()
@ -123,14 +93,16 @@ func (as *stdvarAggrState) getStateRepresentation(suffix string) []aggrStateRepr
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.q / value.count,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: uint64(value.count),
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.q / value.count,
samplesCount: uint64(value.count),
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View file

@ -253,15 +253,19 @@ type aggrState interface {
pushSample(inputKey, outputKey string, value float64)
appendSeriesForFlush(ctx *flushCtx)
getOutputName() string
getStateRepresentation(suffix string) []aggrStateRepresentation
getStateRepresentation(suffix string) aggrStateRepresentation
}
type aggrStateRepresentation struct {
metric string
intervalSecs uint64
lastPushTimestamp uint64
nextPushTimestamp uint64
currentValue float64
samplesCount uint64
metrics []aggrStateRepresentationMetric
}
type aggrStateRepresentationMetric struct {
metric string
currentValue float64
samplesCount uint64
}
// PushFunc is called by Aggregators when it needs to push its state to metrics storage
@ -344,7 +348,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
}
phis[j] = phi
}
aggrStates[i] = newQuantilesAggrState(interval, stalenessInterval, phis)
aggrStates[i] = newQuantilesAggrState(interval, phis)
continue
}
switch output {
@ -357,27 +361,27 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
case "increase_pure":
aggrStates[i] = newIncreasePureAggrState(interval, stalenessInterval)
case "count_series":
aggrStates[i] = newCountSeriesAggrState(interval, stalenessInterval)
aggrStates[i] = newCountSeriesAggrState(interval)
case "count_samples":
aggrStates[i] = newCountSamplesAggrState(interval, stalenessInterval)
aggrStates[i] = newCountSamplesAggrState(interval)
case "count_samples_total":
aggrStates[i] = newCountSamplesTotalAggrState(interval, stalenessInterval)
case "sum_samples":
aggrStates[i] = newSumSamplesAggrState(interval, stalenessInterval)
aggrStates[i] = newSumSamplesAggrState(interval)
case "sum_samples_total":
aggrStates[i] = newSumSamplesTotalAggrState(interval, stalenessInterval)
case "last":
aggrStates[i] = newLastAggrState(interval, stalenessInterval)
aggrStates[i] = newLastAggrState(interval)
case "min":
aggrStates[i] = newMinAggrState(interval, stalenessInterval)
aggrStates[i] = newMinAggrState(interval)
case "max":
aggrStates[i] = newMaxAggrState(interval, stalenessInterval)
aggrStates[i] = newMaxAggrState(interval)
case "avg":
aggrStates[i] = newAvgAggrState(interval, stalenessInterval)
aggrStates[i] = newAvgAggrState(interval)
case "stddev":
aggrStates[i] = newStddevAggrState(interval, stalenessInterval)
aggrStates[i] = newStddevAggrState(interval)
case "stdvar":
aggrStates[i] = newStdvarAggrState(interval, stalenessInterval)
aggrStates[i] = newStdvarAggrState(interval)
case "histogram_bucket":
aggrStates[i] = newHistogramBucketAggrState(interval, stalenessInterval)
default:
@ -398,7 +402,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
var dedupAggr *lastAggrState
if dedupInterval > 0 {
dedupAggr = newLastAggrState(interval, stalenessInterval)
dedupAggr = newLastAggrState(interval)
}
// initialize the aggregator
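
The interface change above replaces the flat `[]aggrStateRepresentation` with a single per-output aggrStateRepresentation (interval, last push time) plus a metrics slice, which the state page then filters, sorts and truncates. A self-contained sketch of that consumer side, with the two struct shapes copied from the hunk; the helper name and sample data are made up for illustration:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Shapes copied from the streamaggr hunk above.
type aggrStateRepresentationMetric struct {
	metric       string
	currentValue float64
	samplesCount uint64
}

type aggrStateRepresentation struct {
	intervalSecs      uint64
	lastPushTimestamp uint64
	metrics           []aggrStateRepresentationMetric
}

// filterSortLimit mirrors what the regenerated state page does with the
// representation: case-insensitive substring filter, sort by metric name,
// cut to the requested page size.
func filterSortLimit(sr aggrStateRepresentation, filter string, limit int) aggrStateRepresentation {
	if filter != "" {
		filter = strings.ToLower(filter)
		metrics := sr.metrics[:0]
		for _, m := range sr.metrics {
			if strings.Contains(strings.ToLower(m.metric), filter) {
				metrics = append(metrics, m)
			}
		}
		sr.metrics = metrics
	}
	sort.Slice(sr.metrics, func(i, j int) bool {
		return sr.metrics[i].metric < sr.metrics[j].metric
	})
	if len(sr.metrics) > limit {
		sr.metrics = sr.metrics[:limit]
	}
	return sr
}

func main() {
	sr := aggrStateRepresentation{
		intervalSecs: 60,
		metrics: []aggrStateRepresentationMetric{
			{metric: `http_requests_total{path="/b"}`, currentValue: 2, samplesCount: 2},
			{metric: `http_requests_total{path="/a"}`, currentValue: 1, samplesCount: 1},
			{metric: `process_cpu_seconds_total`, currentValue: 3, samplesCount: 3},
		},
	}
	for _, m := range filterSortLimit(sr, "http_requests", 10).metrics {
		fmt.Println(m.metric, m.currentValue, m.samplesCount)
	}
}
```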

View file

@ -2,6 +2,7 @@ package streamaggr
import (
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -11,8 +12,7 @@ import (
type sumSamplesAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type sumSamplesStateValue struct {
@ -23,17 +23,13 @@ type sumSamplesStateValue struct {
deleteDeadline uint64
}
func newSumSamplesAggrState(interval time.Duration, stalenessInterval time.Duration) *sumSamplesAggrState {
func newSumSamplesAggrState(interval time.Duration) *sumSamplesAggrState {
return &sumSamplesAggrState{
intervalSecs: roundDurationToSecs(interval),
stalenessSecs: roundDurationToSecs(stalenessInterval),
intervalSecs: roundDurationToSecs(interval),
}
}
func (as *sumSamplesAggrState) pushSample(_, outputKey string, value float64) {
currentTime := fasttime.UnixTimestamp()
deleteDeadline := currentTime + as.stalenessSecs
again:
v, ok := as.m.Load(outputKey)
if !ok {
@ -55,7 +51,6 @@ again:
if !deleted {
sv.sum += value
sv.samplesCount++
sv.deleteDeadline = deleteDeadline
}
sv.mu.Unlock()
if deleted {
@ -65,52 +60,34 @@ again:
}
}
func (as *sumSamplesAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
m.Range(func(k, v interface{}) bool {
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
deleted := currentTime > sv.deleteDeadline
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
}
sv.mu.Unlock()
if deleted {
m.Delete(k)
}
return true
})
}
func (as *sumSamplesAggrState) appendSeriesForFlush(ctx *flushCtx) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v interface{}) bool {
// Atomically delete the entry from the map, so new entry is created for the next flush.
m.Delete(k)
sv := v.(*sumSamplesStateValue)
sv.mu.Lock()
sum := sv.sum
sv.sum = 0
// Mark the entry as deleted, so it won't be updated anymore by concurrent pushSample() calls.
sv.deleted = true
sv.mu.Unlock()
key := k.(string)
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, sum)
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *sumSamplesAggrState) getOutputName() string {
return "sum_samples"
}
func (as *sumSamplesAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *sumSamplesAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*sumSamplesStateValue)
value.mu.Lock()
@ -118,14 +95,16 @@ func (as *sumSamplesAggrState) getStateRepresentation(suffix string) []aggrState
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}
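
With deleteDeadline and removeOldEntries gone for the simple outputs, an entry now lives exactly one flush cycle: appendSeriesForFlush deletes it from the map and marks it deleted, and a pushSample call that raced with the flush retries against a fresh entry. A stripped-down sketch of that sync.Map pattern; the types and method names here are illustrative, not the package's:

```go
package main

import (
	"fmt"
	"sync"
)

// sumState is an illustrative stand-in for the per-key state values of the
// simple outputs; the real code additionally tracks samplesCount etc.
type sumState struct {
	mu      sync.Mutex
	sum     float64
	deleted bool // set by the flusher so racing pushers know to start over
}

type sumAggr struct{ m sync.Map }

func (a *sumAggr) pushSample(key string, value float64) {
again:
	v, ok := a.m.Load(key)
	if !ok {
		// Slow path: create a fresh entry for the next flush window.
		v, _ = a.m.LoadOrStore(key, &sumState{})
	}
	sv := v.(*sumState)
	sv.mu.Lock()
	deleted := sv.deleted
	if !deleted {
		sv.sum += value
	}
	sv.mu.Unlock()
	if deleted {
		// The entry was flushed and deleted concurrently; retry with a new one.
		goto again
	}
}

func (a *sumAggr) flush() map[string]float64 {
	out := make(map[string]float64)
	a.m.Range(func(k, v any) bool {
		// Delete the entry so a new one is created for the next flush.
		a.m.Delete(k)
		sv := v.(*sumState)
		sv.mu.Lock()
		sv.deleted = true // stop concurrent pushSample calls from updating it
		out[k.(string)] = sv.sum
		sv.mu.Unlock()
		return true
	})
	return out
}

func main() {
	var a sumAggr
	a.pushSample("requests", 1)
	a.pushSample("requests", 2)
	fmt.Println(a.flush()) // map[requests:3]
	a.pushSample("requests", 5)
	fmt.Println(a.flush()) // map[requests:5]
}
```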

View file

@ -3,6 +3,7 @@ package streamaggr
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -13,7 +14,7 @@ type sumSamplesTotalAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type sumSamplesTotalStateValue struct {
@ -106,15 +107,15 @@ func (as *sumSamplesTotalAggrState) appendSeriesForFlush(ctx *flushCtx) {
ctx.appendSeries(key, as.getOutputName(), currentTimeMsec, sum)
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *sumSamplesTotalAggrState) getOutputName() string {
return "sum_samples_total"
}
func (as *sumSamplesTotalAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *sumSamplesTotalAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*sumSamplesTotalStateValue)
value.mu.Lock()
@ -122,14 +123,16 @@ func (as *sumSamplesTotalAggrState) getStateRepresentation(suffix string) []aggr
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.sum,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View file

@ -3,6 +3,7 @@ package streamaggr
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -14,7 +15,7 @@ type totalAggrState struct {
intervalSecs uint64
ignoreInputDeadline uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type totalStateValue struct {
@ -141,15 +142,15 @@ func (as *totalAggrState) appendSeriesForFlush(ctx *flushCtx) {
}
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *totalAggrState) getOutputName() string {
return "total"
}
func (as *totalAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *totalAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*totalStateValue)
value.mu.Lock()
@ -157,14 +158,16 @@ func (as *totalAggrState) getStateRepresentation(suffix string) []aggrStateRepre
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}

View file

@ -3,6 +3,7 @@ package streamaggr
import (
"math"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -13,7 +14,7 @@ type totalPureAggrState struct {
m sync.Map
intervalSecs uint64
stalenessSecs uint64
lastPushTimestamp uint64
lastPushTimestamp atomic.Uint64
}
type totalPureStateValue struct {
@ -127,15 +128,15 @@ func (as *totalPureAggrState) appendSeriesForFlush(ctx *flushCtx) {
}
return true
})
as.lastPushTimestamp = currentTime
as.lastPushTimestamp.Store(currentTime)
}
func (as *totalPureAggrState) getOutputName() string {
return "total_pure"
}
func (as *totalPureAggrState) getStateRepresentation(suffix string) []aggrStateRepresentation {
result := make([]aggrStateRepresentation, 0)
func (as *totalPureAggrState) getStateRepresentation(suffix string) aggrStateRepresentation {
metrics := make([]aggrStateRepresentationMetric, 0)
as.m.Range(func(k, v any) bool {
value := v.(*totalPureStateValue)
value.mu.Lock()
@ -143,14 +144,16 @@ func (as *totalPureAggrState) getStateRepresentation(suffix string) []aggrStateR
if value.deleted {
return true
}
result = append(result, aggrStateRepresentation{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
lastPushTimestamp: as.lastPushTimestamp,
nextPushTimestamp: as.lastPushTimestamp + as.intervalSecs,
samplesCount: value.samplesCount,
metrics = append(metrics, aggrStateRepresentationMetric{
metric: getLabelsStringFromKey(k.(string), suffix, as.getOutputName()),
currentValue: value.total,
samplesCount: value.samplesCount,
})
return true
})
return result
return aggrStateRepresentation{
intervalSecs: as.intervalSecs,
lastPushTimestamp: as.lastPushTimestamp.Load(),
metrics: metrics,
}
}
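
The change these files share is mechanical: lastPushTimestamp becomes an atomic.Uint64, because the flush goroutine writes it while the state page reads it concurrently. A tiny standalone sketch of that Store/Load pattern; the surrounding struct and methods are illustrative only:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// aggr holds only what is needed to show the pattern: the flusher stores the
// timestamp, a concurrent reader (the state page) loads it without a lock.
type aggr struct {
	intervalSecs      uint64
	lastPushTimestamp atomic.Uint64
}

func (a *aggr) flush() {
	// ... aggregated series would be appended here ...
	a.lastPushTimestamp.Store(uint64(time.Now().Unix()))
}

func (a *aggr) lastAndNextPush() (last, next uint64) {
	last = a.lastPushTimestamp.Load()
	return last, last + a.intervalSecs
}

func main() {
	a := &aggr{intervalSecs: 60}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); a.flush() }()
	go func() { defer wg.Done(); _, _ = a.lastAndNextPush() }() // no data race on the timestamp
	wg.Wait()
	last, next := a.lastAndNextPush()
	fmt.Println(time.Unix(int64(last), 0).Format(time.RFC3339), time.Unix(int64(next), 0).Format(time.RFC3339))
}
```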