app/vmagent/remotewrite: follow-up for f153f54d11

- Move the remaining code responsible for stream aggregation initialization from remotewrite.go to streamaggr.go .
  This improves code maintainability a bit.

- Properly shut down streamaggr.Aggregators initialized inside remotewrite.CheckStreamAggrConfigs().
  This prevents from potential resource leaks.

- Use separate functions for initializing and reloading of global stream aggregation and per-remoteWrite.url stream aggregation.
  This makes the code easier to read and maintain. This also fixes INFO and ERROR logs emitted by these functions.

- Add an ability to specify `name` option in every stream aggregation config. This option is used as `name` label
  in metrics exposed by stream aggregation at /metrics page. This simplifies investigation of the exposed metrics.

- Add `path` label additionally to `name`, `url` and `position` labels at metrics exposed by streaming aggregation.
  This label should simplify investigation of the exposed metrics.

- Remove `match` and `group` labels from metrics exposed by streaming aggregation, since they have little practical applicability:
  it is hard to use these labels in query filters and aggregation functions.

- Rename the metric `vm_streamaggr_flushed_samples_total` to less misleading `vm_streamaggr_output_samples_total` .
  This metric shows the number of samples generated by the corresponding streaming aggregation rule.
  This metric has been added in the commit 861852f262 .
  See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462

- Remove the metric `vm_streamaggr_stale_samples_total`, since it is unclear how it can be used in practice.
  This metric has been added in the commit 861852f262 .
  See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6462

- Remove Alias and aggrID fields from streamaggr.Options struct, since these fields aren't related to optional params,
  which could modify the behaviour of the constructed streaming aggregator.
  Convert the Alias field to regular argument passed to LoadFromFile() function, since this argument is mandatory.

- Pass Options arg to LoadFromFile() function by reference, since this structure is quite big.
  This also allows passing nil instead of Options when default options are enough.

- Add `name`, `path`, `url` and `position` labels to `vm_streamaggr_dedup_state_size_bytes` and `vm_streamaggr_dedup_state_items_count` metrics,
  so they have consistent set of labels comparing to the rest of streaming aggregation metrics.

- Convert aggregator.aggrStates field type from `map[string]aggrState` to `[]aggrOutput`, where `aggrOutput` contains the corresponding
  `aggrState` plus all the related metrics (currently only `vm_streamaggr_output_samples_total` metric is exposed with the corresponding
  `output` label per each configured output function). This simplifies and speeds up the code responsible for updating per-output
  metrics. This is a follow-up for the commit 2eb1bc4f81 .
  See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6604

- Added missing urls to docs ( https://docs.victoriametrics.com/stream-aggregation/ ) in error messages. These urls help users
  figuring out why VictoriaMetrics or vmagent generates the corresponding error messages. The urls were removed for unknown reason
  in the commit 2eb1bc4f81 .

- Fix incorrect update for `vm_streamaggr_output_samples_total` metric in flushCtx.appendSeriesWithExtraLabel() function.
  While at it, reduce memory usage by limiting the maximum number of samples per flush to 10K.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6268
This commit is contained in:
Aliaksandr Valialkin 2024-07-15 18:01:37 +02:00
parent 7ba477e08a
commit db557b86ee
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
16 changed files with 484 additions and 566 deletions

View file

@ -114,7 +114,7 @@ func main() {
logger.Fatalf("error when checking relabel configs: %s", err)
}
if err := remotewrite.CheckStreamAggrConfigs(); err != nil {
logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err)
logger.Fatalf("error when checking -streamAggr.config and -remoteWrite.streamAggr.config: %s", err)
}
logger.Infof("all the configs are ok; exiting with 0 status code")
return

View file

@ -207,16 +207,7 @@ func Init() {
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
sasFile, sasOpts := getStreamAggrOpts(-1)
if sasFile != "" {
sas, err := newStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -streamAggr.config=%q: %s", sasFile, err)
}
sasGlobal.Store(sas)
} else if sasOpts.DedupInterval > 0 {
deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesDropFailed, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias)
}
initStreamAggrConfigGlobal()
rwctxsGlobal = newRemoteWriteCtxs(nil, *remoteWriteURLs)
@ -237,9 +228,9 @@ func Init() {
defer configReloaderWG.Done()
for {
select {
case <-sighupCh:
case <-configReloaderStopCh:
return
case <-sighupCh:
}
reloadRelabelConfigs()
reloadStreamAggrConfigs()
@ -536,7 +527,7 @@ func getEligibleRemoteWriteCtxs(tss []prompbmarshal.TimeSeries, forceDropSamples
return rwctxs, true
}
func pushToRemoteStoragesDropFailed(tss []prompbmarshal.TimeSeries) {
func pushToRemoteStoragesTrackDropped(tss []prompbmarshal.TimeSeries) {
rwctxs, _ := getEligibleRemoteWriteCtxs(tss, true)
if len(rwctxs) == 0 {
return
@ -850,22 +841,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
}
// Initialize sas
sasFile, sasOpts := getStreamAggrOpts(argIdx)
if sasFile != "" {
sas, err := newStreamAggrConfig(argIdx, rwctx.pushInternalTrackDropped)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
}
rwctx.sas.Store(sas)
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
} else if sasOpts.DedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias)
}
rwctx.initStreamAggrConfig()
return rwctx
}

View file

@ -77,14 +77,16 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
rowsDroppedByRelabel: metrics.GetOrCreateCounter(`bar`),
}
if dedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "global")
rwctx.deduplicator = streamaggr.NewDeduplicator(nil, dedupInterval, nil, "dedup-global")
}
if len(streamAggrConfig) > 0 {
sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), nil, streamaggr.Options{})
if streamAggrConfig != "" {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
sas, err := streamaggr.LoadFromData([]byte(streamAggrConfig), pushNoop, nil, "global")
if err != nil {
t.Fatalf("cannot load streamaggr configs: %s", err)
}
defer sas.MustStop()
rwctx.sas.Store(sas)
}

View file

