From d655d6b047099758f43a7e4c80c8265d62df3933 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 25 Jan 2023 09:14:49 -0800 Subject: [PATCH 1/2] lib/streamaggr: add ability to de-duplicate input samples before aggregation --- app/vmagent/remotewrite/remotewrite.go | 7 +- app/vminsert/common/streamaggr.go | 6 +- docs/CHANGELOG.md | 1 + docs/stream-aggregation.md | 14 ++- lib/streamaggr/streamaggr.go | 103 ++++++++++++++++++++--- lib/streamaggr/streamaggr_test.go | 82 +++++++++++++++++- lib/streamaggr/streamaggr_timing_test.go | 2 +- 7 files changed, 195 insertions(+), 20 deletions(-) diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 407fd6d13..b92fc904c 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -62,10 +62,12 @@ var ( streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ "See https://docs.victoriametrics.com/stream-aggregation.html . "+ - "See also -remoteWrite.streamAggr.keepInput") + "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval") streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+ "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+ "See https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+ + "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) var ( @@ -509,7 +511,8 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI // Initialize sas sasFile := streamAggrConfig.GetOptionalArg(argIdx) if sasFile != "" { - sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal) + dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0) + sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) if err != nil { logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err) } diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index ed028e089..066a9cacb 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -16,10 +16,12 @@ import ( var ( streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+ "See https://docs.victoriametrics.com/stream-aggregation.html . "+ - "See also -remoteWrite.streamAggr.keepInput") + "See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval") streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep input samples after the aggregation with -streamAggr.config. "+ "By default the input is dropped after the aggregation, so only the aggregate data is stored. "+ "See https://docs.victoriametrics.com/stream-aggregation.html") + streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+ + "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") ) // InitStreamAggr must be called after flag.Parse and before using the common package. @@ -30,7 +32,7 @@ func InitStreamAggr() { // Nothing to initialize return } - a, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries) + a, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval) if err != nil { logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9fb15ec4a..a55b72280 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): add the ability to [de-duplicate](https://docs.victoriametrics.com/#deduplication) input samples before aggregation via `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line options. * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add dark mode - it can be seleted via `settings` menu in the top right corner. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3704). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696). diff --git a/docs/stream-aggregation.md b/docs/stream-aggregation.md index 11a0327ab..863010e2b 100644 --- a/docs/stream-aggregation.md +++ b/docs/stream-aggregation.md @@ -12,7 +12,7 @@ and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics The stream aggregation is configured via the following command-line flags: - `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html). - This flag can be specified individually per each specified `-remoteWrite.url`. + This flag can be specified individually per each `-remoteWrite.url`. This allows writing different aggregates to different remote storage destinations. - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). @@ -22,13 +22,23 @@ By default only the aggregated data is written to the storage. If the original i then the following command-line flags must be specified: - `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html). - This flag can be specified individually per each specified `-remoteWrite.url`. + This flag can be specified individually per each `-remoteWrite.url`. This allows writing both raw and aggregate data to different remote storage destinations. - `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples). It expects that the ingested samples have timestamps close to the current time. +By default all the input samples are aggregated. Sometimes it is needed to de-duplicate samples before the aggregation. +For example, if the samples are received from replicated sources. +The following command-line flag can be used for enabling the [de-duplication](https://docs.victoriametrics.com/#deduplication) +before aggregation in this case: + +- `-remoteWrite.streamAggr.dedupInterval` at [vmagent](https://docs.victoriametrics.com/vmagent.html). + This flag can be specified individually per each `-remoteWrite.url`. + This allows setting different de-duplication intervals per each configured remote storage. +- `-streamAggr.dedupInterval` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html). + ## Use cases Stream aggregation can be used in the following cases: diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 5fcf4be7e..a31dd0e16 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -38,13 +38,16 @@ var supportedOutputs = []string{ // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. // +// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, +// e.g. only the last sample per each time series per each dedupInterval is aggregated. +// // The returned Aggregators must be stopped with MustStop() when no longer needed. -func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) { +func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { data, err := fs.ReadFileOrHTTP(path) if err != nil { return nil, fmt.Errorf("cannot load aggregators: %w", err) } - as, err := NewAggregatorsFromData(data, pushFunc) + as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval) if err != nil { return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err) } @@ -53,13 +56,16 @@ func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) { // NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data. // +// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, +// e.g. only the last sample per each time series per each dedupInterval is aggregated. +// // The returned Aggregators must be stopped with MustStop() when no longer needed. -func NewAggregatorsFromData(data []byte, pushFunc PushFunc) (*Aggregators, error) { +func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { var cfgs []*Config if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { return nil, err } - return NewAggregators(cfgs, pushFunc) + return NewAggregators(cfgs, pushFunc, dedupInterval) } // Config is a configuration for a single stream aggregation. @@ -130,14 +136,17 @@ type Aggregators struct { // // pushFunc is called when the aggregated data must be flushed. // +// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, +// e.g. only the last sample per each time series per each dedupInterval is aggregated. +// // MustStop must be called on the returned Aggregators when they are no longer needed. -func NewAggregators(cfgs []*Config, pushFunc PushFunc) (*Aggregators, error) { +func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { if len(cfgs) == 0 { return nil, nil } as := make([]*aggregator, len(cfgs)) for i, cfg := range cfgs { - a, err := newAggregator(cfg, pushFunc) + a, err := newAggregator(cfg, pushFunc, dedupInterval) if err != nil { return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err) } @@ -179,6 +188,10 @@ type aggregator struct { without []string aggregateOnlyByTime bool + // dedupAggr is set to non-nil if input samples must be de-duplicated according + // to the dedupInterval passed to newAggregator(). + dedupAggr *lastAggrState + // aggrStates contains aggregate states for the given outputs aggrStates []aggrState @@ -205,8 +218,11 @@ type PushFunc func(tss []prompbmarshal.TimeSeries) // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc. // +// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated, +// e.g. only the last sample per each time series per each dedupInterval is aggregated. +// // The returned aggregator must be stopped when no longer needed by calling MustStop(). -func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) { +func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) { // check cfg.Interval interval, err := time.ParseDuration(cfg.Interval) if err != nil { @@ -309,6 +325,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) { } suffix += "_" + var dedupAggr *lastAggrState + if dedupInterval > 0 { + dedupAggr = newLastAggrState() + } + // initialize the aggregator a := &aggregator{ match: cfg.Match, @@ -320,6 +341,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) { without: without, aggregateOnlyByTime: aggregateOnlyByTime, + dedupAggr: dedupAggr, aggrStates: aggrStates, pushFunc: pushFunc, @@ -328,15 +350,41 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) { stopCh: make(chan struct{}), } + if dedupAggr != nil { + a.wg.Add(1) + go func() { + a.runDedupFlusher(dedupInterval) + a.wg.Done() + }() + } a.wg.Add(1) go func() { a.runFlusher(interval) - defer a.wg.Done() + a.wg.Done() }() return a, nil } +func (a *aggregator) runDedupFlusher(interval time.Duration) { + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-a.stopCh: + return + case <-t.C: + } + + // Globally limit the concurrency for metrics' flush + // in order to limit memory usage when big number of aggregators + // are flushed at the same time. + flushConcurrencyCh <- struct{}{} + a.dedupFlush() + <-flushConcurrencyCh + } +} + func (a *aggregator) runFlusher(interval time.Duration) { t := time.NewTicker(interval) defer t.Stop() @@ -358,6 +406,15 @@ func (a *aggregator) runFlusher(interval time.Duration) { var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) +func (a *aggregator) dedupFlush() { + ctx := &flushCtx{ + skipAggrSuffix: true, + } + a.dedupAggr.appendSeriesForFlush(ctx) + logger.Errorf("series after dedup: %v", ctx.tss) + a.push(ctx.tss) +} + func (a *aggregator) flush() { ctx := &flushCtx{ suffix: a.suffix, @@ -395,8 +452,29 @@ func (a *aggregator) MustStop() { a.wg.Wait() } -// Push pushes series to a. +// Push pushes tss to a. func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) { + if a.dedupAggr == nil { + a.push(tss) + return + } + + // deduplication is enabled. + // push samples to dedupAggr, so later they will be pushed to the configured aggregators. + pushSample := a.dedupAggr.pushSample + inputKey := "" + bb := bbPool.Get() + for _, ts := range tss { + bb.B = marshalLabelsFast(bb.B[:0], ts.Labels) + outputKey := bytesutil.InternBytes(bb.B) + for _, sample := range ts.Samples { + pushSample(inputKey, outputKey, sample.Value) + } + } + bbPool.Put(bb) +} + +func (a *aggregator) push(tss []prompbmarshal.TimeSeries) { labels := promutils.GetLabels() tmpLabels := promutils.GetLabels() bb := bbPool.Get() @@ -545,7 +623,8 @@ func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal } type flushCtx struct { - suffix string + skipAggrSuffix bool + suffix string tss []prompbmarshal.TimeSeries labels []prompbmarshal.Label @@ -567,7 +646,9 @@ func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int6 if err != nil { logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err) } - ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix) + if !ctx.skipAggrSuffix { + ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix) + } ctx.samples = append(ctx.samples, prompbmarshal.Sample{ Timestamp: timestamp, Value: value, diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index 672032e45..a3c002c8d 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -6,6 +6,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" @@ -18,7 +19,7 @@ func TestAggregatorsFailure(t *testing.T) { pushFunc := func(tss []prompbmarshal.TimeSeries) { panic(fmt.Errorf("pushFunc shouldn't be called")) } - a, err := NewAggregatorsFromData([]byte(config), pushFunc) + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) if err == nil { t.Fatalf("expecting non-nil error") } @@ -136,7 +137,7 @@ func TestAggregatorsSuccess(t *testing.T) { } tssOutputLock.Unlock() } - a, err := NewAggregatorsFromData([]byte(config), pushFunc) + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) if err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } @@ -641,6 +642,83 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 `) } +func TestAggregatorsWithDedupInterval(t *testing.T) { + f := func(config, inputMetrics, outputMetricsExpected string) { + t.Helper() + + // Initialize Aggregators + var tssOutput []prompbmarshal.TimeSeries + var tssOutputLock sync.Mutex + pushFunc := func(tss []prompbmarshal.TimeSeries) { + tssOutputLock.Lock() + for _, ts := range tss { + labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...) + samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...) + tssOutput = append(tssOutput, prompbmarshal.TimeSeries{ + Labels: labelsCopy, + Samples: samplesCopy, + }) + } + tssOutputLock.Unlock() + } + const dedupInterval = time.Hour + a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval) + if err != nil { + t.Fatalf("cannot initialize aggregators: %s", err) + } + + // Push the inputMetrics to Aggregators + tssInput := mustParsePromMetrics(inputMetrics) + a.Push(tssInput) + if a != nil { + for _, aggr := range a.as { + aggr.dedupFlush() + aggr.flush() + } + } + a.MustStop() + + // Verify the tssOutput contains the expected metrics + tsStrings := make([]string, len(tssOutput)) + for i, ts := range tssOutput { + tsStrings[i] = timeSeriesToString(ts) + } + sort.Strings(tsStrings) + outputMetrics := strings.Join(tsStrings, "") + if outputMetrics != outputMetricsExpected { + t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected) + } + } + + f(` +- interval: 1m + outputs: [sum_samples] +`, ` +foo 123 +bar 567 +`, `bar:1m_sum_samples 567 +foo:1m_sum_samples 123 +`) + + f(` +- interval: 1m + outputs: [sum_samples] +`, ` +foo 123 +bar{baz="qwe"} 1.32 +bar{baz="qwe"} 4.34 +bar{baz="qwe"} 2 +foo{baz="qwe"} -5 +bar{baz="qwer"} 343 +bar{baz="qwer"} 344 +foo{baz="qwe"} 10 +`, `bar:1m_sum_samples{baz="qwe"} 2 +bar:1m_sum_samples{baz="qwer"} 344 +foo:1m_sum_samples 123 +foo:1m_sum_samples{baz="qwe"} 10 +`) +} + func timeSeriesToString(ts prompbmarshal.TimeSeries) string { labelsString := promrelabel.LabelsToString(ts.Labels) if len(ts.Samples) != 1 { diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 6ff151241..f45dd0b40 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -40,7 +40,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { pushFunc := func(tss []prompbmarshal.TimeSeries) { panic(fmt.Errorf("unexpected pushFunc call")) } - a, err := NewAggregatorsFromData([]byte(config), pushFunc) + a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0) if err != nil { b.Fatalf("unexpected error when initializing aggregators: %s", err) } From 28f66f0079fd24ba256f976a4f4c7e16690b8427 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 25 Jan 2023 09:20:05 -0800 Subject: [PATCH 2/2] docs: update the list of command-line flags according to the latest changes --- README.md | 14 +++++++++----- app/vmagent/README.md | 13 +++++++++---- app/vmalert/README.md | 6 ++++++ docs/README.md | 14 +++++++++----- docs/Single-server-VictoriaMetrics.md | 14 +++++++++----- docs/vmagent.md | 13 +++++++++---- docs/vmalert.md | 6 ++++++ 7 files changed, 57 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 37dd7628c..e51a7f5c8 100644 --- a/README.md +++ b/README.md @@ -2217,7 +2217,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -inmemoryDataFlushInterval duration The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s) -insert.maxQueueDuration duration - The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s) + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s) + -internStringMaxLen int + The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300) -logNewSeries Whether to log new series. This option is for debug purposes only. It can lead to performance issues when big number of new series are ingested into VictoriaMetrics -loggerDisableTimestamps @@ -2334,12 +2336,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -promscrape.minResponseSizeForStreamParse size The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000) + -promscrape.noStaleMarkers + Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.nomad.waitTime duration Wait time used by Nomad service discovery. Default value is used if not set -promscrape.nomadSDCheckInterval duration Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s) - -promscrape.noStaleMarkers - Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.openstackSDCheckInterval duration Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s) -promscrape.seriesLimitPerTarget int @@ -2485,9 +2487,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -storageDataPath string Path to storage data (default "victoria-metrics-data") -streamAggr.config string - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html .See also -remoteWrite.streamAggr.keepInput + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval + -streamAggr.dedupInterval duration + Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero -streamAggr.keepInput - Whether to keep input samples after the aggregation with -streamAggr.config .By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html + Whether to keep input samples after the aggregation with -streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html -tls Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set -tlsCertFile string diff --git a/app/vmagent/README.md b/app/vmagent/README.md index f8c33ec5e..a80daa547 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -1205,7 +1205,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -influxTrimTimestamp duration Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms) -insert.maxQueueDuration duration - The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s) + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s) + -internStringMaxLen int + The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300) -kafka.consumer.topic array Kafka topic names for data consumption. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html Supports an array of values separated by comma or specified via multiple flags. @@ -1340,12 +1342,12 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -promscrape.minResponseSizeForStreamParse size The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000) + -promscrape.noStaleMarkers + Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.nomad.waitTime duration Wait time used by Nomad service discovery. Default value is used if not set -promscrape.nomadSDCheckInterval duration Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s) - -promscrape.noStaleMarkers - Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.openstackSDCheckInterval duration Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s) -promscrape.seriesLimitPerTarget int @@ -1468,8 +1470,11 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits Supports array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.config array - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval Supports an array of values separated by comma or specified via multiple flags. + -remoteWrite.streamAggr.dedupInterval array + Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero + Supports array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.keepInput array Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html Supports array of values separated by comma or specified via multiple flags. diff --git a/app/vmalert/README.md b/app/vmalert/README.md index babe8ffc7..c653abcbd 100644 --- a/app/vmalert/README.md +++ b/app/vmalert/README.md @@ -898,6 +898,10 @@ The shortlist of configuration flags is the following: Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password -httpListenAddr string Address to listen for http connections (default ":8880") + -insert.maxQueueDuration duration + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s) + -internStringMaxLen int + The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300) -loggerDisableTimestamps Whether to disable writing timestamps in logs -loggerErrorsPerSecondLimit int @@ -914,6 +918,8 @@ The shortlist of configuration flags is the following: Timezone to use for timestamps in logs. Timezone must be a valid IANA Time Zone. For example: America/New_York, Europe/Berlin, Etc/GMT+3 or Local (default "UTC") -loggerWarnsPerSecondLimit int Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit + -maxConcurrentInserts int + The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 8) -memory.allowedBytes size Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0) diff --git a/docs/README.md b/docs/README.md index 94a8fd960..f488b5992 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2218,7 +2218,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -inmemoryDataFlushInterval duration The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s) -insert.maxQueueDuration duration - The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s) + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s) + -internStringMaxLen int + The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300) -logNewSeries Whether to log new series. This option is for debug purposes only. It can lead to performance issues when big number of new series are ingested into VictoriaMetrics -loggerDisableTimestamps @@ -2335,12 +2337,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -promscrape.minResponseSizeForStreamParse size The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000) + -promscrape.noStaleMarkers + Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.nomad.waitTime duration Wait time used by Nomad service discovery. Default value is used if not set -promscrape.nomadSDCheckInterval duration Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s) - -promscrape.noStaleMarkers - Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.openstackSDCheckInterval duration Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s) -promscrape.seriesLimitPerTarget int @@ -2486,9 +2488,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -storageDataPath string Path to storage data (default "victoria-metrics-data") -streamAggr.config string - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html .See also -remoteWrite.streamAggr.keepInput + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval + -streamAggr.dedupInterval duration + Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero -streamAggr.keepInput - Whether to keep input samples after the aggregation with -streamAggr.config .By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html + Whether to keep input samples after the aggregation with -streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html -tls Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set -tlsCertFile string diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index b47890b72..5da7b5475 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -2221,7 +2221,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -inmemoryDataFlushInterval duration The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s) -insert.maxQueueDuration duration - The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s) + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s) + -internStringMaxLen int + The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300) -logNewSeries Whether to log new series. This option is for debug purposes only. It can lead to performance issues when big number of new series are ingested into VictoriaMetrics -loggerDisableTimestamps @@ -2338,12 +2340,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -promscrape.minResponseSizeForStreamParse size The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000) + -promscrape.noStaleMarkers + Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.nomad.waitTime duration Wait time used by Nomad service discovery. Default value is used if not set -promscrape.nomadSDCheckInterval duration Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s) - -promscrape.noStaleMarkers - Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.openstackSDCheckInterval duration Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s) -promscrape.seriesLimitPerTarget int @@ -2489,9 +2491,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -storageDataPath string Path to storage data (default "victoria-metrics-data") -streamAggr.config string - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html .See also -remoteWrite.streamAggr.keepInput + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval + -streamAggr.dedupInterval duration + Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero -streamAggr.keepInput - Whether to keep input samples after the aggregation with -streamAggr.config .By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html + Whether to keep input samples after the aggregation with -streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html -tls Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set -tlsCertFile string diff --git a/docs/vmagent.md b/docs/vmagent.md index b55541c56..cfaac51ad 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -1209,7 +1209,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -influxTrimTimestamp duration Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms) -insert.maxQueueDuration duration - The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s) + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s) + -internStringMaxLen int + The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300) -kafka.consumer.topic array Kafka topic names for data consumption. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html Supports an array of values separated by comma or specified via multiple flags. @@ -1344,12 +1346,12 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . -promscrape.minResponseSizeForStreamParse size The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000) + -promscrape.noStaleMarkers + Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.nomad.waitTime duration Wait time used by Nomad service discovery. Default value is used if not set -promscrape.nomadSDCheckInterval duration Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s) - -promscrape.noStaleMarkers - Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -promscrape.openstackSDCheckInterval duration Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s) -promscrape.seriesLimitPerTarget int @@ -1472,8 +1474,11 @@ See the docs at https://docs.victoriametrics.com/vmagent.html . The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits Supports array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.config array - Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput + Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval Supports an array of values separated by comma or specified via multiple flags. + -remoteWrite.streamAggr.dedupInterval array + Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero + Supports array of values separated by comma or specified via multiple flags. -remoteWrite.streamAggr.keepInput array Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html Supports array of values separated by comma or specified via multiple flags. diff --git a/docs/vmalert.md b/docs/vmalert.md index e90efa3d9..0e9ffaaf4 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -902,6 +902,10 @@ The shortlist of configuration flags is the following: Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password -httpListenAddr string Address to listen for http connections (default ":8880") + -insert.maxQueueDuration duration + The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s) + -internStringMaxLen int + The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300) -loggerDisableTimestamps Whether to disable writing timestamps in logs -loggerErrorsPerSecondLimit int @@ -918,6 +922,8 @@ The shortlist of configuration flags is the following: Timezone to use for timestamps in logs. Timezone must be a valid IANA Time Zone. For example: America/New_York, Europe/Berlin, Etc/GMT+3 or Local (default "UTC") -loggerWarnsPerSecondLimit int Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit + -maxConcurrentInserts int + The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 8) -memory.allowedBytes size Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)