Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files
Commit: 274627943e

14 changed files with 252 additions and 43 deletions

README.md

@@ -2217,7 +2217,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -inmemoryDataFlushInterval duration
      The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s)
   -insert.maxQueueDuration duration
-     The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
+     The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
   -internStringMaxLen int
      The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300)
   -logNewSeries
      Whether to log new series. This option is for debug purposes only. It can lead to performance issues when big number of new series are ingested into VictoriaMetrics
   -loggerDisableTimestamps

@@ -2334,12 +2336,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -promscrape.minResponseSizeForStreamParse size
      The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000)
+  -promscrape.noStaleMarkers
+     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.nomad.waitTime duration
      Wait time used by Nomad service discovery. Default value is used if not set
   -promscrape.nomadSDCheckInterval duration
      Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s)
-  -promscrape.noStaleMarkers
-     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.openstackSDCheckInterval duration
      Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s)
   -promscrape.seriesLimitPerTarget int

@@ -2485,9 +2487,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -storageDataPath string
      Path to storage data (default "victoria-metrics-data")
   -streamAggr.config string
-     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html .See also -remoteWrite.streamAggr.keepInput
+     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval
+  -streamAggr.dedupInterval duration
+     Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
   -streamAggr.keepInput
-     Whether to keep input samples after the aggregation with -streamAggr.config .By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
+     Whether to keep input samples after the aggregation with -streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
   -tls
      Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
   -tlsCertFile string

@@ -1205,7 +1205,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
   -influxTrimTimestamp duration
      Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
   -insert.maxQueueDuration duration
-     The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
+     The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
   -internStringMaxLen int
      The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300)
   -kafka.consumer.topic array
      Kafka topic names for data consumption. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
      Supports an array of values separated by comma or specified via multiple flags.

@@ -1340,12 +1342,12 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
   -promscrape.minResponseSizeForStreamParse size
      The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000)
+  -promscrape.noStaleMarkers
+     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.nomad.waitTime duration
      Wait time used by Nomad service discovery. Default value is used if not set
   -promscrape.nomadSDCheckInterval duration
      Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s)
-  -promscrape.noStaleMarkers
-     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.openstackSDCheckInterval duration
      Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s)
   -promscrape.seriesLimitPerTarget int

@@ -1468,8 +1470,11 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
      The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits
      Supports array of values separated by comma or specified via multiple flags.
   -remoteWrite.streamAggr.config array
-     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput
+     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval
      Supports an array of values separated by comma or specified via multiple flags.
+  -remoteWrite.streamAggr.dedupInterval array
+     Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
+     Supports array of values separated by comma or specified via multiple flags.
   -remoteWrite.streamAggr.keepInput array
      Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html
      Supports array of values separated by comma or specified via multiple flags.

@@ -62,10 +62,12 @@ var (
     streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
         "See https://docs.victoriametrics.com/stream-aggregation.html . "+
-        "See also -remoteWrite.streamAggr.keepInput")
+        "See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
     streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
         "By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
         "See https://docs.victoriametrics.com/stream-aggregation.html")
+    streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
+        "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
 )

 var (
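These are array flags: the value applied to the N-th `-remoteWrite.url` is resolved by index, with a single value acting as a shared default. A minimal, self-contained sketch of that lookup rule in Go (the helper below is an illustrative stand-in for `flagutil.ArrayDuration.GetOptionalArgOrDefault` used later in this diff, not the repo's implementation):

```go
package main

import (
	"fmt"
	"time"
)

// getOptionalArgOrDefault mimics the per-URL lookup assumed by this diff:
// pick the argIdx-th value, fall back to a single shared value,
// otherwise return defaultValue.
func getOptionalArgOrDefault(a []time.Duration, argIdx int, defaultValue time.Duration) time.Duration {
	if argIdx < len(a) {
		return a[argIdx]
	}
	if len(a) == 1 {
		return a[0] // a single value applies to every -remoteWrite.url
	}
	return defaultValue
}

func main() {
	// As if -remoteWrite.streamAggr.dedupInterval=30s,1m were given
	// alongside three -remoteWrite.url flags.
	dedupIntervals := []time.Duration{30 * time.Second, time.Minute}
	for argIdx := 0; argIdx < 3; argIdx++ {
		fmt.Printf("url #%d: dedupInterval=%s\n", argIdx, getOptionalArgOrDefault(dedupIntervals, argIdx, 0))
	}
}
```

Running it prints 30s for the first URL, 1m0s for the second, and the zero default (de-duplication disabled) for any further URL.
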
@@ -509,7 +511,8 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
     // Initialize sas
     sasFile := streamAggrConfig.GetOptionalArg(argIdx)
     if sasFile != "" {
-        sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal)
+        dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
+        sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
         if err != nil {
             logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
         }

@@ -898,6 +898,10 @@ The shortlist of configuration flags is the following:
      Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password
   -httpListenAddr string
      Address to listen for http connections (default ":8880")
+  -insert.maxQueueDuration duration
+     The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
+  -internStringMaxLen int
+     The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300)
   -loggerDisableTimestamps
      Whether to disable writing timestamps in logs
   -loggerErrorsPerSecondLimit int

@@ -914,6 +918,8 @@ The shortlist of configuration flags is the following:
      Timezone to use for timestamps in logs. Timezone must be a valid IANA Time Zone. For example: America/New_York, Europe/Berlin, Etc/GMT+3 or Local (default "UTC")
   -loggerWarnsPerSecondLimit int
      Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
+  -maxConcurrentInserts int
+     The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 8)
   -memory.allowedBytes size
      Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)

@@ -16,10 +16,12 @@ import (
 var (
     streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
         "See https://docs.victoriametrics.com/stream-aggregation.html . "+
-        "See also -remoteWrite.streamAggr.keepInput")
+        "See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval")
     streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep input samples after the aggregation with -streamAggr.config. "+
         "By default the input is dropped after the aggregation, so only the aggregate data is stored. "+
         "See https://docs.victoriametrics.com/stream-aggregation.html")
+    streamAggrDedupInterval = flag.Duration("streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before being aggregated. "+
+        "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
 )

 // InitStreamAggr must be called after flag.Parse and before using the common package.

@@ -30,7 +32,7 @@ func InitStreamAggr() {
         // Nothing to initialize
         return
     }
-    a, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries)
+    a, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
     if err != nil {
         logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err)
     }

@@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 ## tip

+* FEATURE: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html): add the ability to [de-duplicate](https://docs.victoriametrics.com/#deduplication) input samples before aggregation via `-streamAggr.dedupInterval` and `-remoteWrite.streamAggr.dedupInterval` command-line options.
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add dark mode - it can be selected via `settings` menu in the top right corner. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3704).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): improve visual appearance of the top menu. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3678).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): embed fonts into binary instead of loading them from external sources. This allows using `vmui` in full from isolated networks without access to Internet. Thanks to @ScottKevill for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3696).

@@ -2218,7 +2218,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -inmemoryDataFlushInterval duration
      The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s)
   -insert.maxQueueDuration duration
-     The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
+     The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
   -internStringMaxLen int
      The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300)
   -logNewSeries
      Whether to log new series. This option is for debug purposes only. It can lead to performance issues when big number of new series are ingested into VictoriaMetrics
   -loggerDisableTimestamps

@@ -2335,12 +2337,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -promscrape.minResponseSizeForStreamParse size
      The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000)
+  -promscrape.noStaleMarkers
+     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.nomad.waitTime duration
      Wait time used by Nomad service discovery. Default value is used if not set
   -promscrape.nomadSDCheckInterval duration
      Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s)
-  -promscrape.noStaleMarkers
-     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.openstackSDCheckInterval duration
      Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s)
   -promscrape.seriesLimitPerTarget int

@@ -2486,9 +2488,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -storageDataPath string
      Path to storage data (default "victoria-metrics-data")
   -streamAggr.config string
-     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html .See also -remoteWrite.streamAggr.keepInput
+     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval
+  -streamAggr.dedupInterval duration
+     Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
   -streamAggr.keepInput
-     Whether to keep input samples after the aggregation with -streamAggr.config .By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
+     Whether to keep input samples after the aggregation with -streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
   -tls
      Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
   -tlsCertFile string

@@ -2221,7 +2221,9 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -inmemoryDataFlushInterval duration
      The interval for guaranteed saving of in-memory data to disk. The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). Smaller intervals increase disk IO load. Minimum supported value is 1s (default 5s)
   -insert.maxQueueDuration duration
-     The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
+     The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
   -internStringMaxLen int
      The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300)
   -logNewSeries
      Whether to log new series. This option is for debug purposes only. It can lead to performance issues when big number of new series are ingested into VictoriaMetrics
   -loggerDisableTimestamps

@@ -2338,12 +2340,12 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -promscrape.minResponseSizeForStreamParse size
      The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000)
+  -promscrape.noStaleMarkers
+     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.nomad.waitTime duration
      Wait time used by Nomad service discovery. Default value is used if not set
   -promscrape.nomadSDCheckInterval duration
      Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s)
-  -promscrape.noStaleMarkers
-     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.openstackSDCheckInterval duration
      Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s)
   -promscrape.seriesLimitPerTarget int

@@ -2489,9 +2491,11 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -storageDataPath string
      Path to storage data (default "victoria-metrics-data")
   -streamAggr.config string
-     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html .See also -remoteWrite.streamAggr.keepInput
+     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -streamAggr.dedupInterval
+  -streamAggr.dedupInterval duration
+     Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
   -streamAggr.keepInput
-     Whether to keep input samples after the aggregation with -streamAggr.config .By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
+     Whether to keep input samples after the aggregation with -streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is stored. See https://docs.victoriametrics.com/stream-aggregation.html
   -tls
      Whether to enable TLS for incoming HTTP requests at -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set
   -tlsCertFile string

@@ -12,7 +12,7 @@ and/or scraped from [Prometheus-compatible targets](https://docs.victoriametrics
 The stream aggregation is configured via the following command-line flags:

 - `-remoteWrite.streamAggr.config` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
-  This flag can be specified individually per each specified `-remoteWrite.url`.
+  This flag can be specified individually per each `-remoteWrite.url`.
   This allows writing different aggregates to different remote storage destinations.
 - `-streamAggr.config` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).

@@ -22,13 +22,23 @@ By default only the aggregated data is written to the storage. If the original i
 then the following command-line flags must be specified:

 - `-remoteWrite.streamAggr.keepInput` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
-  This flag can be specified individually per each specified `-remoteWrite.url`.
+  This flag can be specified individually per each `-remoteWrite.url`.
   This allows writing both raw and aggregate data to different remote storage destinations.
 - `-streamAggr.keepInput` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).

 Stream aggregation ignores timestamps associated with the input [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples).
 It expects that the ingested samples have timestamps close to the current time.

+By default all the input samples are aggregated. Sometimes it is needed to de-duplicate samples before the aggregation.
+For example, if the samples are received from replicated sources.
+The following command-line flag can be used for enabling the [de-duplication](https://docs.victoriametrics.com/#deduplication)
+before aggregation in this case:
+
+- `-remoteWrite.streamAggr.dedupInterval` at [vmagent](https://docs.victoriametrics.com/vmagent.html).
+  This flag can be specified individually per each `-remoteWrite.url`.
+  This allows setting different de-duplication intervals per each configured remote storage.
+- `-streamAggr.dedupInterval` at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html).
+
 ## Use cases

 Stream aggregation can be used in the following cases:
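With these flags, running single-node VictoriaMetrics with, for example, `-streamAggr.config=aggr.yml -streamAggr.dedupInterval=30s` de-duplicates replicated input in 30-second windows before the configured aggregations run. The same behaviour can be exercised through the library API this commit introduces; a minimal sketch assuming the post-commit `lib/streamaggr` API (the config, metric name and printed message are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	// The same YAML format that -streamAggr.config accepts.
	config := []byte(`
- interval: 1m
  outputs: [sum_samples]
`)
	pushFunc := func(tss []prompbmarshal.TimeSeries) {
		fmt.Printf("flushed %d aggregated series\n", len(tss))
	}
	// A 30s dedup interval keeps only the last sample per series
	// in each 30s window before the 1m sum_samples aggregation.
	a, err := streamaggr.NewAggregatorsFromData(config, pushFunc, 30*time.Second)
	if err != nil {
		panic(err)
	}
	defer a.MustStop()

	now := time.Now().UnixMilli()
	// Two samples for the same series within one dedup window:
	// only the second one (value 2) reaches the aggregation stage.
	a.Push([]prompbmarshal.TimeSeries{{
		Labels:  []prompbmarshal.Label{{Name: "__name__", Value: "foo"}},
		Samples: []prompbmarshal.Sample{{Value: 1, Timestamp: now}, {Value: 2, Timestamp: now}},
	}})
}
```
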

@@ -1209,7 +1209,9 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
   -influxTrimTimestamp duration
      Trim timestamps for InfluxDB line protocol data to this duration. Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data (default 1ms)
   -insert.maxQueueDuration duration
-     The maximum duration for waiting in the queue for insert requests due to -maxConcurrentInserts (default 1m0s)
+     The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
   -internStringMaxLen int
      The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300)
   -kafka.consumer.topic array
      Kafka topic names for data consumption. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
      Supports an array of values separated by comma or specified via multiple flags.

@@ -1344,12 +1346,12 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
   -promscrape.minResponseSizeForStreamParse size
      The minimum target response size for automatic switching to stream parsing mode, which can reduce memory usage. See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1000000)
+  -promscrape.noStaleMarkers
+     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.nomad.waitTime duration
      Wait time used by Nomad service discovery. Default value is used if not set
   -promscrape.nomadSDCheckInterval duration
      Interval for checking for changes in Nomad. This works only if nomad_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs for details (default 30s)
-  -promscrape.noStaleMarkers
-     Whether to disable sending Prometheus stale markers for metrics when scrape target disappears. This option may reduce memory usage if stale markers aren't needed for your setup. This option also disables populating the scrape_series_added metric. See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
   -promscrape.openstackSDCheckInterval duration
      Interval for checking for changes in openstack API server. This works only if openstack_sd_configs is configured in '-promscrape.config' file. See https://docs.victoriametrics.com/sd_configs.html#openstack_sd_configs for details (default 30s)
   -promscrape.seriesLimitPerTarget int

@@ -1472,8 +1474,11 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
      The number of significant figures to leave in metric values before writing them to remote storage. See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. This option may be used for improving data compression for the stored metrics. See also -remoteWrite.roundDigits
      Supports array of values separated by comma or specified via multiple flags.
   -remoteWrite.streamAggr.config array
-     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput
+     Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/stream-aggregation.html . See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval
      Supports an array of values separated by comma or specified via multiple flags.
+  -remoteWrite.streamAggr.dedupInterval array
+     Input samples are de-duplicated with this interval before being aggregated. Only the last sample per each time series per each interval is aggregated if the interval is greater than zero
+     Supports array of values separated by comma or specified via multiple flags.
   -remoteWrite.streamAggr.keepInput array
      Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. See https://docs.victoriametrics.com/stream-aggregation.html
      Supports array of values separated by comma or specified via multiple flags.

@@ -902,6 +902,10 @@ The shortlist of configuration flags is the following:
      Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password
   -httpListenAddr string
      Address to listen for http connections (default ":8880")
+  -insert.maxQueueDuration duration
+     The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
+  -internStringMaxLen int
+     The maximum length for strings to intern. Lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning (default 300)
   -loggerDisableTimestamps
      Whether to disable writing timestamps in logs
   -loggerErrorsPerSecondLimit int

@@ -918,6 +922,8 @@ The shortlist of configuration flags is the following:
      Timezone to use for timestamps in logs. Timezone must be a valid IANA Time Zone. For example: America/New_York, Europe/Berlin, Etc/GMT+3 or Local (default "UTC")
   -loggerWarnsPerSecondLimit int
      Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
+  -maxConcurrentInserts int
+     The maximum number of concurrent insert requests. Default value should work for most cases, since it minimizes the memory usage. The default value can be increased when clients send data over slow networks. See also -insert.maxQueueDuration (default 8)
   -memory.allowedBytes size
      Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from OS page cache resulting in higher disk IO usage
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)

@@ -38,13 +38,16 @@ var supportedOutputs = []string{
 // LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
 //
+// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
+// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+//
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
-func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
+func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
     data, err := fs.ReadFileOrHTTP(path)
     if err != nil {
         return nil, fmt.Errorf("cannot load aggregators: %w", err)
     }
-    as, err := NewAggregatorsFromData(data, pushFunc)
+    as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval)
     if err != nil {
         return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
     }
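Callers that do not need de-duplication pass `0` for the new `dedupInterval` argument, which keeps the previous behaviour; that is exactly how the updated test and benchmark call sites later in this commit migrate (`NewAggregatorsFromData([]byte(config), pushFunc, 0)`).
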
@@ -53,13 +56,16 @@ func LoadFromFile(path string, pushFunc PushFunc) (*Aggregators, error) {
 // NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
 //
+// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
+// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+//
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
-func NewAggregatorsFromData(data []byte, pushFunc PushFunc) (*Aggregators, error) {
+func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
     var cfgs []*Config
     if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
         return nil, err
     }
-    return NewAggregators(cfgs, pushFunc)
+    return NewAggregators(cfgs, pushFunc, dedupInterval)
 }

 // Config is a configuration for a single stream aggregation.

@@ -130,14 +136,17 @@ type Aggregators struct {
 //
 // pushFunc is called when the aggregated data must be flushed.
 //
+// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
+// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+//
 // MustStop must be called on the returned Aggregators when they are no longer needed.
-func NewAggregators(cfgs []*Config, pushFunc PushFunc) (*Aggregators, error) {
+func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
     if len(cfgs) == 0 {
         return nil, nil
     }
     as := make([]*aggregator, len(cfgs))
     for i, cfg := range cfgs {
-        a, err := newAggregator(cfg, pushFunc)
+        a, err := newAggregator(cfg, pushFunc, dedupInterval)
         if err != nil {
             return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
         }

@@ -179,6 +188,10 @@ type aggregator struct {
     without             []string
     aggregateOnlyByTime bool

+    // dedupAggr is set to non-nil if input samples must be de-duplicated according
+    // to the dedupInterval passed to newAggregator().
+    dedupAggr *lastAggrState
+
     // aggrStates contains aggregate states for the given outputs
     aggrStates []aggrState

@@ -205,8 +218,11 @@ type PushFunc func(tss []prompbmarshal.TimeSeries)
 // newAggregator creates new aggregator for the given cfg, which pushes the aggregate data to pushFunc.
 //
+// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
+// e.g. only the last sample per each time series per each dedupInterval is aggregated.
+//
 // The returned aggregator must be stopped when no longer needed by calling MustStop().
-func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
+func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration) (*aggregator, error) {
     // check cfg.Interval
     interval, err := time.ParseDuration(cfg.Interval)
     if err != nil {

@@ -309,6 +325,11 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
     }
     suffix += "_"

+    var dedupAggr *lastAggrState
+    if dedupInterval > 0 {
+        dedupAggr = newLastAggrState()
+    }
+
     // initialize the aggregator
     a := &aggregator{
         match: cfg.Match,
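The de-duplication state is essentially a last-value-wins map keyed by the series' marshaled labels. A self-contained sketch of those semantics (not the repo's `lastAggrState`, which is concurrent and pooled):

```go
package main

import "fmt"

// dedup keeps only the most recently pushed value per series key,
// mirroring what lastAggrState holds between two dedup flushes.
type dedup map[string]float64

func (d dedup) pushSample(outputKey string, value float64) {
	d[outputKey] = value // later samples overwrite earlier ones
}

func main() {
	d := dedup{}
	// Three samples for the same series arrive within one dedupInterval,
	// e.g. from replicated vmagent instances.
	for _, v := range []float64{1.32, 4.34, 2} {
		d.pushSample(`bar{baz="qwe"}`, v)
	}
	fmt.Println(d[`bar{baz="qwe"}`]) // 2 — only the last sample survives
}
```
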
@@ -320,6 +341,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
         without:             without,
         aggregateOnlyByTime: aggregateOnlyByTime,

+        dedupAggr:  dedupAggr,
         aggrStates: aggrStates,
         pushFunc:   pushFunc,

@@ -328,15 +350,41 @@ func newAggregator(cfg *Config, pushFunc PushFunc) (*aggregator, error) {
         stopCh: make(chan struct{}),
     }

+    if dedupAggr != nil {
+        a.wg.Add(1)
+        go func() {
+            a.runDedupFlusher(dedupInterval)
+            a.wg.Done()
+        }()
+    }
     a.wg.Add(1)
     go func() {
         a.runFlusher(interval)
-        defer a.wg.Done()
+        a.wg.Done()
     }()

     return a, nil
 }

+func (a *aggregator) runDedupFlusher(interval time.Duration) {
+    t := time.NewTicker(interval)
+    defer t.Stop()
+    for {
+        select {
+        case <-a.stopCh:
+            return
+        case <-t.C:
+        }
+
+        // Globally limit the concurrency for metrics' flush
+        // in order to limit memory usage when big number of aggregators
+        // are flushed at the same time.
+        flushConcurrencyCh <- struct{}{}
+        a.dedupFlush()
+        <-flushConcurrencyCh
+    }
+}
+
 func (a *aggregator) runFlusher(interval time.Duration) {
     t := time.NewTicker(interval)
     defer t.Stop()
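`flushConcurrencyCh` is a counting semaphore built from a buffered channel: at most `cgroup.AvailableCPUs()` flushes run at once across all aggregators, including the new dedup flusher. A generic, standalone sketch of the pattern:

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	// A buffered channel works as a counting semaphore, like
	// flushConcurrencyCh above: at most NumCPU flushes run at once.
	sem := make(chan struct{}, runtime.NumCPU())
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{} // acquire; blocks while all slots are taken
			fmt.Printf("flushing aggregator #%d\n", id)
			<-sem // release the slot
		}(i)
	}
	wg.Wait()
}
```
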
@@ -358,6 +406,15 @@ func (a *aggregator) runFlusher(interval time.Duration) {
 var flushConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())

+func (a *aggregator) dedupFlush() {
+    ctx := &flushCtx{
+        skipAggrSuffix: true,
+    }
+    a.dedupAggr.appendSeriesForFlush(ctx)
+    logger.Errorf("series after dedup: %v", ctx.tss)
+    a.push(ctx.tss)
+}
+
 func (a *aggregator) flush() {
     ctx := &flushCtx{
         suffix: a.suffix,

@@ -395,8 +452,29 @@ func (a *aggregator) MustStop() {
     a.wg.Wait()
 }

-// Push pushes series to a.
+// Push pushes tss to a.
 func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
+    if a.dedupAggr == nil {
+        a.push(tss)
+        return
+    }
+
+    // deduplication is enabled.
+    // push samples to dedupAggr, so later they will be pushed to the configured aggregators.
+    pushSample := a.dedupAggr.pushSample
+    inputKey := ""
+    bb := bbPool.Get()
+    for _, ts := range tss {
+        bb.B = marshalLabelsFast(bb.B[:0], ts.Labels)
+        outputKey := bytesutil.InternBytes(bb.B)
+        for _, sample := range ts.Samples {
+            pushSample(inputKey, outputKey, sample.Value)
+        }
+    }
+    bbPool.Put(bb)
+}
+
+func (a *aggregator) push(tss []prompbmarshal.TimeSeries) {
     labels := promutils.GetLabels()
     tmpLabels := promutils.GetLabels()
     bb := bbPool.Get()

@@ -545,7 +623,8 @@ func unmarshalLabelsFast(dst []prompbmarshal.Label, src []byte) ([]prompbmarshal
 }

 type flushCtx struct {
-    suffix string
+    skipAggrSuffix bool
+    suffix         string

     tss    []prompbmarshal.TimeSeries
     labels []prompbmarshal.Label

@@ -567,7 +646,9 @@ func (ctx *flushCtx) appendSeries(labelsMarshaled, suffix string, timestamp int6
     if err != nil {
         logger.Panicf("BUG: cannot unmarshal labels from output key: %s", err)
     }
-    ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
+    if !ctx.skipAggrSuffix {
+        ctx.labels = addMetricSuffix(ctx.labels, labelsLen, ctx.suffix, suffix)
+    }
     ctx.samples = append(ctx.samples, prompbmarshal.Sample{
         Timestamp: timestamp,
         Value:     value,
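The new `skipAggrSuffix` branch exists because `dedupFlush()` feeds its output back into `a.push()`, the regular aggregation input path: the de-duplicated series must keep their original metric names so that matching and by/without grouping still apply, while only the final aggregate flush appends the `:<interval>_<output>`-style suffix.
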
@@ -6,6 +6,7 @@ import (
     "strings"
     "sync"
     "testing"
+    "time"

     "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"

@@ -18,7 +19,7 @@ func TestAggregatorsFailure(t *testing.T) {
     pushFunc := func(tss []prompbmarshal.TimeSeries) {
         panic(fmt.Errorf("pushFunc shouldn't be called"))
     }
-    a, err := NewAggregatorsFromData([]byte(config), pushFunc)
+    a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
     if err == nil {
         t.Fatalf("expecting non-nil error")
     }

@@ -136,7 +137,7 @@ func TestAggregatorsSuccess(t *testing.T) {
         }
         tssOutputLock.Unlock()
     }
-    a, err := NewAggregatorsFromData([]byte(config), pushFunc)
+    a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
     if err != nil {
         t.Fatalf("cannot initialize aggregators: %s", err)
     }

@@ -641,6 +642,83 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90
 `)
 }

+func TestAggregatorsWithDedupInterval(t *testing.T) {
+    f := func(config, inputMetrics, outputMetricsExpected string) {
+        t.Helper()
+
+        // Initialize Aggregators
+        var tssOutput []prompbmarshal.TimeSeries
+        var tssOutputLock sync.Mutex
+        pushFunc := func(tss []prompbmarshal.TimeSeries) {
+            tssOutputLock.Lock()
+            for _, ts := range tss {
+                labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
+                samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
+                tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
+                    Labels:  labelsCopy,
+                    Samples: samplesCopy,
+                })
+            }
+            tssOutputLock.Unlock()
+        }
+        const dedupInterval = time.Hour
+        a, err := NewAggregatorsFromData([]byte(config), pushFunc, dedupInterval)
+        if err != nil {
+            t.Fatalf("cannot initialize aggregators: %s", err)
+        }
+
+        // Push the inputMetrics to Aggregators
+        tssInput := mustParsePromMetrics(inputMetrics)
+        a.Push(tssInput)
+        if a != nil {
+            for _, aggr := range a.as {
+                aggr.dedupFlush()
+                aggr.flush()
+            }
+        }
+        a.MustStop()
+
+        // Verify the tssOutput contains the expected metrics
+        tsStrings := make([]string, len(tssOutput))
+        for i, ts := range tssOutput {
+            tsStrings[i] = timeSeriesToString(ts)
+        }
+        sort.Strings(tsStrings)
+        outputMetrics := strings.Join(tsStrings, "")
+        if outputMetrics != outputMetricsExpected {
+            t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
+        }
+    }
+
+    f(`
+- interval: 1m
+  outputs: [sum_samples]
+`, `
+foo 123
+bar 567
+`, `bar:1m_sum_samples 567
+foo:1m_sum_samples 123
+`)
+
+    f(`
+- interval: 1m
+  outputs: [sum_samples]
+`, `
+foo 123
+bar{baz="qwe"} 1.32
+bar{baz="qwe"} 4.34
+bar{baz="qwe"} 2
+foo{baz="qwe"} -5
+bar{baz="qwer"} 343
+bar{baz="qwer"} 344
+foo{baz="qwe"} 10
+`, `bar:1m_sum_samples{baz="qwe"} 2
+bar:1m_sum_samples{baz="qwer"} 344
+foo:1m_sum_samples 123
+foo:1m_sum_samples{baz="qwe"} 10
+`)
+}
+
 func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
     labelsString := promrelabel.LabelsToString(ts.Labels)
     if len(ts.Samples) != 1 {
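The second test case shows why de-duplication changes the aggregation result: only the last sample per series survives the dedup window (`bar{baz="qwe"}`: 1.32, 4.34, 2 → 2; `bar{baz="qwer"}`: 343, 344 → 344; `foo{baz="qwe"}`: -5, 10 → 10), so `sum_samples` sums a single deduplicated sample per series instead of all raw samples, which would otherwise have produced 1.32 + 4.34 + 2 = 7.66 for `bar{baz="qwe"}`. Assuming the usual package location, the test can be run with `go test -run TestAggregatorsWithDedupInterval ./lib/streamaggr/`.
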
@@ -40,7 +40,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
     pushFunc := func(tss []prompbmarshal.TimeSeries) {
         panic(fmt.Errorf("unexpected pushFunc call"))
     }
-    a, err := NewAggregatorsFromData([]byte(config), pushFunc)
+    a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
     if err != nil {
         b.Fatalf("unexpected error when initializing aggregators: %s", err)
     }