@ -61,104 +61,180 @@ var (
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
func CheckStreamAggrConfigs() error {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
if _, err := newStreamAggrConfig(-1, pushNoop); err != nil {
return fmt.Errorf("could not load -streamAggr.config stream aggregation config: %w", err)
}
if len(*streamAggrConfig) > len(*remoteWriteURLs) {
return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*streamAggrConfig), len(*remoteWriteURLs))
}
for idx := range *streamAggrConfig {
if _, err := newStreamAggrConfig(idx, pushNoop); err != nil {
// Check global config
sas, err := newStreamAggrConfigGlobal()
if err != nil {
return err
}
sas.MustStop()
if len(*streamAggrConfig) > len(*remoteWriteURLs) {
return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", len(*streamAggrConfig), len(*remoteWriteURLs))
}
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
for idx := range *streamAggrConfig {
sas, err := newStreamAggrConfigPerURL(idx, pushNoop)
if err != nil {
return err
}
sas.MustStop()
}
return nil
}
func reloadStreamAggrConfigs() {
reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
for idx, rwctx := range rwctxsGlobal {
reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
reloadStreamAggrConfigGlobal()
for _, rwctx := range rwctxsGlobal {
rwctx.reloadStreamAggrConfig()
}
}
func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
path, opts := getStreamAggrOpts(idx)
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err)
func reloadStreamAggrConfigGlobal() {
path := *streamAggrGlobalConfig
if path == "" {
return
}
var sas *streamaggr.Aggregators
if idx < 0 {
sas = sasGlobal.Load()
} else {
sas = rwctxsGlobal[idx].sas.Load()
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := newStreamAggrConfigGlobal()
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
logger.Errorf("cannot reload -streamAggr.config=%q; continue using the previously loaded config; error: %s", path, err)
return
}
sas := sasGlobal.Load()
if !sasNew.Equal(sas) {
var sasOld *streamaggr.Aggregators
if idx < 0 {
sasOld = sasGlobal.Swap(sasNew)
} else {
sasOld = rwctxsGlobal[idx].sas.Swap(sasNew)
}
sasOld := sasGlobal.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
logger.Infof("successfully reloaded -streamAggr.config=%q", path)
} else {
sasNew.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
}
func getStreamAggrOpts(idx int) (string, streamaggr.Options) {
if idx < 0 {
return *streamAggrGlobalConfig, streamaggr.Options{
func initStreamAggrConfigGlobal() {
sas, err := newStreamAggrConfigGlobal()
if err != nil {
logger.Fatalf("cannot initialize gloabl stream aggregators: %s", err)
}
if sas != nil {
filePath := sas.FilePath()
sasGlobal.Store(sas)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
} else {
dedupInterval := streamAggrGlobalDedupInterval.Duration()
if dedupInterval > 0 {
deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrDropInputLabels, "dedup-global")
}
}
}
func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
idx := rwctx.idx
sas, err := rwctx.newStreamAggrConfig()
if err != nil {
logger.Fatalf("cannot initialize stream aggregators: %s", err)
}
if sas != nil {
filePath := sas.FilePath()
rwctx.sas.Store(sas)
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(idx)
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp())
} else {
dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx)
if dedupInterval > 0 {
alias := fmt.Sprintf("dedup-%d", idx+1)
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels, alias)
}
}
}
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() {
path := streamAggrConfig.GetOptionalArg(rwctx.idx)
if path == "" {
return
}
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := rwctx.newStreamAggrConfig()
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
logger.Errorf("cannot reload -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", path, err)
return
}
sas := rwctx.sas.Load()
if !sasNew.Equal(sas) {
sasOld := rwctx.sas.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded -remoteWrite.streamAggr.config=%q", path)
} else {
sasNew.MustStop()
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
}
func newStreamAggrConfigGlobal() (*streamaggr.Aggregators, error) {
path := *streamAggrGlobalConfig
if path == "" {
return nil, nil
}
opts := &streamaggr.Options{
DedupInterval: streamAggrGlobalDedupInterval.Duration(),
DropInputLabels: *streamAggrGlobalDropInputLabels,
IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals,
Alias: "global",
}
sas, err := streamaggr.LoadFromFile(path, pushToRemoteStoragesTrackDropped, opts, "global")
if err != nil {
return nil, fmt.Errorf("cannot load -streamAggr.config=%q: %w", *streamAggrGlobalConfig, err)
}
url := fmt.Sprintf("%d:secret-url", idx+1)
return sas, nil
}
func (rwctx *remoteWriteCtx) newStreamAggrConfig() (*streamaggr.Aggregators, error) {
return newStreamAggrConfigPerURL(rwctx.idx, rwctx.pushInternalTrackDropped)
}
func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) {
path := streamAggrConfig.GetOptionalArg(idx)
if path == "" {
return nil, nil
}
alias := fmt.Sprintf("%d:secret-url", idx+1)
if *showRemoteWriteURL {
url = fmt.Sprintf("%d:%s", idx+1, remoteWriteURLs.GetOptionalArg(idx))
alias = fmt.Sprintf("%d:%s", idx+1, remoteWriteURLs.GetOptionalArg(idx))
}
opts := streamaggr.Options{
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
Alias: url,
}
if len(*streamAggrConfig) == 0 {
return "", opts
sas, err := streamaggr.LoadFromFile(path, pushFunc, opts, alias)
if err != nil {
return nil, fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", path, err)
}
return streamAggrConfig.GetOptionalArg(idx), opts
}
func newStreamAggrConfigWithOpts(pushFunc streamaggr.PushFunc, path string, opts streamaggr.Options) (*streamaggr.Aggregators, error) {
if len(path) == 0 {
// Skip empty stream aggregation config.
return nil, nil
}
return streamaggr.LoadFromFile(path, pushFunc, opts)
}
func newStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) {
path, opts := getStreamAggrOpts(idx)
return newStreamAggrConfigWithOpts(pushFunc, path, opts)
return sas, nil
}

View file

@ -57,14 +57,13 @@ func CheckStreamAggrConfig() error {
return nil
}
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
opts := streamaggr.Options{
opts := &streamaggr.Options{
DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
Alias: "global",
}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts)
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts, "global")
if err != nil {
return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
}
@ -77,25 +76,22 @@ func CheckStreamAggrConfig() error {
// MustStopStreamAggr must be called when stream aggr is no longer needed.
func InitStreamAggr() {
saCfgReloaderStopCh = make(chan struct{})
rwctx := "global"
if *streamAggrConfig == "" {
if *streamAggrDedupInterval > 0 {
deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels, rwctx)
deduplicator = streamaggr.NewDeduplicator(pushAggregateSeries, *streamAggrDedupInterval, *streamAggrDropInputLabels, "global")
}
return
}
sighupCh := procutil.NewSighupChan()
opts := streamaggr.Options{
opts := &streamaggr.Options{
DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
Alias: rwctx,
}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts, "global")
if err != nil {
logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err)
}
@ -123,14 +119,13 @@ func reloadStreamAggrConfig() {
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
saCfgReloads.Inc()
opts := streamaggr.Options{
opts := &streamaggr.Options{
DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
Alias: "global",
}
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts, "global")
if err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()

View file

@ -5178,7 +5178,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate(vm_streamaggr_flushed_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod) > 0",
"expr": "sum(rate(vm_streamaggr_output_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod) > 0",
"instant": false,
"legendFormat": "{{url}} ({{job}}): match={{match}}; output={{output}}",
"range": true,
@ -5496,103 +5496,6 @@
"title": "Dedup flush duration 0.99 quantile ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"description": "Shows the eviction rate of time series because of staleness.\n\nThere are two stages where series can be marked as stale.\n1. Input. Aggregator keeps in memory each received unique time series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for this series. \n\n2. Output. The output key is a resulting time series produced by aggregating many input series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for any of input series for this aggregation.\n\nIncrease in `input` keys shows that series previously matched by the aggregation rule now became stale.\n\nIncrease in `output` keys shows that series previously produced by the aggregation rule now became stale.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"axisSoftMin": 0,
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 31
},
"id": 144,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "victoriametrics-datasource",
"uid": "$ds"
},
"editorMode": "code",
"expr": "increase(vm_streamaggr_stale_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0",
"instant": false,
"legendFormat": "{{url}} ({{job}}): match={{match}}; key={{key}}",
"range": true,
"refId": "A"
}
],
"title": "Staleness rate ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "victoriametrics-datasource",

