app/vmagent: add global aggregator (#6268)

Add global stream aggregation for VMAgent

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467
This commit is contained in:
Andrii Chubatiuk 2024-05-17 15:00:47 +03:00 committed by GitHub
parent b2765c45d0
commit f153f54d11
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 314 additions and 148 deletions

View file

@ -87,24 +87,6 @@ var (
maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+
"for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence ."+
"See also -remoteWrite.dropSamplesOnOverload")
@ -139,6 +121,12 @@ func MultitenancyEnabled() bool {
// Contains the current relabelConfigs.
var allRelabelConfigs atomic.Pointer[relabelConfigs]
// Contains the current global stream aggregators.
var sasGlobal atomic.Pointer[streamaggr.Aggregators]
// Contains the current global deduplicator.
var deduplicatorGlobal *streamaggr.Deduplicator
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
var maxQueues = cgroup.AvailableCPUs() * 16
@ -215,6 +203,17 @@ func Init() {
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
sasFile, sasOpts := getStreamAggrOpts(-1)
if sasFile != "" {
sas, err := newStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -streamAggr.config=%q: %s", sasFile, err)
}
sasGlobal.Store(sas)
} else if sasOpts.DedupInterval > 0 {
deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesDropFailed, sasOpts.DedupInterval, sasOpts.DropInputLabels)
}
if len(*remoteWriteURLs) > 0 {
rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
}
@ -301,12 +300,6 @@ var (
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
)
func reloadStreamAggrConfigs() {
for _, rwctx := range rwctxs {
rwctx.reinitStreamAggr()
}
}
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
if len(urls) == 0 {
logger.Panicf("BUG: urls must be non-empty")
@ -383,6 +376,10 @@ func Stop() {
close(configReloaderStopCh)
configReloaderWG.Wait()
sasGlobal.Load().MustStop()
deduplicatorGlobal.MustStop()
deduplicatorGlobal = nil
for _, rwctx := range rwctxs {
rwctx.MustStop()
}
@ -459,6 +456,8 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
// Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock
sas := sasGlobal.Load()
for len(tss) > 0 {
// Process big tss in smaller blocks in order to reduce the maximum memory usage
samplesCount := 0
@ -493,6 +492,17 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
}
sortLabelsIfNeeded(tssBlock)
tssBlock = limitSeriesCardinality(tssBlock)
if sas != nil {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tssBlock, matchIdxs.B)
if !*streamAggrGlobalKeepInput {
tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput)
}
matchIdxsPool.Put(matchIdxs)
} else if deduplicatorGlobal != nil {
deduplicatorGlobal.Push(tssBlock)
tssBlock = tssBlock[:0]
}
if !tryPushBlockToRemoteStorages(tssBlock, forceDropSamplesOnFailure) {
return false
}
@ -500,6 +510,12 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
return true
}
func pushToRemoteStoragesDropFailed(tss []prompbmarshal.TimeSeries) {
if tryPushBlockToRemoteStorages(tss, true) {
return
}
}
func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
if len(tssBlock) == 0 {
// Nothing to push
@ -810,17 +826,9 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
}
// Initialize sas
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx)
sasFile, sasOpts := getStreamAggrOpts(argIdx)
if sasFile != "" {
opts := &streamaggr.Options{
DedupInterval: dedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: ignoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
sas, err := newStreamAggrConfig(argIdx, rwctx.pushInternalTrackDropped)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
}
@ -829,8 +837,8 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
} else if dedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels)
} else if sasOpts.DedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, sasOpts.DedupInterval, sasOpts.DropInputLabels)
}
return rwctx
@ -987,40 +995,6 @@ func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) boo
return ok
}
func (rwctx *remoteWriteCtx) reinitStreamAggr() {
sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx)
if sasFile == "" {
// There is no stream aggregation for rwctx
return
}
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(rwctx.idx),
}
sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
return
}
sas := rwctx.sas.Load()
if !sasNew.Equal(sas) {
sasOld := rwctx.sas.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
} else {
sasNew.MustStop()
logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
var tssPool = &sync.Pool{
New: func() interface{} {
a := []prompbmarshal.TimeSeries{}
@ -1036,32 +1010,6 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int {
return rowsCount
}
// HasAnyStreamAggrConfigured checks if any streaming aggregation config provided
func HasAnyStreamAggrConfigured() bool {
return len(*streamAggrConfig) > 0
}
// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
func CheckStreamAggrConfigs() error {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
for idx, sasFile := range *streamAggrConfig {
if sasFile == "" {
continue
}
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
}
sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts)
if err != nil {
return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
}
sas.MustStop()
}
return nil
}
func newMapFromStrings(a []string) map[string]struct{} {
m := make(map[string]struct{}, len(a))
for _, s := range a {

View file

@ -0,0 +1,160 @@
package remotewrite
import (
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
var (
// Global config
streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrGlobalDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to remote storages write. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrGlobalDedupInterval = flagutil.NewDuration("streamAggr.dedupInterval", "0s", "Input samples are de-duplicated with this interval on "+
"aggregator before optional aggregation with -streamAggr.config . "+
"See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrGlobalIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the "+
"current aggregation interval for aggregator. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrGlobalIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start for "+
"aggregator. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from "+
"clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrGlobalDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples for aggregator "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
// Per URL config
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+
"aggregation interval for the corresponding -remoteWrite.streamAggr.config . "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if "+
"you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("remoteWrite.streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
)
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
func CheckStreamAggrConfigs() error {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
if _, err := newStreamAggrConfig(-1, pushNoop); err != nil {
return fmt.Errorf("could not load -streamAggr.config stream aggregation config: %w", err)
}
if len(*streamAggrConfig) > len(*remoteWriteURLs) {
return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*streamAggrConfig), len(*remoteWriteURLs))
}
for idx := range *streamAggrConfig {
if _, err := newStreamAggrConfig(idx, pushNoop); err != nil {
return err
}
}
return nil
}
// HasAnyStreamAggrConfigured checks if any streaming aggregation config provided
func HasAnyStreamAggrConfigured() bool {
return len(*streamAggrConfig) > 0 || *streamAggrGlobalConfig != ""
}
func reloadStreamAggrConfigs() {
reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
for idx, rwctx := range rwctxs {
reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
}
}
func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
path, opts := getStreamAggrOpts(idx)
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err)
return
}
var sas *streamaggr.Aggregators
if idx < 0 {
sas = sasGlobal.Load()
} else {
sas = rwctxs[idx].sas.Load()
}
if !sasNew.Equal(sas) {
var sasOld *streamaggr.Aggregators
if idx < 0 {
sasOld = sasGlobal.Swap(sasNew)
} else {
sasOld = rwctxs[idx].sas.Swap(sasNew)
}
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
} else {
sasNew.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
}
func getStreamAggrOpts(idx int) (string, *streamaggr.Options) {
if idx < 0 {
return *streamAggrGlobalConfig, &streamaggr.Options{
DedupInterval: streamAggrGlobalDedupInterval.Duration(),
DropInputLabels: *streamAggrGlobalDropInputLabels,
IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals,
}
}
opts := streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
if len(*streamAggrConfig) == 0 {
return "", &opts
}
return (*streamAggrConfig)[idx], &opts
}
func newStreamAggrConfigWithOpts(pushFunc streamaggr.PushFunc, path string, opts *streamaggr.Options) (*streamaggr.Aggregators, error) {
if len(path) == 0 {
// Skip empty stream aggregation config.
return nil, nil
}
return streamaggr.LoadFromFile(path, pushFunc, opts)
}
func newStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) {
path, opts := getStreamAggrOpts(idx)
return newStreamAggrConfigWithOpts(pushFunc, path, opts)
}