View file

@ -5177,7 +5177,7 @@
"uid": "$ds"
},
"editorMode": "code",
"expr": "sum(rate(vm_streamaggr_flushed_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod) > 0",
"expr": "sum(rate(vm_streamaggr_output_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval])) without (instance, pod) > 0",
"instant": false,
"legendFormat": "{{url}} ({{job}}): match={{match}}; output={{output}}",
"range": true,
@ -5495,103 +5495,6 @@
"title": "Dedup flush duration 0.99 quantile ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"description": "Shows the eviction rate of time series because of staleness.\n\nThere are two stages where series can be marked as stale.\n1. Input. Aggregator keeps in memory each received unique time series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for this series. \n\n2. Output. The output key is a resulting time series produced by aggregating many input series. The time series becomes stale and gets removed if no samples were received during [staleness interval](https://docs.victoriametrics.com/stream-aggregation/#staleness) for any of input series for this aggregation.\n\nIncrease in `input` keys shows that series previously matched by the aggregation rule now became stale.\n\nIncrease in `output` keys shows that series previously produced by the aggregation rule now became stale.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisBorderShow": false,
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"axisSoftMin": 0,
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 31
},
"id": 144,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "$ds"
},
"editorMode": "code",
"expr": "increase(vm_streamaggr_stale_samples_total{job=~\"$job\",instance=~\"$instance\", url=~\"$url\"}[$__rate_interval]) > 0",
"instant": false,
"legendFormat": "{{url}} ({{job}}): match={{match}}; key={{key}}",
"range": true,
"refId": "A"
}
],
"title": "Staleness rate ($instance)",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",

View file

@ -42,14 +42,17 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [dashboards](https://grafana.com/orgs/victoriametrics): add [Grafana dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmauth.json) and [alerting rules](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/deployment/docker/alerts-vmauth.yml) for [vmauth](https://docs.victoriametrics.com/vmauth/) dashboard. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4313) for details.
* FEATURE: [vmauth](https://docs.victoriametrics.com/vmauth/): reduces CPU usage by reusing request body buffer. Allows to disable requests caching with `-maxRequestBodySizeToRetry=0`. See this [PR](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6533) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): added `yandexcloud_sd` AWS API IMDSv2 support.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/): expose metrics related to [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/):
* `vm_streamaggr_matched_samples_total` - shows the number of samples matched by the aggregation rule;
* `vm_streamaggr_flushed_samples_total` - shows the number of samples produced by the aggregation rule;
* `vm_streamaggr_samples_lag_seconds` - shows the max lag between samples timestamps within one batch received by the aggregation;
* `vm_streamaggr_stale_samples_total` - shows the number of time series that became [stale](https://docs.victoriametrics.com/stream-aggregation/#staleness) during aggregation;
* metrics related to stream aggregation got additional labels `match` (matching param), `group` (`by` or `without` param), `url` (address of `remoteWrite.url` where aggregation is applied), `position` (the position of the aggregation rule in config file).
* These and other metrics were reflected on the [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json) in `stream aggregation` section.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent/) and [Single-node VictoriaMetrics](https://docs.victoriametrics.com/): add `-graphite.sanitizeMetricName` cmd-line flag for sanitizing metrics ingested via [Graphite protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6077).
* FEATURE: [streaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): expose the following metrics at `/metrics` page of [vmagent](https://docs.victoriametrics.com/vmagent/) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/):
* `vm_streamaggr_matched_samples_total` - the number of input samples matched by the corresponding aggregation rule
* `vm_streamaggr_output_samples_total` - the number of output samples produced by the corresponding aggregation rule
* `vm_streamaggr_samples_lag_seconds` - [histogram](https://docs.victoriametrics.com/keyconcepts/#histogram) with the lag between the current time and the timestamp seen in the aggregated input samples
* FEATURE: [steaming aggregation](https://docs.victoriametrics.com/stream-aggregation/): add new labels to `vl_streamaggr_*` metrics:
* `name` - the name of the streaming aggregation rule, which can be configured via `name` option - see [these docs](https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config).
* `url` - `-remoteWrite.url` for the corresponding `-remoteWrite.streamAggr.config`
* `path` - path to the corresponding streaming aggregation config file
* `position` - the position of the aggregation rule in the corresponding streaming aggregation config file
* FEATURE: [vmagent dashboard](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards/vmagent.json): `stream aggregation` section: add graphs based on newly exposed streaming aggregation metrics.
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): do not retry RPC calls to vmstorage nodes if [complexity limits](https://docs.victoriametrics.com/#resource-usage-limits) were exceeded.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert/): make `-replay.timeTo` optional in [replay mode](https://docs.victoriametrics.com/vmalert/#rules-backfilling). When omitted, the current timestamp will be used. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6492).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): show compacted result in the JSON tab for query results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6559).

View file

@ -1018,6 +1018,13 @@ At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr
specified individually per each `-remoteWrite.url`:
```yaml
# name is an optional name of the given streaming aggregation config.
#
# If it is set, then it is used as `name` label in the exposed metrics
# for the given aggregation config at /metrics page.
# See https://docs.victoriametrics.com/vmagent/#monitoring and https://docs.victoriametrics.com/#monitoring
- name: 'foobar'
# match is an optional filter for incoming samples to aggregate.
# It can contain arbitrary Prometheus series selector
# according to https://docs.victoriametrics.com/keyconcepts/#filtering .

View file

@ -35,6 +35,8 @@ type Deduplicator struct {
// An optional dropLabels list may contain label names, which must be dropped before de-duplicating samples.
// Common case is to drop `replica`-like labels from samples received from HA datasources.
//
// alias is url label used in metrics exposed by the returned Deduplicator.
//
// MustStop must be called on the returned deduplicator in order to free up occupied resources.
func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels []string, alias string) *Deduplicator {
d := &Deduplicator{
@ -47,7 +49,8 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
ms := d.ms
metricLabels := fmt.Sprintf(`url=%q`, alias)
metricLabels := fmt.Sprintf(`name="dedup",url=%q`, alias)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
return float64(d.da.sizeBytes())
})
@ -55,8 +58,8 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
return float64(d.da.itemsCount())
})
d.dedupFlushDuration = ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.dedupFlushTimeouts = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
d.dedupFlushDuration = ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels))
d.dedupFlushTimeouts = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels))
metrics.RegisterSet(ms)

View file

@ -66,9 +66,8 @@ func (as *histogramBucketAggrState) pushSamples(samples []pushSample) {
}
}
func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
func (as *histogramBucketAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
var staleOutputSamples int
m.Range(func(k, v any) bool {
sv := v.(*histogramBucketStateValue)
@ -77,7 +76,6 @@ func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime
if deleted {
// Mark the current entry as deleted
sv.deleted = deleted
staleOutputSamples++
}
sv.mu.Unlock()
@ -86,14 +84,13 @@ func (as *histogramBucketAggrState) removeOldEntries(ctx *flushCtx, currentTime
}
return true
})
ctx.a.staleOutputSamples["histogram_bucket"].Add(staleOutputSamples)
}
func (as *histogramBucketAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(ctx, currentTime)
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v any) bool {

View file

@ -105,16 +105,13 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
}
}
func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
suffix := "rate_sum"
if as.isAvg {
suffix = "rate_avg"
}
suffix := as.getSuffix()
as.removeOldEntries(ctx, suffix, currentTime)
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v any) bool {
@ -130,13 +127,16 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
sumRate += lv.increase / d
countSeries++
}
if resetState {
lv.prevTimestamp = lv.timestamp
lv.increase = 0
lvs[k1] = lv
}
}
deleted := sv.deleted
sv.mu.Unlock()
if countSeries == 0 {
if countSeries == 0 || deleted {
// Nothing to update
return true
}
@ -152,9 +152,15 @@ func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
})
}
func (as *rateAggrState) removeOldEntries(ctx *flushCtx, suffix string, currentTime uint64) {
func (as *rateAggrState) getSuffix() string {
if as.isAvg {
return "rate_avg"
}
return "rate_sum"
}
func (as *rateAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
var staleOutputSamples, staleInputSamples int
m.Range(func(k, v any) bool {
sv := v.(*rateStateValue)
@ -162,7 +168,6 @@ func (as *rateAggrState) removeOldEntries(ctx *flushCtx, suffix string, currentT
if currentTime > sv.deleteDeadline {
// Mark the current entry as deleted
sv.deleted = true
staleOutputSamples++
sv.mu.Unlock()
m.Delete(k)
return true
@ -173,12 +178,9 @@ func (as *rateAggrState) removeOldEntries(ctx *flushCtx, suffix string, currentT
for k1, lv := range lvs {
if currentTime > lv.deleteDeadline {
delete(lvs, k1)
staleInputSamples++
}
}
sv.mu.Unlock()
return true
})
ctx.a.staleInputSamples[suffix].Add(staleInputSamples)
ctx.a.staleOutputSamples[suffix].Add(staleOutputSamples)
}

View file

@ -47,9 +47,6 @@ var supportedOutputs = []string{
"unique_samples",
}
// maxLabelValueLen is maximum match expression label value length in stream aggregation metrics
const maxLabelValueLen = 64
var (
// lc contains information about all compressed labels for streaming aggregation
lc promutils.LabelsCompressor
@ -67,8 +64,10 @@ var (
//
// opts can contain additional options. If opts is nil, then default options are used.
//
// alias is used as url label in metrics exposed for the returned Aggregators.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, error) {
func LoadFromFile(path string, pushFunc PushFunc, opts *Options, alias string) (*Aggregators, error) {
data, err := fscore.ReadFileOrHTTP(path)
if err != nil {
return nil, fmt.Errorf("cannot load aggregators: %w", err)
@ -78,7 +77,7 @@ func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, e
return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
}
as, err := LoadFromData(data, pushFunc, opts)
as, err := loadFromData(data, path, pushFunc, opts, alias)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
}
@ -136,16 +135,15 @@ type Options struct {
//
// This option can be overridden individually per each aggregation via ignore_first_intervals option.
IgnoreFirstIntervals int
// Alias is name or url of remote write context
Alias string
// aggrID is aggregators id number starting from 1, which is used in metrics labels
aggrID int
}
// Config is a configuration for a single stream aggregation.
type Config struct {
// Name is an optional name of the Config.
//
// It is used as `name` label in the exposed metrics for the given Config.
Name string `yaml:"name,omitempty"`
// Match is a label selector for filtering time series for the given selector.
//
// If the match isn't set, then all the input time series are processed.
@ -249,11 +247,26 @@ type Aggregators struct {
// It is used in Equal() for comparing Aggregators.
configData []byte
// filePath is the path to config file used for creating the Aggregators.
filePath string
// ms contains metrics associated with the Aggregators.
ms *metrics.Set
}
// FilePath returns path to file with the configuration used for creating the given Aggregators.
func (a *Aggregators) FilePath() string {
return a.filePath
}
// LoadFromData loads aggregators from data.
func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) {
//
// opts can contain additional options. If opts is nil, then default options are used.
func LoadFromData(data []byte, pushFunc PushFunc, opts *Options, alias string) (*Aggregators, error) {
return loadFromData(data, "inmemory", pushFunc, opts, alias)
}
func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options, alias string) (*Aggregators, error) {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
@ -262,8 +275,7 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
for i, cfg := range cfgs {
opts.aggrID = i + 1
a, err := newAggregator(cfg, pushFunc, ms, opts)
a, err := newAggregator(cfg, filePath, pushFunc, ms, opts, alias, i+1)
if err != nil {
// Stop already initialized aggregators before returning the error.
for _, a := range as[:i] {
@ -278,30 +290,11 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
}
metricLabels := fmt.Sprintf("url=%q", opts.Alias)
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := uint64(0)
for _, aggr := range as {
if aggr.da != nil {
n += aggr.da.sizeBytes()
}
}
return float64(n)
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
n := uint64(0)
for _, aggr := range as {
if aggr.da != nil {
n += aggr.da.itemsCount()
}
}
return float64(n)
})
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
configData: configData,
filePath: filePath,
ms: ms,
}, nil
}
@ -380,11 +373,17 @@ type aggregator struct {
without []string
aggregateOnlyByTime bool
// interval is the interval between flushes
interval time.Duration
// dedupInterval is optional deduplication interval for incoming samples
dedupInterval time.Duration
// da is set to non-nil if input samples must be de-duplicated
da *dedupAggr
// aggrStates contains aggregate states for the given outputs
aggrStates map[string]aggrState
// aggrOutputs contains aggregate states for the given outputs
aggrOutputs []aggrOutput
// minTimestamp is used for ignoring old samples when ignoreOldSamples is set
minTimestamp atomic.Int64
@ -406,11 +405,14 @@ type aggregator struct {
flushTimeouts *metrics.Counter
dedupFlushTimeouts *metrics.Counter
ignoredOldSamples *metrics.Counter
ignoredNanSamples *metrics.Counter
IgnoredNaNSamples *metrics.Counter
matchedSamples *metrics.Counter
staleInputSamples map[string]*metrics.Counter
staleOutputSamples map[string]*metrics.Counter
flushedSamples map[string]*metrics.Counter
}
type aggrOutput struct {
as aggrState
outputSamples *metrics.Counter
}
type aggrState interface {
@ -419,6 +421,13 @@ type aggrState interface {
// samples[].key must be cloned by aggrState, since it may change after returning from pushSamples.
pushSamples(samples []pushSample)
// flushState must flush aggrState data to ctx.
//
// if resetState is true, then aggrState must be reset after flushing the data to ctx,
// otherwise the aggrState data must be kept unchanged.
//
// The resetState is set to false only in the benchmark, which measures flushState() performance
// over the same aggrState.
flushState(ctx *flushCtx, resetState bool)
}
@ -430,7 +439,7 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
// opts can contain additional options. If opts is nil, then default options are used.
//
// The returned aggregator must be stopped when no longer needed by calling MustStop().
func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options) (*aggregator, error) {
func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, opts *Options, alias string, aggrID int) (*aggregator, error) {
// check cfg.Interval
if cfg.Interval == "" {
return nil, fmt.Errorf("missing `interval` option")
@ -443,6 +452,10 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
return nil, fmt.Errorf("aggregation interval cannot be smaller than 1s; got %s", interval)
}
if opts == nil {
opts = &Options{}
}
// check cfg.DedupInterval
dedupInterval := opts.DedupInterval
if cfg.DedupInterval != "" {
@ -491,7 +504,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
by := sortAndRemoveDuplicates(cfg.By)
without := sortAndRemoveDuplicates(cfg.Without)
if len(by) > 0 && len(without) > 0 {
return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously", by, without)
return nil, fmt.Errorf("`by: %s` and `without: %s` lists cannot be set simultaneously; see https://docs.victoriametrics.com/stream-aggregation/", by, without)
}
aggregateOnlyByTime := (len(by) == 0 && len(without) == 0)
if !aggregateOnlyByTime && len(without) == 0 {
@ -505,10 +518,12 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
}
if keepMetricNames {
if len(cfg.Outputs) != 1 {
return nil, fmt.Errorf("`outputs` list must contain only a single entry if `keep_metric_names` is set; got %q", cfg.Outputs)
return nil, fmt.Errorf("`outputs` list must contain only a single entry if `keep_metric_names` is set; got %q; "+
"see https://docs.victoriametrics.com/stream-aggregation/#output-metric-names", cfg.Outputs)
}
if cfg.Outputs[0] == "histogram_bucket" || strings.HasPrefix(cfg.Outputs[0], "quantiles(") && strings.Contains(cfg.Outputs[0], ",") {
return nil, fmt.Errorf("`keep_metric_names` cannot be applied to `outputs: %q`, since they can generate multiple time series", cfg.Outputs)
return nil, fmt.Errorf("`keep_metric_names` cannot be applied to `outputs: %q`, since they can generate multiple time series; "+
"see https://docs.victoriametrics.com/stream-aggregation/#output-metric-names", cfg.Outputs)
}
}
@ -524,105 +539,42 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
ignoreFirstIntervals = *v
}
// initialize outputs list
// Initialize common metric labels
name := cfg.Name
if name == "" {
name = "none"
}
metricLabels := fmt.Sprintf(`name=%q,path=%q,url=%q,position="%d"`, name, path, alias, aggrID)
// initialize aggrOutputs
if len(cfg.Outputs) == 0 {
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s", supportedOutputs)
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
}
aggrStates := make(map[string]aggrState, len(cfg.Outputs))
for _, output := range cfg.Outputs {
// check for duplicated output
if _, ok := aggrStates[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicated aggregation function: %s", output)
}
if strings.HasPrefix(output, "quantiles(") {
if !strings.HasSuffix(output, ")") {
return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
}
argsStr := output[len("quantiles(") : len(output)-1]
if len(argsStr) == 0 {
return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
}
args := strings.Split(argsStr, ",")
phis := make([]float64, len(args))
for j, arg := range args {
arg = strings.TrimSpace(arg)
phi, err := strconv.ParseFloat(arg, 64)
aggrOutputs := make([]aggrOutput, len(cfg.Outputs))
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
as, err := newAggrState(output, outputsSeen, stalenessInterval)
if err != nil {
return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
return nil, err
}
if phi < 0 || phi > 1 {
return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
}
phis[j] = phi
}
if _, ok := aggrStates["quantiles"]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
}
aggrStates["quantiles"] = newQuantilesAggrState(phis)
continue
}
switch output {
case "avg":
aggrStates[output] = newAvgAggrState()
case "count_samples":
aggrStates[output] = newCountSamplesAggrState()
case "count_series":
aggrStates[output] = newCountSeriesAggrState()
case "histogram_bucket":
aggrStates[output] = newHistogramBucketAggrState(stalenessInterval)
case "increase":
aggrStates[output] = newTotalAggrState(stalenessInterval, true, true)
case "increase_prometheus":
aggrStates[output] = newTotalAggrState(stalenessInterval, true, false)
case "last":
aggrStates[output] = newLastAggrState()
case "max":
aggrStates[output] = newMaxAggrState()
case "min":
aggrStates[output] = newMinAggrState()
case "rate_avg":
aggrStates[output] = newRateAggrState(stalenessInterval, true)
case "rate_sum":
aggrStates[output] = newRateAggrState(stalenessInterval, false)
case "stddev":
aggrStates[output] = newStddevAggrState()
case "stdvar":
aggrStates[output] = newStdvarAggrState()
case "sum_samples":
aggrStates[output] = newSumSamplesAggrState()
case "total":
aggrStates[output] = newTotalAggrState(stalenessInterval, false, true)
case "total_prometheus":
aggrStates[output] = newTotalAggrState(stalenessInterval, false, false)
case "unique_samples":
aggrStates[output] = newUniqueSamplesAggrState()
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s;", output, supportedOutputs)
aggrOutputs[i] = aggrOutput{
as: as,
outputSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_output_samples_total{output=%q,%s}`, output, metricLabels)),
}
}
// initialize suffix to add to metric names after aggregation
suffix := ":" + cfg.Interval
group := "none"
if labels := removeUnderscoreName(by); len(labels) > 0 {
group = fmt.Sprintf("by: %s", strings.Join(labels, ","))
suffix += fmt.Sprintf("_by_%s", strings.Join(labels, "_"))
}
if labels := removeUnderscoreName(without); len(labels) > 0 {
group = fmt.Sprintf("without: %s", strings.Join(labels, ","))
suffix += fmt.Sprintf("_without_%s", strings.Join(labels, "_"))
}
suffix += "_"
outputs := strings.Join(cfg.Outputs, ",")
matchExpr := cfg.Match.String()
if len(matchExpr) > maxLabelValueLen {
matchExpr = matchExpr[:maxLabelValueLen-3] + "..."
}
metricLabels := fmt.Sprintf(`match=%q, group=%q, url=%q, position="%d"`, matchExpr, group, opts.Alias, opts.aggrID)
// initialize the aggregator
a := &aggregator{
match: cfg.Match,
@ -638,36 +590,37 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
without: without,
aggregateOnlyByTime: aggregateOnlyByTime,
aggrStates: aggrStates,
interval: interval,
dedupInterval: dedupInterval,
aggrOutputs: aggrOutputs,
suffix: suffix,
stopCh: make(chan struct{}),
flushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)),
dedupFlushDuration: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{outputs=%q, %s}`, outputs, metricLabels)),
samplesLag: ms.GetOrCreateHistogram(fmt.Sprintf(`vm_streamaggr_samples_lag_seconds{outputs=%q, %s}`, outputs, metricLabels)),
flushDuration: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_flush_duration_seconds{%s}`, metricLabels)),
dedupFlushDuration: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_dedup_flush_duration_seconds{%s}`, metricLabels)),
samplesLag: ms.NewHistogram(fmt.Sprintf(`vm_streamaggr_samples_lag_seconds{%s}`, metricLabels)),
matchedSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_matched_samples_total{outputs=%q, %s}`, outputs, metricLabels)),
flushTimeouts: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_flush_timeouts_total{outputs=%q, %s}`, outputs, metricLabels)),
dedupFlushTimeouts: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{outputs=%q, %s}`, outputs, metricLabels)),
ignoredNanSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="nan", outputs=%q, %s}`, outputs, metricLabels)),
ignoredOldSamples: ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="too_old", outputs=%q, %s}`, outputs, metricLabels)),
staleInputSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)),
staleOutputSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)),
flushedSamples: make(map[string]*metrics.Counter, len(cfg.Outputs)),
}
for _, output := range cfg.Outputs {
// Removing output args for metric label value in outputs like quantile(arg1, arg2)
if ri := strings.IndexRune(output, '('); ri >= 0 {
output = output[:ri]
}
a.staleInputSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_stale_samples_total{key="input", output=%q, %s}`, output, metricLabels))
a.staleOutputSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_stale_samples_total{key="output", output=%q, %s}`, output, metricLabels))
a.flushedSamples[output] = ms.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_flushed_samples_total{output=%q, %s}`, output, metricLabels))
matchedSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_matched_samples_total{%s}`, metricLabels)),
flushTimeouts: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_flush_timeouts_total{%s}`, metricLabels)),
dedupFlushTimeouts: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_dedup_flush_timeouts_total{%s}`, metricLabels)),
IgnoredNaNSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="nan",%s}`, metricLabels)),
ignoredOldSamples: ms.NewCounter(fmt.Sprintf(`vm_streamaggr_ignored_samples_total{reason="too_old",%s}`, metricLabels)),
}
if dedupInterval > 0 {
a.da = newDedupAggr()
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
n := a.da.sizeBytes()
return float64(n)
})
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
n := a.da.itemsCount()
return float64(n)
})
}
alignFlushToInterval := !opts.NoAlignFlushToInterval
@ -682,14 +635,89 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
a.wg.Add(1)
go func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, ignoreFirstIntervals)
a.wg.Done()
}()
return a, nil
}
func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, interval, dedupInterval time.Duration, ignoreFirstIntervals int) {
func newAggrState(output string, outputsSeen map[string]struct{}, stalenessInterval time.Duration) (aggrState, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
}
outputsSeen[output] = struct{}{}
if strings.HasPrefix(output, "quantiles(") {
if !strings.HasSuffix(output, ")") {
return nil, fmt.Errorf("missing closing brace for `quantiles()` output")
}
argsStr := output[len("quantiles(") : len(output)-1]
if len(argsStr) == 0 {
return nil, fmt.Errorf("`quantiles()` must contain at least one phi")
}
args := strings.Split(argsStr, ",")
phis := make([]float64, len(args))
for i, arg := range args {
arg = strings.TrimSpace(arg)
phi, err := strconv.ParseFloat(arg, 64)
if err != nil {
return nil, fmt.Errorf("cannot parse phi=%q for quantiles(%s): %w", arg, argsStr, err)
}
if phi < 0 || phi > 1 {
return nil, fmt.Errorf("phi inside quantiles(%s) must be in the range [0..1]; got %v", argsStr, phi)
}
phis[i] = phi
}
if _, ok := outputsSeen["quantiles"]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicated `quantiles()` function, please combine multiple phi* like `quantiles(0.5, 0.9)`")
}
outputsSeen["quantiles"] = struct{}{}
return newQuantilesAggrState(phis), nil
}
switch output {
case "avg":
return newAvgAggrState(), nil
case "count_samples":
return newCountSamplesAggrState(), nil
case "count_series":
return newCountSeriesAggrState(), nil
case "histogram_bucket":
return newHistogramBucketAggrState(stalenessInterval), nil
case "increase":
return newTotalAggrState(stalenessInterval, true, true), nil
case "increase_prometheus":
return newTotalAggrState(stalenessInterval, true, false), nil
case "last":
return newLastAggrState(), nil
case "max":
return newMaxAggrState(), nil
case "min":
return newMinAggrState(), nil
case "rate_avg":
return newRateAggrState(stalenessInterval, true), nil
case "rate_sum":
return newRateAggrState(stalenessInterval, false), nil
case "stddev":
return newStddevAggrState(), nil
case "stdvar":
return newStdvarAggrState(), nil
case "sum_samples":
return newSumSamplesAggrState(), nil
case "total":
return newTotalAggrState(stalenessInterval, false, true), nil
case "total_prometheus":
return newTotalAggrState(stalenessInterval, false, false), nil
case "unique_samples":
return newUniqueSamplesAggrState(), nil
default:
return nil, fmt.Errorf("unsupported output=%q; supported values: %s; see https://docs.victoriametrics.com/stream-aggregation/", output, supportedOutputs)
}
}
func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipIncompleteFlush bool, ignoreFirstIntervals int) {
alignedSleep := func(d time.Duration) {
if !alignFlushToInterval {
return
@ -714,22 +742,22 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
}
if dedupInterval <= 0 {
alignedSleep(interval)
t := time.NewTicker(interval)
if a.dedupInterval <= 0 {
alignedSleep(a.interval)
t := time.NewTicker(a.interval)
defer t.Stop()
if alignFlushToInterval && skipIncompleteFlush {
a.flush(nil, interval, true)
a.flush(nil)
ignoreFirstIntervals--
}
for tickerWait(t) {
if ignoreFirstIntervals > 0 {
a.flush(nil, interval, true)
a.flush(nil)
ignoreFirstIntervals--
} else {
a.flush(pushFunc, interval, true)
a.flush(pushFunc)
}
if alignFlushToInterval {
@ -740,30 +768,30 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
}
} else {
alignedSleep(dedupInterval)
t := time.NewTicker(dedupInterval)
alignedSleep(a.dedupInterval)
t := time.NewTicker(a.dedupInterval)
defer t.Stop()
flushDeadline := time.Now().Add(interval)
flushDeadline := time.Now().Add(a.interval)
isSkippedFirstFlush := false
for tickerWait(t) {
a.dedupFlush(dedupInterval)
a.dedupFlush()
ct := time.Now()
if ct.After(flushDeadline) {
// It is time to flush the aggregated state
if alignFlushToInterval && skipIncompleteFlush && !isSkippedFirstFlush {
a.flush(nil, interval, true)
a.flush(nil)
ignoreFirstIntervals--
isSkippedFirstFlush = true
} else if ignoreFirstIntervals > 0 {
a.flush(nil, interval, true)
a.flush(nil)
ignoreFirstIntervals--
} else {
a.flush(pushFunc, interval, true)
a.flush(pushFunc)
}
for ct.After(flushDeadline) {
flushDeadline = flushDeadline.Add(interval)
flushDeadline = flushDeadline.Add(a.interval)
}
}
@ -777,13 +805,13 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipInc
}
if !skipIncompleteFlush && ignoreFirstIntervals <= 0 {
a.dedupFlush(dedupInterval)
a.flush(pushFunc, interval, true)
a.dedupFlush()
a.flush(pushFunc)
}
}
func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
if dedupInterval <= 0 {
func (a *aggregator) dedupFlush() {
if a.dedupInterval <= 0 {
// The de-duplication is disabled.
return
}
@ -794,15 +822,22 @@ func (a *aggregator) dedupFlush(dedupInterval time.Duration) {
d := time.Since(startTime)
a.dedupFlushDuration.Update(d.Seconds())
if d > dedupInterval {
if d > a.dedupInterval {
a.dedupFlushTimeouts.Inc()
logger.Warnf("deduplication couldn't be finished in the configured dedup_interval=%s; it took %.03fs; "+
"possible solutions: increase dedup_interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", dedupInterval, d.Seconds())
"reduce samples' ingestion rate to stream aggregation", a.dedupInterval, d.Seconds())
}
}
func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState bool) {
// flush flushes aggregator state to pushFunc.
//
// If pushFunc is nil, then the aggregator state is just reset.
func (a *aggregator) flush(pushFunc PushFunc) {
a.flushInternal(pushFunc, true)
}
func (a *aggregator) flushInternal(pushFunc PushFunc, resetState bool) {
startTime := time.Now()
// Update minTimestamp before flushing samples to the storage,
@ -811,31 +846,31 @@ func (a *aggregator) flush(pushFunc PushFunc, interval time.Duration, resetState
a.minTimestamp.Store(startTime.UnixMilli() - 5_000)
var wg sync.WaitGroup
for output, as := range a.aggrStates {
for i := range a.aggrOutputs {
ao := &a.aggrOutputs[i]
flushConcurrencyCh <- struct{}{}
wg.Add(1)
go func(as aggrState) {
go func(ao *aggrOutput) {
defer func() {
<-flushConcurrencyCh
wg.Done()
}()
ctx := getFlushCtx(a, pushFunc)
as.flushState(ctx, resetState)
ctx.flushSeries(output)
ctx.resetSeries()
ctx := getFlushCtx(a, ao, pushFunc)
ao.as.flushState(ctx, resetState)
ctx.flushSeries()
putFlushCtx(ctx)
}(as)
}(ao)
}
wg.Wait()
d := time.Since(startTime)
a.flushDuration.Update(d.Seconds())
if d > interval {
if d > a.interval {
a.flushTimeouts.Inc()
logger.Warnf("stream aggregation couldn't be finished in the configured interval=%s; it took %.03fs; "+
"possible solutions: increase interval; use match filter matching smaller number of series; "+
"reduce samples' ingestion rate to stream aggregation", interval, d.Seconds())
"reduce samples' ingestion rate to stream aggregation", a.interval, d.Seconds())
}
}
@ -851,7 +886,6 @@ func (a *aggregator) MustStop() {
// Push pushes tss to a.
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
now := time.Now().UnixMilli()
ctx := getPushCtx()
defer putPushCtx(ctx)
@ -864,7 +898,9 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
dropLabels := a.dropInputLabels
ignoreOldSamples := a.ignoreOldSamples
minTimestamp := a.minTimestamp.Load()
var maxLag int64
nowMsec := time.Now().UnixMilli()
var maxLagMsec int64
for idx, ts := range tss {
if !a.match.Match(ts.Labels) {
continue
@ -896,30 +932,31 @@ func (a *aggregator) Push(tss []prompbmarshal.TimeSeries, matchIdxs []byte) {
// key remains valid only by the end of this function and can't be reused after
// do not intern key because number of unique keys could be too high
key := bytesutil.ToUnsafeString(buf[bufLen:])
for _, sample := range ts.Samples {
if math.IsNaN(sample.Value) {
a.ignoredNanSamples.Inc()
for _, s := range ts.Samples {
if math.IsNaN(s.Value) {
a.IgnoredNaNSamples.Inc()
// Skip NaN values
continue
}
if ignoreOldSamples && sample.Timestamp < minTimestamp {
if ignoreOldSamples && s.Timestamp < minTimestamp {
a.ignoredOldSamples.Inc()
// Skip old samples outside the current aggregation interval
continue
}
if maxLag < now-sample.Timestamp {
maxLag = now - sample.Timestamp
lagMsec := nowMsec - s.Timestamp
if lagMsec > maxLagMsec {
maxLagMsec = lagMsec
}
samples = append(samples, pushSample{
key: key,
value: sample.Value,
timestamp: sample.Timestamp,
value: s.Value,
timestamp: s.Timestamp,
})
}
}
if len(samples) > 0 {
a.matchedSamples.Add(len(samples))
a.samplesLag.Update(float64(maxLag) / 1_000)
a.samplesLag.Update(float64(maxLagMsec) / 1_000)
}
ctx.samples = samples
ctx.buf = buf
@ -969,8 +1006,8 @@ func getInputOutputKey(key string) (string, string) {
}
func (a *aggregator) pushSamples(samples []pushSample) {
for _, as := range a.aggrStates {
as.pushSamples(samples)
for _, ao := range a.aggrOutputs {
ao.as.pushSamples(samples)
}
}
@ -1036,13 +1073,14 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompbmarshal.Label, by,
return dstInput, dstOutput
}
func getFlushCtx(a *aggregator, pushFunc PushFunc) *flushCtx {
func getFlushCtx(a *aggregator, ao *aggrOutput, pushFunc PushFunc) *flushCtx {
v := flushCtxPool.Get()
if v == nil {
v = &flushCtx{}
}
ctx := v.(*flushCtx)
ctx.a = a
ctx.ao = ao
ctx.pushFunc = pushFunc
return ctx
}
@ -1056,6 +1094,7 @@ var flushCtxPool sync.Pool
type flushCtx struct {
a *aggregator
ao *aggrOutput
pushFunc PushFunc
tss []prompbmarshal.TimeSeries
@ -1065,6 +1104,7 @@ type flushCtx struct {
func (ctx *flushCtx) reset() {
ctx.a = nil
ctx.ao = nil
ctx.pushFunc = nil
ctx.resetSeries()
}
@ -1079,7 +1119,9 @@ func (ctx *flushCtx) resetSeries() {
ctx.samples = ctx.samples[:0]
}
func (ctx *flushCtx) flushSeries(aggrStateSuffix string) {
func (ctx *flushCtx) flushSeries() {
defer ctx.resetSeries()
tss := ctx.tss
if len(tss) == 0 {
// nothing to flush
@ -1091,7 +1133,7 @@ func (ctx *flushCtx) flushSeries(aggrStateSuffix string) {
// Fast path - push the output metrics.
if ctx.pushFunc != nil {
ctx.pushFunc(tss)
ctx.a.flushedSamples[aggrStateSuffix].Add(len(tss))
ctx.ao.outputSamples.Add(len(tss))
}
return
}
@ -1113,7 +1155,7 @@ func (ctx *flushCtx) flushSeries(aggrStateSuffix string) {
}
if ctx.pushFunc != nil {
ctx.pushFunc(dst)
ctx.a.flushedSamples[aggrStateSuffix].Add(len(dst))
ctx.ao.outputSamples.Add(len(dst))
}
auxLabels.Labels = dstLabels
promutils.PutLabels(auxLabels)
@ -1137,8 +1179,7 @@ func (ctx *flushCtx) appendSeries(key, suffix string, timestamp int64, value flo
// Limit the maximum length of ctx.tss in order to limit memory usage.
if len(ctx.tss) >= 10_000 {
ctx.flushSeries(suffix)
ctx.resetSeries()
ctx.flushSeries()
}
}
@ -1161,7 +1202,11 @@ func (ctx *flushCtx) appendSeriesWithExtraLabel(key, suffix string, timestamp in
Labels: ctx.labels[labelsLen:],
Samples: ctx.samples[samplesLen:],
})
ctx.a.flushedSamples[suffix].Add(len(ctx.tss))
// Limit the maximum length of ctx.tss in order to limit memory usage.
if len(ctx.tss) >= 10_000 {
ctx.flushSeries()
}
}
func addMetricSuffix(labels []prompbmarshal.Label, offset int, firstSuffix, lastSuffix string) []prompbmarshal.Label {

View file

@ -19,7 +19,7 @@ func TestAggregatorsFailure(t *testing.T) {
pushFunc := func(_ []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("pushFunc shouldn't be called"))
}
a, err := LoadFromData([]byte(config), pushFunc, Options{})
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err == nil {
t.Fatalf("expecting non-nil error")
}
@ -200,11 +200,11 @@ func TestAggregatorsEqual(t *testing.T) {
t.Helper()
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
aa, err := LoadFromData([]byte(a), pushFunc, Options{})
aa, err := LoadFromData([]byte(a), pushFunc, nil, "some_alias")
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
ab, err := LoadFromData([]byte(b), pushFunc, Options{})
ab, err := LoadFromData([]byte(b), pushFunc, nil, "some_alias")
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
@ -263,11 +263,11 @@ func TestAggregatorsSuccess(t *testing.T) {
tssOutput = appendClonedTimeseries(tssOutput, tss)
tssOutputLock.Unlock()
}
opts := Options{
opts := &Options{
FlushOnShutdown: true,
NoAlignFlushToInterval: true,
}
a, err := LoadFromData([]byte(config), pushFunc, opts)
a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias")
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
@ -515,6 +515,7 @@ foo-1m-without-abc-sum-samples 12.5
without: [abc]
outputs: [count_samples, sum_samples, count_series]
match: '{non_existing_label!=""}'
name: foobar
`, `
foo{abc="123"} 4
bar 5
@ -527,6 +528,7 @@ foo{abc="456",de="fg"} 8
- interval: 1m
by: [abc]
outputs: [count_samples, sum_samples, count_series]
name: abcdef
match:
- foo{abc=~".+"}
- '{non_existing_label!=""}'
@ -980,11 +982,11 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
}
tssOutputLock.Unlock()
}
opts := Options{
opts := &Options{
DedupInterval: 30 * time.Second,
FlushOnShutdown: true,
}
a, err := LoadFromData([]byte(config), pushFunc, opts)
a, err := LoadFromData([]byte(config), pushFunc, opts, "some_alias")
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}

View file

@ -39,7 +39,7 @@ func BenchmarkAggregatorsPush(b *testing.B) {
}
}
func BenchmarkAggregatorsFlushSerial(b *testing.B) {
func BenchmarkAggregatorsFlushInternalSerial(b *testing.B) {
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
a := newBenchAggregators(benchOutputs, pushFunc)
defer a.MustStop()
@ -50,7 +50,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
b.SetBytes(int64(len(benchSeries) * len(benchOutputs)))
for i := 0; i < b.N; i++ {
for _, aggr := range a.as {
aggr.flush(pushFunc, time.Hour, false)
aggr.flushInternal(pushFunc, false)
}
}
}
@ -87,7 +87,7 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputs: [%s]
`, strings.Join(outputsQuoted, ","))
a, err := LoadFromData([]byte(config), pushFunc, Options{})
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}