View file

@ -31,7 +31,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
## tip
**Update note 1: the `-remoteWrite.multitenantURL` command-line flag at `vmagent` was removed starting from this release. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy).**
**Update note 2: the `-streamAggr.dropInputLabels` command-line flag at `vmagent` was renamed to `-remoteWrite.streamAggr.dropInputLabels`. `-streamAggr.dropInputLabels` is now used for global streaming aggregation.**
* SECURITY: upgrade Go builder from Go1.22.2 to Go1.22.3. See [the list of issues addressed in Go1.22.3](https://github.com/golang/go/issues?q=milestone%3AGo1.22.3+label%3ACherryPickApproved).
@ -40,6 +40,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* FEATURE: [dashboards/single](https://grafana.com/grafana/dashboards/10229): add `Network Usage` panel to `Resource Usage` row.
* FEATURE: [dashboards/operator](https://grafana.com/grafana/dashboards/17869), [dashboards/backupmanager](https://grafana.com/grafana/dashboards/17798) and [dashboard/tenant-statistic](https://grafana.com/grafana/dashboards/16399): update dashboard to be compatible with Grafana 10+ version.
* FEATURE: [dashboards/cluster](https://grafana.com/grafana/dashboards/11176): add new panel `Concurrent selects` to `vmstorage` row. The panel will show how many ongoing select queries are processed by vmstorage and should help to identify resource bottlenecks. See panel description for more details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): support aggregation and deduplication configs before replicating data to configured `-remoteWrite.url` destinations. This saves CPU and memory resources when incoming data needs to be aggregated or deduplicated once and then replicated to multiple destinations. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5467).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add service discovery support for [Vultr](https://www.vultr.com/). See [these docs](https://docs.victoriametrics.com/sd_configs/#vultr_sd_configs) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6041).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): allow configuring `-remoteWrite.disableOnDiskQueue` and `-remoteWrite.dropSamplesOnOverload` cmd-line flags per each `-remoteWrite.url`. See this [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6065). Thanks to @rbizos for implementaion!
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add labels `path` and `url` to metrics `vmagent_remotewrite_push_failures_total` and `vmagent_remotewrite_samples_dropped_total`. Now number of failed pushes and dropped samples can be tracked per `-remoteWrite.url`.

View file

@ -19,15 +19,16 @@ The aggregation is applied to all the metrics received via any [supported data i
and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter)
after applying all the configured [relabeling stages](https://docs.victoriametrics.com/vmagent/#relabeling).
By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples).
It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples).
_By default, stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyconcepts/#raw-samples).
It expects that the ingested samples have timestamps close to the current time. See [how to ignore old samples](#ignoring-old-samples)._
Stream aggregation can be configured via the following command-line flags:
- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent/).
This flag can be specified individually per each `-remoteWrite.url`.
- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and at [vmagent](https://docs.victoriametrics.com/vmagent/).
- `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent/) only.
This flag can be specified individually per each `-remoteWrite.url` and aggregation will happen independently for each of them.
This allows writing different aggregates to different remote storage destinations.
- `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/).
These flags must point to a file containing [stream aggregation config](#stream-aggregation-config).
The file may contain `%{ENV_VAR}` placeholders which are substituted by the corresponding `ENV_VAR` environment variable values.
@ -39,29 +40,61 @@ By default, the following data is written to the storage when stream aggregation
This behaviour can be changed via the following command-line flags:
- `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent/) and `-streamAggr.keepInput`
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/).
- `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`.
If one of these flags is set, then all the input samples are written to the storage alongside the aggregated samples.
The `-remoteWrite.streamAggr.keepInput` flag can be specified individually per each `-remoteWrite.url`.
- `-remoteWrite.streamAggr.dropInput` at [vmagent](https://docs.victoriametrics.com/vmagent/) and `-streamAggr.dropInput`
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/).
- `-streamAggr.dropInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/)
and [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.dropInput` flag can be specified individually per each `-remoteWrite.url`.
If one of these flags are set, then all the input samples are dropped, while only the aggregated samples are written to the storage.
The `-remoteWrite.streamAggr.dropInput` flag can be specified individually per each `-remoteWrite.url`.
## Routing
[Single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) supports relabeling,
deduplication and stream aggregation for all the received data, scraped or pushed.
The processed data is then stored in local storage and **can't be forwarded further**.
[vmagent](https://docs.victoriametrics.com/vmagent/) supports relabeling, deduplication and stream aggregation for all
the received data, scraped or pushed. Then, the collected data will be forwarded to specified `-remoteWrite.url` destinations.
The data processing order is the following:
1. All the received data is [relabeled](https://docs.victoriametrics.com/vmagent/#relabeling) according to
specified `-remoteWrite.relabelConfig`;
1. All the received data is [deduplicated](https://docs.victoriametrics.com/stream-aggregation/#deduplication)
according to specified `-streamAggr.dedupInterval`;
1. All the received data is aggregated according to specified `-streamAggr.config`;
1. The resulting data from p1 and p2 is then replicated to each `-remoteWrite.url`;
1. Data sent to each `-remoteWrite.url` can be additionally relabeled according to the
corresponding `-remoteWrite.urlRelabelConfig` (set individually per URL);
1. Data sent to each `-remoteWrite.url` can be additionally deduplicated according to the
corresponding `-remoteWrite.streamAggr.dedupInterval` (set individually per URL);
1. Data sent to each `-remoteWrite.url` can be additionally aggregated according to the
corresponding `-remoteWrite.streamAggr.config` (set individually per URL). Please note, it is not recommended
to use `-streamAggr.config` and `-remoteWrite.streamAggr.config` together, unless you understand the complications.
Typical scenarios for data routing with vmagent:
1. **Aggregate incoming data and replicate to N destinations**. For this one should configure `-streamAggr.config`
to aggregate the incoming data before replicating it to all the configured `-remoteWrite.url` destinations.
2. **Individually aggregate incoming data for each destination**. For this on should configure `-remoteWrite.streamAggr.config`
for each `-remoteWrite.url` destination. [Relabeling](https://docs.victoriametrics.com/vmagent/#relabeling)
via `-remoteWrite.urlRelabelConfig` can be used for routing only selected metrics to each `-remoteWrite.url` destination.
## Deduplication
[vmagent](https://docs.victoriametrics.com/vmagent/) supports online [de-duplication](https://docs.victoriametrics.com/#deduplication) of samples
before sending them to the configured `-remoteWrite.url`. The de-duplication can be enabled via the following options:
- By specifying the desired de-duplication interval via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url`.
- By specifying the desired de-duplication interval via `-streamAggr.dedupInterval` command-line flag for all received data
or via `-remoteWrite.streamAggr.dedupInterval` command-line flag for the particular `-remoteWrite.url` destination.
For example, `./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -remoteWrite.streamAggr.dedupInterval=30s` instructs `vmagent` to leave
only the last sample per each seen [time series](https://docs.victoriametrics.com/keyconcepts/#time-series) per every 30 seconds.
The de-duplication is performed after applying `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling).
The de-deduplication is performed after applying [relabeling](https://docs.victoriametrics.com/vmagent/#relabeling) and
before performing the aggregation.
If the `-remoteWrite.streamAggr.config` and / or `-streamAggr.config` is set, then the de-duplication is performed individually per each
[stream aggregation config](#stream-aggregation-config) for the matching samples after applying [input_relabel_configs](#relabeling).
If the `-remoteWrite.streamAggr.config` is set, then the de-duplication is performed individually per each [stream aggregation config](#stream-aggregation-config)
for the matching samples after applying [input_relabel_configs](#relabeling).
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config) at `-remoteWrite.streamAggr.config`.
- By specifying `dedup_interval` option individually per each [stream aggregation config](#stream-aggregation-config)
in `-remoteWrite.streamAggr.config` or `-streamAggr.config` configs.
[Single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) supports two types of de-duplication:
- After storing the duplicate samples to local storage. See [`-dedup.minScrapeInterval`](https://docs.victoriametrics.com/#deduplication) command-line option.
@ -82,11 +115,12 @@ The online de-duplication uses the same logic as [`-dedup.minScrapeInterval` com
## Ignoring old samples
By default, all the input samples are taken into account during stream aggregation. If samples with old timestamps outside the current [aggregation interval](#stream-aggregation-config)
must be ignored, then the following options can be used:
By default, all the input samples are taken into account during stream aggregation. If samples with old timestamps
outside the current [aggregation interval](#stream-aggregation-config) must be ignored, then the following options can be used:
- To pass `-remoteWrite.streamAggr.ignoreOldSamples` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/)
or `-streamAggr.ignoreOldSamples` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/).
- To pass `-streamAggr.ignoreOldSamples` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/)
or to [vmagent](https://docs.victoriametrics.com/vmagent/). At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.ignoreOldSamples` flag can be specified individually per each `-remoteWrite.url`.
This enables ignoring old samples for all the [aggregation configs](#stream-aggregation-config).
- To set `ignore_old_samples: true` option at the particular [aggregation config](#stream-aggregation-config).
@ -99,11 +133,12 @@ received from clients that maintain a queue of unsent data, such as Prometheus o
cleared within the aggregation `interval`, only a portion of the time series may be processed, leading to distorted
calculations. To mitigate this, consider the following options:
- Set `-remoteWrite.streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent/)
or `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/)
to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config)
from persisting to the storage. It is expected that all incomplete or queued data will be processed during
specified `<intervalsCount>` and all subsequent aggregation intervals will produce correct data.
- Set `-streamAggr.ignoreFirstIntervals=<intervalsCount>` command-line flag to [single-node VictoriaMetrics](https://docs.victoriametrics.com/)
or to [vmagent](https://docs.victoriametrics.com/vmagent/) to skip first `<intervalsCount>` [aggregation intervals](#stream-aggregation-config)
from persisting to the storage. At [vmagent](https://docs.victoriametrics.com/vmagent/)
`-remoteWrite.streamAggr.ignoreFirstIntervals=<intervalsCount>` flag can be specified individually per each `-remoteWrite.url`.
It is expected that all incomplete or queued data will be processed during specified `<intervalsCount>`
and all subsequent aggregation intervals will produce correct data.
- Set `ignore_first_intervals: <intervalsCount>` option individually per [aggregation config](#stream-aggregation-config).
This enables ignoring first `<intervalsCount>` aggregation intervals for that particular aggregation config.
@ -480,7 +515,8 @@ See also [dropping unneded labels](#dropping-unneeded-labels).
If you need dropping some labels from input samples before [input relabeling](#relabeling), [de-duplication](#deduplication)
and [stream aggregation](#aggregation-outputs), then the following options exist:
- To specify comma-separated list of label names to drop in `-streamAggr.dropInputLabels` command-line flag.
- To specify comma-separated list of label names to drop in `-streamAggr.dropInputLabels` command-line flag
or via `-remoteWrite.streamAggr.dropInputLabels` individually per each `-remoteWrite.url`.
For example, `-streamAggr.dropInputLabels=replica,az` instructs to drop `replica` and `az` labels from input samples
before applying de-duplication and stream aggregation.
@ -879,9 +915,10 @@ See also [aggregation outputs](#aggregation-outputs).
## Stream aggregation config
Below is the format for stream aggregation config file, which may be referred via `-remoteWrite.streamAggr.config` command-line flag
at [vmagent](https://docs.victoriametrics.com/vmagent/) or via `-streamAggr.config` command-line flag
at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/):
Below is the format for stream aggregation config file, which may be referred via `-streamAggr.config` command-line flag at
[single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-victoriametrics/) and [vmagent](https://docs.victoriametrics.com/vmagent/).
At [vmagent](https://docs.victoriametrics.com/vmagent/) `-remoteWrite.streamAggr.config` command-line flag can be
specified individually per each `-remoteWrite.url`:
```yaml
# match is an optional filter for incoming samples to aggregate.
@ -957,13 +994,13 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/single-server-
# ignore_old_samples instructs ignoring input samples with old timestamps outside the current aggregation interval.
# See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
# See also -streamAggr.ignoreOldSamples command-line flag.
# See also -remoteWrite.streamAggr.ignoreOldSamples or -streamAggr.ignoreOldSamples command-line flag.
#
# ignore_old_samples: false
# ignore_first_intervals instructs ignoring first N aggregation intervals after process start.
# See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
# See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals
# See also -remoteWrite.streamAggr.ignoreFirstIntervals or -streamAggr.ignoreFirstIntervals command-line flag.
#
# ignore_first_intervals: false
@ -1019,6 +1056,8 @@ support the following approaches for hot reloading stream aggregation configs fr
The following outputs track the last seen per-series values in order to properly calculate output values:
- [rate_sum](#rate_sum)
- [rate_avg](#rate_avg)
- [total](#total)
- [total_prometheus](#total_prometheus)
- [increase](#increase)

View file

@ -121,7 +121,8 @@ additionally to pull-based Prometheus-compatible targets' scraping:
`vmagent` should be restarted in order to update config options set via command-line args.
`vmagent` supports multiple approaches for reloading configs from updated config files such as
`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig`, `-streamAggr.config`
and `-remoteWrite.streamAggr.config`:
* Sending `SIGHUP` signal to `vmagent` process:
@ -1972,6 +1973,8 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs/#nomad_sd_configs for details (default 30s)
-promscrape.openstackSDCheckInterval duration
Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs/#openstack_sd_configs for details (default 30s)
-promscrape.scrapeExemplars
Whether to enable scraping of exemplars from scrape targets.
-promscrape.seriesLimitPerTarget int
Optional limit on the number of unique time series a single scrape target can expose. See https://docs.victoriametrics.com/vmagent/#cardinality-limiter for more info
-promscrape.streamParse
@ -2173,6 +2176,10 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/
Supports array of values separated by comma or specified via multiple flags.
Empty values are set to false.
-remoteWrite.streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-remoteWrite.streamAggr.ignoreFirstIntervals int
Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
-remoteWrite.streamAggr.ignoreOldSamples array
@ -2227,14 +2234,23 @@ See the docs at https://docs.victoriametrics.com/vmagent/ .
TCP and UDP address to listen for Statsd plaintext data. Usually :8125 must be set. Doesn't work if empty. See also -statsdListenAddr.useProxyProtocol
-statsdListenAddr.useProxyProtocol
Whether to use proxy protocol for connections accepted at -statsdListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.dedupInterval value
Input samples are de-duplicated with this interval on aggregator before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication
The following optional suffixes are supported: s (second), m (minute), h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months (default 0s)
-streamAggr.dropInput
Whether to drop all the input samples after the aggregation with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to remote storages write. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/
-streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.dropInputLabels array
An optional list of labels to drop from samples before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
An optional list of labels to drop from samples for aggregator before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-streamAggr.ignoreFirstIntervals int
Number of aggregation intervals to skip after the start for aggregator. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start
-streamAggr.ignoreOldSamples
Whether to ignore input samples with old timestamps outside the current aggregation interval for aggregator. See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples
-streamAggr.keepInput
Whether to keep all the input samples after the aggregation with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/
-tls array
Whether to enable TLS for incoming HTTP requests at the given -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set. See also -mtls
Supports array of values separated by comma or specified via multiple flags.

View file

@ -27,6 +27,8 @@ import (
)
var supportedOutputs = []string{
"rate_sum",
"rate_avg",
"total",
"total_prometheus",
"increase",