View file

@ -13,8 +13,6 @@ import (
type totalAggrState struct {
m sync.Map
suffix string
// Whether to reset the output value on every flushState call.
resetTotalOnFlush bool
@ -50,15 +48,8 @@ type totalLastValueState struct {
func newTotalAggrState(stalenessInterval time.Duration, resetTotalOnFlush, keepFirstSample bool) *totalAggrState {
stalenessSecs := roundDurationToSecs(stalenessInterval)
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + stalenessSecs
suffix := "total"
if resetTotalOnFlush {
suffix = "increase"
}
if !keepFirstSample {
suffix += "_prometheus"
}
return &totalAggrState{
suffix: suffix,
resetTotalOnFlush: resetTotalOnFlush,
keepFirstSample: keepFirstSample,
stalenessSecs: stalenessSecs,
@ -128,11 +119,14 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000
as.removeOldEntries(ctx, currentTime)
suffix := as.getSuffix()
as.removeOldEntries(currentTime)
m := &as.m
m.Range(func(k, v any) bool {
sv := v.(*totalStateValue)
sv.mu.Lock()
total := sv.total
if resetState {
@ -145,17 +139,31 @@ func (as *totalAggrState) flushState(ctx *flushCtx, resetState bool) {
}
deleted := sv.deleted
sv.mu.Unlock()
if !deleted {
key := k.(string)
ctx.appendSeries(key, as.suffix, currentTimeMsec, total)
ctx.appendSeries(key, suffix, currentTimeMsec, total)
}
return true
})
}
func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
func (as *totalAggrState) getSuffix() string {
// Note: this function is at hot path, so it shouldn't allocate.
if as.resetTotalOnFlush {
if as.keepFirstSample {
return "increase"
}
return "increase_prometheus"
}
if as.keepFirstSample {
return "total"
}
return "total_prometheus"
}
func (as *totalAggrState) removeOldEntries(currentTime uint64) {
m := &as.m
var staleInputSamples, staleOutputSamples int
m.Range(func(k, v any) bool {
sv := v.(*totalStateValue)
@ -163,7 +171,6 @@ func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
if currentTime > sv.deleteDeadline {
// Mark the current entry as deleted
sv.deleted = true
staleOutputSamples++
sv.mu.Unlock()
m.Delete(k)
return true
@ -174,12 +181,9 @@ func (as *totalAggrState) removeOldEntries(ctx *flushCtx, currentTime uint64) {
for k1, lv := range lvs {
if currentTime > lv.deleteDeadline {
delete(lvs, k1)
staleInputSamples++
}
}
sv.mu.Unlock()
return true
})
ctx.a.staleInputSamples[as.suffix].Add(staleInputSamples)
ctx.a.staleOutputSamples[as.suffix].Add(staleOutputSamples)
}