Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2023-03-31 22:55:41 -07:00
commit 14ab18375f
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
27 changed files with 615 additions and 147 deletions

View file

@ -742,20 +742,45 @@ All the Prometheus querying API handlers can be prepended with `/prometheus` pre
### Prometheus querying API enhancements ### Prometheus querying API enhancements
VictoriaMetrics accepts optional `extra_label=<label_name>=<label_value>` query arg, which can be used for enforcing additional label filters for queries. For example, VictoriaMetrics accepts optional `extra_label=<label_name>=<label_value>` query arg, which can be used
`/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=<query>` would automatically add `{user_id="123",group_id="456"}` label filters to the given `<query>`. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. for enforcing additional label filters for queries. For example, `/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=<query>`
would automatically add `{user_id="123",group_id="456"}` label filters to the given `<query>`.
This functionality can be used for limiting the scope of time series visible to the given tenant.
It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics.
See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies.
VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. For example, VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries.
`/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=<query>` would automatically add `{env=~"prod|staging",user="xyz"}` label filters to the given `<query>`. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. For example, `/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=<query>` would automatically
add `{env=~"prod|staging",user="xyz"}` label filters to the given `<query>`. This functionality can be used for limiting
the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically
set by auth proxy sitting in front of VictoriaMetrics.
See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies.
VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats). VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats).
VictoriaMetrics accepts `round_digits` query arg for `/api/v1/query` and `/api/v1/query_range` handlers. It can be used for rounding response values to the given number of digits after the decimal point. For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. VictoriaMetrics accepts `round_digits` query arg for [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query)
and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) handlers. It can be used for rounding response values
to the given number of digits after the decimal point.
For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point.
VictoriaMetrics accepts `limit` query arg for `/api/v1/labels` and `/api/v1/label/<labelName>/values` handlers for limiting the number of returned entries. For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, then limits specified in the command-line flags are used. VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels)
and [`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues) handlers for limiting the number of returned entries.
For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels.
If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values,
then limits specified in the command-line flags are used.
By default, VictoriaMetrics returns time series for the last 5 minutes from `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/<labelName>/values` while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range. By default, VictoriaMetrics returns time series for the last day starting at 00:00 UTC
VictoriaMetrics accepts `limit` query arg for `/api/v1/series` handlers for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest. If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. from [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series),
[/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and
[`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues),
while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range.
VictoriaMetrics rounds the specified `start..end` time range to day granularity because of performance optimization concerns.
If you need the exact set of label names and label values on the given time range, then send queries
to [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) or to [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query).
VictoriaMetrics accepts `limit` query arg at [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series)
for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest of series.
If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used.
Additionally, VictoriaMetrics provides the following handlers: Additionally, VictoriaMetrics provides the following handlers:
@ -2168,7 +2193,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
-dryRun -dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
-enableTCP6 -enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable -envflag.enable

View file

@ -8,6 +8,8 @@ import (
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert"
vminsertcommon "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common"
vminsertrelabel "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
@ -30,8 +32,9 @@ var (
"With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing") "With enabled proxy protocol http server cannot serve regular /metrics endpoint. Use -pushmetrics.url for metrics pushing")
minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+ minScrapeInterval = flag.Duration("dedup.minScrapeInterval", 0, "Leave only the last sample in every time series per each discrete interval "+
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling") "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+ dryRun = flag.Bool("dryRun", false, "Whether to check config files without running VictoriaMetrics. The following config files are checked: "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag") "-promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. "+
"This can be changed with -promscrape.config.strictParse=false command-line flag")
inmemoryDataFlushInterval = flag.Duration("inmemoryDataFlushInterval", 5*time.Second, "The interval for guaranteed saving of in-memory data to disk. "+ inmemoryDataFlushInterval = flag.Duration("inmemoryDataFlushInterval", 5*time.Second, "The interval for guaranteed saving of in-memory data to disk. "+
"The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. "+ "The saved data survives unclean shutdown such as OOM crash, hardware reset, SIGKILL, etc. "+
"Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+ "Bigger intervals may help increasing lifetime of flash storage with limited write cycles (e.g. Raspberry PI). "+
@ -62,6 +65,12 @@ func main() {
if err := promscrape.CheckConfig(); err != nil { if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err) logger.Fatalf("error when checking -promscrape.config: %s", err)
} }
if err := vminsertrelabel.CheckRelabelConfig(); err != nil {
logger.Fatalf("error when checking -relabelConfig: %s", err)
}
if err := vminsertcommon.CheckStreamAggrConfig(); err != nil {
logger.Fatalf("error when checking -streamAggr.config: %s", err)
}
logger.Infof("-promscrape.config is ok; exiting with 0 status code") logger.Infof("-promscrape.config is ok; exiting with 0 status code")
return return
} }

View file

@ -104,7 +104,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
`vmagent` should be restarted in order to update config options set via command-line args. `vmagent` should be restarted in order to update config options set via command-line args.
`vmagent` supports multiple approaches for reloading configs from updated config files such as `vmagent` supports multiple approaches for reloading configs from updated config files such as
`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`: `-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
* Sending `SIGHUP` signal to `vmagent` process: * Sending `SIGHUP` signal to `vmagent` process:
@ -1186,7 +1186,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-denyQueryTracing -denyQueryTracing
Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing
-dryRun -dryRun
Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
-enableTCP6 -enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable -envflag.enable

View file

@ -67,8 +67,8 @@ var (
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+ opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt") "at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg") configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+ dryRun = flag.Bool("dryRun", false, "Whether to check config files without running vmagent. The following files are checked: "+
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . "+ "-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag") "Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
) )
@ -103,11 +103,14 @@ func main() {
return return
} }
if *dryRun { if *dryRun {
if err := promscrape.CheckConfig(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err)
}
if err := remotewrite.CheckRelabelConfigs(); err != nil { if err := remotewrite.CheckRelabelConfigs(); err != nil {
logger.Fatalf("error when checking relabel configs: %s", err) logger.Fatalf("error when checking relabel configs: %s", err)
} }
if err := promscrape.CheckConfig(); err != nil { if err := remotewrite.CheckStreamAggrConfigs(); err != nil {
logger.Fatalf("error when checking -promscrape.config: %s", err) logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err)
} }
logger.Infof("all the configs are ok; exiting with 0 status code") logger.Infof("all the configs are ok; exiting with 0 status code")
return return

View file

@ -158,9 +158,8 @@ func Init() {
logger.Fatalf("cannot load relabel configs: %s", err) logger.Fatalf("cannot load relabel configs: %s", err)
} }
allRelabelConfigs.Store(rcs) allRelabelConfigs.Store(rcs)
relabelConfigSuccess.Set(1)
configSuccess.Set(1) relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
configTimestamp.Set(fasttime.UnixTimestamp())
if len(*remoteWriteURLs) > 0 { if len(*remoteWriteURLs) > 0 {
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs) rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
@ -173,34 +172,56 @@ func Init() {
for { for {
select { select {
case <-sighupCh: case <-sighupCh:
case <-stopCh: case <-configReloaderStopCh:
return return
} }
configReloads.Inc() reloadRelabelConfigs()
logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig") reloadStreamAggrConfigs()
rcs, err := loadRelabelConfigs()
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
continue
}
allRelabelConfigs.Store(rcs)
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("Successfully reloaded relabel configs")
} }
}() }()
} }
func reloadRelabelConfigs() {
relabelConfigReloads.Inc()
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
rcs, err := loadRelabelConfigs()
if err != nil {
relabelConfigReloadErrors.Inc()
relabelConfigSuccess.Set(0)
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
return
}
allRelabelConfigs.Store(rcs)
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("successfully reloaded relabel configs")
}
var ( var (
configReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`) relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
configReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`) relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
configSuccess = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`) relabelConfigSuccess = metrics.NewCounter(`vmagent_relabel_config_last_reload_successful`)
configTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`) relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
) )
func reloadStreamAggrConfigs() {
if len(*remoteWriteMultitenantURLs) > 0 {
rwctxsMapLock.Lock()
for _, rwctxs := range rwctxsMap {
reinitStreamAggr(rwctxs)
}
rwctxsMapLock.Unlock()
} else {
reinitStreamAggr(rwctxsDefault)
}
}
func reinitStreamAggr(rwctxs []*remoteWriteCtx) {
for _, rwctx := range rwctxs {
rwctx.reinitStreamAggr()
}
}
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
if len(urls) == 0 { if len(urls) == 0 {
logger.Panicf("BUG: urls must be non-empty") logger.Panicf("BUG: urls must be non-empty")
@ -266,14 +287,14 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
return rwctxs return rwctxs
} }
var stopCh = make(chan struct{}) var configReloaderStopCh = make(chan struct{})
var configReloaderWG sync.WaitGroup var configReloaderWG sync.WaitGroup
// Stop stops remotewrite. // Stop stops remotewrite.
// //
// It is expected that nobody calls Push during and after the call to this func. // It is expected that nobody calls Push during and after the call to this func.
func Stop() { func Stop() {
close(stopCh) close(configReloaderStopCh)
configReloaderWG.Wait() configReloaderWG.Wait()
for _, rwctx := range rwctxsDefault { for _, rwctx := range rwctxsDefault {
@ -488,7 +509,7 @@ type remoteWriteCtx struct {
fq *persistentqueue.FastQueue fq *persistentqueue.FastQueue
c *client c *client
sas *streamaggr.Aggregators sas atomic.Pointer[streamaggr.Aggregators]
streamAggrKeepInput bool streamAggrKeepInput bool
pss []*pendingSeries pss []*pendingSeries
@ -553,10 +574,12 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0) dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval) sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
if err != nil { if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err) logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
} }
rwctx.sas = sas rwctx.sas.Store(sas)
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx) rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
} }
return rwctx return rwctx
@ -571,8 +594,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.fq.UnblockAllReaders() rwctx.fq.UnblockAllReaders()
rwctx.c.MustStop() rwctx.c.MustStop()
rwctx.c = nil rwctx.c = nil
rwctx.sas.MustStop()
rwctx.sas = nil sas := rwctx.sas.Swap(nil)
sas.MustStop()
rwctx.fq.MustClose() rwctx.fq.MustClose()
rwctx.fq = nil rwctx.fq = nil
@ -603,8 +628,9 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
rwctx.rowsPushedAfterRelabel.Add(rowsCount) rwctx.rowsPushedAfterRelabel.Add(rowsCount)
// Apply stream aggregation if any // Apply stream aggregation if any
rwctx.sas.Push(tss) sas := rwctx.sas.Load()
if rwctx.sas == nil || rwctx.streamAggrKeepInput { sas.Push(tss)
if sas == nil || rwctx.streamAggrKeepInput {
// Push samples to the remote storage // Push samples to the remote storage
rwctx.pushInternal(tss) rwctx.pushInternal(tss)
} }
@ -623,6 +649,36 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
pss[idx].Push(tss) pss[idx].Push(tss)
} }
func (rwctx *remoteWriteCtx) reinitStreamAggr() {
sas := rwctx.sas.Load()
if sas == nil {
// There is no stream aggregation for rwctx
return
}
sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx)
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(rwctx.idx, 0)
sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
return
}
if !sasNew.Equal(sas) {
sasOld := rwctx.sas.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
} else {
sasNew.MustStop()
logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
var tssRelabelPool = &sync.Pool{ var tssRelabelPool = &sync.Pool{
New: func() interface{} { New: func() interface{} {
a := []prompbmarshal.TimeSeries{} a := []prompbmarshal.TimeSeries{}
@ -637,3 +693,20 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int {
} }
return rowsCount return rowsCount
} }
// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
func CheckStreamAggrConfigs() error {
pushNoop := func(tss []prompbmarshal.TimeSeries) {}
for idx, sasFile := range *streamAggrConfig {
if sasFile == "" {
continue
}
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(idx, 0)
sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, dedupInterval)
if err != nil {
return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
}
sas.MustStop()
}
return nil
}

View file

@ -170,6 +170,8 @@ func templateAnnotation(dst io.Writer, text string, data tplData, tmpl *textTpl.
if err != nil { if err != nil {
return fmt.Errorf("error cloning template before parse annotation: %w", err) return fmt.Errorf("error cloning template before parse annotation: %w", err)
} }
// Clone() doesn't copy tpl Options, so we set them manually
tpl = tpl.Option("missingkey=zero")
tpl, err = tpl.Parse(text) tpl, err = tpl.Parse(text)
if err != nil { if err != nil {
return fmt.Errorf("error parsing annotation template: %w", err) return fmt.Errorf("error parsing annotation template: %w", err)

View file

@ -781,7 +781,25 @@ To avoid such situation try to filter out VM process metrics via `--vm-native-fi
4. `vmctl` doesn't provide relabeling or other types of labels management in this mode. 4. `vmctl` doesn't provide relabeling or other types of labels management in this mode.
Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/vmctl/issues/4#issuecomment-683424375). Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/vmctl/issues/4#issuecomment-683424375).
5. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) 5. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
and specify `accountID` param. and specify `accountID` param. Example formats:
```console
# Migrating from cluster to single
--vm-native-src-addr=http://<src-vmselect>:8481/select/0/prometheus
--vm-native-dst-addr=http://<dst-vmsingle>:8428
# Migrating from single to cluster
--vm-native-src-addr=http://<src-vmsingle>:8428
--vm-native-src-addr=http://<dst-vminsert>:8480/insert/0/prometheus
# Migrating single to single
--vm-native-src-addr=http://<src-vmsingle>:8428
--vm-native-dst-addr=http://<dst-vmsingle>:8428
# Migrating cluster to cluster
--vm-native-src-addr=http://<src-vmselect>:8481/select/0/prometheus
--vm-native-dst-addr=http://<dst-vminsert>:8480/insert/0/prometheus
```
6. When migrating large volumes of data it might be useful to use `--vm-native-step-interval` flag to split single process into smaller steps. 6. When migrating large volumes of data it might be useful to use `--vm-native-step-interval` flag to split single process into smaller steps.
7. `vmctl` supports `--vm-concurrency` which controls the number of concurrent workers that process the input from source query results. 7. `vmctl` supports `--vm-concurrency` which controls the number of concurrent workers that process the input from source query results.
Please note that each import request can load up to a single vCPU core on VictoriaMetrics. So try to set it according Please note that each import request can load up to a single vCPU core on VictoriaMetrics. So try to set it according

View file

@ -137,7 +137,8 @@ func (ctx *InsertCtx) ApplyRelabeling() {
// FlushBufs flushes buffered rows to the underlying storage. // FlushBufs flushes buffered rows to the underlying storage.
func (ctx *InsertCtx) FlushBufs() error { func (ctx *InsertCtx) FlushBufs() error {
if sa != nil && !ctx.skipStreamAggr { sas := sasGlobal.Load()
if sas != nil && !ctx.skipStreamAggr {
ctx.streamAggrCtx.push(ctx.mrs) ctx.streamAggrCtx.push(ctx.mrs)
if !*streamAggrKeepInput { if !*streamAggrKeepInput {
ctx.Reset(0) ctx.Reset(0)

View file

@ -2,15 +2,20 @@ package common
import ( import (
"flag" "flag"
"fmt"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
) )
var ( var (
@ -24,28 +29,97 @@ var (
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero") "Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
) )
var (
saCfgReloaderStopCh = make(chan struct{})
saCfgReloaderWG sync.WaitGroup
saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
saCfgSuccess = metrics.NewCounter(`vminsert_streamagg_config_last_reload_successful`)
saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
sasGlobal atomic.Pointer[streamaggr.Aggregators]
)
// CheckStreamAggrConfig checks config pointed by -stramaggr.config
func CheckStreamAggrConfig() error {
if *streamAggrConfig == "" {
return nil
}
pushNoop := func(tss []prompbmarshal.TimeSeries) {}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, *streamAggrDedupInterval)
if err != nil {
return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
}
sas.MustStop()
return nil
}
// InitStreamAggr must be called after flag.Parse and before using the common package. // InitStreamAggr must be called after flag.Parse and before using the common package.
// //
// MustStopStreamAggr must be called when stream aggr is no longer needed. // MustStopStreamAggr must be called when stream aggr is no longer needed.
func InitStreamAggr() { func InitStreamAggr() {
if *streamAggrConfig == "" { if *streamAggrConfig == "" {
// Nothing to initialize
return return
} }
a, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
sighupCh := procutil.NewSighupChan()
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
if err != nil { if err != nil {
logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err) logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err)
} }
sa = a sasGlobal.Store(sas)
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
// Start config reloader.
saCfgReloaderWG.Add(1)
go func() {
defer saCfgReloaderWG.Done()
for {
select {
case <-sighupCh:
case <-saCfgReloaderStopCh:
return
}
reloadStreamAggrConfig()
}
}()
}
func reloadStreamAggrConfig() {
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
saCfgReloads.Inc()
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, *streamAggrDedupInterval)
if err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
return
}
sas := sasGlobal.Load()
if !sasNew.Equal(sas) {
sasOld := sasGlobal.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
} else {
logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
sasNew.MustStop()
}
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
} }
// MustStopStreamAggr stops stream aggregators. // MustStopStreamAggr stops stream aggregators.
func MustStopStreamAggr() { func MustStopStreamAggr() {
sa.MustStop() close(saCfgReloaderStopCh)
sa = nil saCfgReloaderWG.Wait()
}
var sa *streamaggr.Aggregators sas := sasGlobal.Swap(nil)
sas.MustStop()
}
type streamAggrCtx struct { type streamAggrCtx struct {
mn storage.MetricName mn storage.MetricName
@ -64,6 +138,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
ts := &tss[0] ts := &tss[0]
labels := ts.Labels labels := ts.Labels
samples := ts.Samples samples := ts.Samples
sas := sasGlobal.Load()
for _, mr := range mrs { for _, mr := range mrs {
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil { if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err) logger.Panicf("BUG: cannot unmarshal recently marshaled MetricName: %s", err)
@ -88,7 +163,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow) {
ts.Labels = labels ts.Labels = labels
ts.Samples = samples ts.Samples = samples
sa.Push(tss) sas.Push(tss)
} }
} }

View file

@ -71,6 +71,12 @@ var (
var pcsGlobal atomic.Value var pcsGlobal atomic.Value
// CheckRelabelConfig checks config pointed by -relabelConfig
func CheckRelabelConfig() error {
_, err := loadRelabelConfig()
return err
}
func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) { func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) {
if len(*relabelConfig) == 0 { if len(*relabelConfig) == 0 {
return nil, nil return nil, nil

View file

@ -6,7 +6,7 @@ COPY web/ /build/
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o web-amd64 github.com/VictoriMetrics/vmui/ && \ RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o web-amd64 github.com/VictoriMetrics/vmui/ && \
GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o web-windows github.com/VictoriMetrics/vmui/ GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o web-windows github.com/VictoriMetrics/vmui/
FROM alpine:3.17.2 FROM alpine:3.17.3
USER root USER root
COPY --from=build-web-stage /build/web-amd64 /app/web COPY --from=build-web-stage /build/web-amd64 /app/web

View file

@ -2,8 +2,8 @@
DOCKER_NAMESPACE := victoriametrics DOCKER_NAMESPACE := victoriametrics
ROOT_IMAGE ?= alpine:3.17.2 ROOT_IMAGE ?= alpine:3.17.3
CERTS_IMAGE := alpine:3.17.2 CERTS_IMAGE := alpine:3.17.3
GO_BUILDER_IMAGE := golang:1.20.2-alpine GO_BUILDER_IMAGE := golang:1.20.2-alpine
BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1 BUILDER_IMAGE := local/builder:2.0.0-$(shell echo $(GO_BUILDER_IMAGE) | tr :/ __)-1
BASE_IMAGE := local/base:1.1.4-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __) BASE_IMAGE := local/base:1.1.4-$(shell echo $(ROOT_IMAGE) | tr :/ __)-$(shell echo $(CERTS_IMAGE) | tr :/ __)

View file

@ -19,6 +19,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
so the previous versions of VictoriaMetrics will exit with the `unexpected number of substrings in the part name` error when trying to run them on the data so the previous versions of VictoriaMetrics will exit with the `unexpected number of substrings in the part name` error when trying to run them on the data
created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or newer releases** created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or newer releases**
* SECURITY: upgrade base docker image (alpine) from 3.17.2 to 3.17.3. See [alpine 3.17.3 release notes](https://alpinelinux.org/posts/Alpine-3.17.3-released.html).
* FEATURE: release Windows binaries for [single-node VictoriaMetrics](https://docs.victoriametrics.com/), [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), [vmbackup](https://docs.victoriametrics.com/vmbackup.html) and [vmrestore](https://docs.victoriametrics.com/vmrestore.html). See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236), [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70) issues. * FEATURE: release Windows binaries for [single-node VictoriaMetrics](https://docs.victoriametrics.com/), [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html), [vmbackup](https://docs.victoriametrics.com/vmbackup.html) and [vmrestore](https://docs.victoriametrics.com/vmrestore.html). See [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3236), [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3821) and [this](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/70) issues.
* FEATURE: log metrics with truncated labels if the length of label value in the ingested metric exceeds `-maxLabelValueLen`. This should simplify debugging for this case. * FEATURE: log metrics with truncated labels if the length of label value in the ingested metric exceeds `-maxLabelValueLen`. This should simplify debugging for this case.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): show target URL when debugging [target relabeling](https://docs.victoriametrics.com/vmagent.html#relabel-debug). This should simplify target relabel debugging a bit. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3882). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): show target URL when debugging [target relabeling](https://docs.victoriametrics.com/vmagent.html#relabel-debug). This should simplify target relabel debugging a bit. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3882).
@ -26,6 +28,8 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add the ability for hot reloading of [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) configs. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#configuration-update) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
* FEATURE: check the contents of `-relabelConfig` and `-streamAggr.config` files additionally to `-promscrape.config` when single-node VictoriaMetrics runs with `-dryRun` command-line flag. This aligns the behaviour of single-node VictoriaMetrics with [vmagent](https://docs.victoriametrics.com/vmagent.html) behaviour for `-dryRun` command-line flag.
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
@ -41,6 +45,9 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999). * BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966). * BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly convert [VictoriaMetrics historgram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) to Prometheus histogram buckets when VictoriaMetrics histogram contain zero buckets. Previously these buckets were ignored, and this could lead to missing Prometheus histogram buckets after the conversion. Thanks to @zklapow for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4021). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly convert [VictoriaMetrics historgram buckets](https://valyala.medium.com/improving-histogram-usability-for-prometheus-and-grafana-bc7e5df0e350) to Prometheus histogram buckets when VictoriaMetrics histogram contain zero buckets. Previously these buckets were ignored, and this could lead to missing Prometheus histogram buckets after the conversion. Thanks to @zklapow for [the fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/4021).
* BUGFIX: properly support comma-separated filters inside [retention filters](https://docs.victoriametrics.com/#retention-filters). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3915).
* BUGFIX: verify response code when fetching configuration files via HTTP. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4034).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): replace empty labels with "" instead of "<no value>" during templating, as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4012).
## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1) ## [v1.89.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.89.1)

View file

@ -743,20 +743,45 @@ All the Prometheus querying API handlers can be prepended with `/prometheus` pre
### Prometheus querying API enhancements ### Prometheus querying API enhancements
VictoriaMetrics accepts optional `extra_label=<label_name>=<label_value>` query arg, which can be used for enforcing additional label filters for queries. For example, VictoriaMetrics accepts optional `extra_label=<label_name>=<label_value>` query arg, which can be used
`/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=<query>` would automatically add `{user_id="123",group_id="456"}` label filters to the given `<query>`. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. for enforcing additional label filters for queries. For example, `/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=<query>`
would automatically add `{user_id="123",group_id="456"}` label filters to the given `<query>`.
This functionality can be used for limiting the scope of time series visible to the given tenant.
It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics.
See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies.
VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. For example, VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries.
`/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=<query>` would automatically add `{env=~"prod|staging",user="xyz"}` label filters to the given `<query>`. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. For example, `/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=<query>` would automatically
add `{env=~"prod|staging",user="xyz"}` label filters to the given `<query>`. This functionality can be used for limiting
the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically
set by auth proxy sitting in front of VictoriaMetrics.
See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies.
VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats). VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats).
VictoriaMetrics accepts `round_digits` query arg for `/api/v1/query` and `/api/v1/query_range` handlers. It can be used for rounding response values to the given number of digits after the decimal point. For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. VictoriaMetrics accepts `round_digits` query arg for [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query)
and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) handlers. It can be used for rounding response values
to the given number of digits after the decimal point.
For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point.
VictoriaMetrics accepts `limit` query arg for `/api/v1/labels` and `/api/v1/label/<labelName>/values` handlers for limiting the number of returned entries. For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, then limits specified in the command-line flags are used. VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels)
and [`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues) handlers for limiting the number of returned entries.
For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels.
If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values,
then limits specified in the command-line flags are used.
By default, VictoriaMetrics returns time series for the last 5 minutes from `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/<labelName>/values` while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range. By default, VictoriaMetrics returns time series for the last day starting at 00:00 UTC
VictoriaMetrics accepts `limit` query arg for `/api/v1/series` handlers for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest. If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. from [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series),
[/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and
[`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues),
while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range.
VictoriaMetrics rounds the specified `start..end` time range to day granularity because of performance optimization concerns.
If you need the exact set of label names and label values on the given time range, then send queries
to [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) or to [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query).
VictoriaMetrics accepts `limit` query arg at [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series)
for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest of series.
If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used.
Additionally, VictoriaMetrics provides the following handlers: Additionally, VictoriaMetrics provides the following handlers:
@ -2169,7 +2194,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
-dryRun -dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
-enableTCP6 -enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable -envflag.enable

View file

@ -746,20 +746,45 @@ All the Prometheus querying API handlers can be prepended with `/prometheus` pre
### Prometheus querying API enhancements ### Prometheus querying API enhancements
VictoriaMetrics accepts optional `extra_label=<label_name>=<label_value>` query arg, which can be used for enforcing additional label filters for queries. For example, VictoriaMetrics accepts optional `extra_label=<label_name>=<label_value>` query arg, which can be used
`/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=<query>` would automatically add `{user_id="123",group_id="456"}` label filters to the given `<query>`. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. for enforcing additional label filters for queries. For example, `/api/v1/query_range?extra_label=user_id=123&extra_label=group_id=456&query=<query>`
would automatically add `{user_id="123",group_id="456"}` label filters to the given `<query>`.
This functionality can be used for limiting the scope of time series visible to the given tenant.
It is expected that the `extra_label` query args are automatically set by auth proxy sitting in front of VictoriaMetrics.
See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies.
VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries. For example, VictoriaMetrics accepts optional `extra_filters[]=series_selector` query arg, which can be used for enforcing arbitrary label filters for queries.
`/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=<query>` would automatically add `{env=~"prod|staging",user="xyz"}` label filters to the given `<query>`. This functionality can be used for limiting the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically set by auth proxy sitting in front of VictoriaMetrics. See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies. For example, `/api/v1/query_range?extra_filters[]={env=~"prod|staging",user="xyz"}&query=<query>` would automatically
add `{env=~"prod|staging",user="xyz"}` label filters to the given `<query>`. This functionality can be used for limiting
the scope of time series visible to the given tenant. It is expected that the `extra_filters[]` query args are automatically
set by auth proxy sitting in front of VictoriaMetrics.
See [vmauth](https://docs.victoriametrics.com/vmauth.html) and [vmgateway](https://docs.victoriametrics.com/vmgateway.html) as examples of such proxies.
VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats). VictoriaMetrics accepts multiple formats for `time`, `start` and `end` query args - see [these docs](#timestamp-formats).
VictoriaMetrics accepts `round_digits` query arg for `/api/v1/query` and `/api/v1/query_range` handlers. It can be used for rounding response values to the given number of digits after the decimal point. For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point. VictoriaMetrics accepts `round_digits` query arg for [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query)
and [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query) handlers. It can be used for rounding response values
to the given number of digits after the decimal point.
For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point.
VictoriaMetrics accepts `limit` query arg for `/api/v1/labels` and `/api/v1/label/<labelName>/values` handlers for limiting the number of returned entries. For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels. If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values, then limits specified in the command-line flags are used. VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels)
and [`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues) handlers for limiting the number of returned entries.
For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels.
If the provided `limit` value exceeds the corresponding `-search.maxTagKeys` / `-search.maxTagValues` command-line flag values,
then limits specified in the command-line flags are used.
By default, VictoriaMetrics returns time series for the last 5 minutes from `/api/v1/series`, `/api/v1/labels` and `/api/v1/label/<labelName>/values` while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range. By default, VictoriaMetrics returns time series for the last day starting at 00:00 UTC
VictoriaMetrics accepts `limit` query arg for `/api/v1/series` handlers for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest. If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used. from [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series),
[/api/v1/labels](https://docs.victoriametrics.com/url-examples.html#apiv1labels) and
[`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/url-examples.html#apiv1labelvalues),
while the Prometheus API defaults to all time. Explicitly set `start` and `end` to select the desired time range.
VictoriaMetrics rounds the specified `start..end` time range to day granularity because of performance optimization concerns.
If you need the exact set of label names and label values on the given time range, then send queries
to [/api/v1/query](https://docs.victoriametrics.com/keyConcepts.html#instant-query) or to [/api/v1/query_range](https://docs.victoriametrics.com/keyConcepts.html#range-query).
VictoriaMetrics accepts `limit` query arg at [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series)
for limiting the number of returned entries. For example, the query to `/api/v1/series?limit=5` returns a sample of up to 5 series, while ignoring the rest of series.
If the provided `limit` value exceeds the corresponding `-search.maxSeries` command-line flag values, then limits specified in the command-line flags are used.
Additionally, VictoriaMetrics provides the following handlers: Additionally, VictoriaMetrics provides the following handlers:
@ -2172,7 +2197,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
Supports an array of values separated by comma or specified via multiple flags. Supports an array of values separated by comma or specified via multiple flags.
-dryRun -dryRun
Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
-enableTCP6 -enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable -envflag.enable

View file

@ -1,4 +1,4 @@
# vmanomaly Quickstart # Getting started with vmanomaly
**Prerequisites** **Prerequisites**
- In the tutorial, we'll be using the following VictoriaMetrics components: - In the tutorial, we'll be using the following VictoriaMetrics components:
@ -8,7 +8,7 @@
If you're unfamiliar with the listed components, please read [QuickStart](https://docs.victoriametrics.com/Quick-Start.html) first. If you're unfamiliar with the listed components, please read [QuickStart](https://docs.victoriametrics.com/Quick-Start.html) first.
- It is assumed that you are familiar with [Grafana](https://grafana.com/)(v.9.3.1) and [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/). - It is assumed that you are familiar with [Grafana](https://grafana.com/)(v.9.3.1) and [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/).
## What is vmanomaly? ## 1. What is vmanomaly?
*VictoriaMetrics Anomaly Detection* ([vmanomaly](https://docs.victoriametrics.com/vmanomaly.html)) is a service that continuously scans time series stored in VictoriaMetrics and detects unexpected changes within data patterns in real-time. It does so by utilizing user-configurable machine learning models. *VictoriaMetrics Anomaly Detection* ([vmanomaly](https://docs.victoriametrics.com/vmanomaly.html)) is a service that continuously scans time series stored in VictoriaMetrics and detects unexpected changes within data patterns in real-time. It does so by utilizing user-configurable machine learning models.
All the service parameters are defined in a config file. All the service parameters are defined in a config file.
@ -27,11 +27,11 @@ The value is designed to:
- *exceed 1* if the datapoint is abnormal. - *exceed 1* if the datapoint is abnormal.
Then, users can enable alerting rules based on the **anomaly score** with [vmalert](#what-is-vmalert). Then, users can enable alerting rules based on the **anomaly score** with [vmalert](#what-is-vmalert).
## What is vmalert? ## 2. What is vmalert?
[vmalert](https://docs.victoriametrics.com/vmalert.html) is an alerting tool for VictoriaMetrics. It executes a list of the given alerting or recording rules against configured `-datasource.url`. [vmalert](https://docs.victoriametrics.com/vmalert.html) is an alerting tool for VictoriaMetrics. It executes a list of the given alerting or recording rules against configured `-datasource.url`.
[Alerting rules](https://docs.victoriametrics.com/vmalert.html#alerting-rules) allow you to define conditions that, when met, will notify the user. The alerting condition is defined in a form of a query expression via [MetricsQL query language](https://docs.victoriametrics.com/MetricsQL.html). For example, in our case, the expression `anomaly_score > 1.0` will notify a user when the calculated anomaly score exceeds a threshold of 1. [Alerting rules](https://docs.victoriametrics.com/vmalert.html#alerting-rules) allow you to define conditions that, when met, will notify the user. The alerting condition is defined in a form of a query expression via [MetricsQL query language](https://docs.victoriametrics.com/MetricsQL.html). For example, in our case, the expression `anomaly_score > 1.0` will notify a user when the calculated anomaly score exceeds a threshold of 1.
## How does vmanomaly works with vmalert? ## 3. How does vmanomaly works with vmalert?
Compared to classical alerting rules, anomaly detection is more "hands-off" and data-aware. Instead of thinking of critical conditions to define, user can rely on catching anomalies that were not expected to happen. In other words, by setting up alerting rules, a user must know what to look for, ahead of time, while anomaly detection looks for any deviations from past behavior. Compared to classical alerting rules, anomaly detection is more "hands-off" and data-aware. Instead of thinking of critical conditions to define, user can rely on catching anomalies that were not expected to happen. In other words, by setting up alerting rules, a user must know what to look for, ahead of time, while anomaly detection looks for any deviations from past behavior.
Practical use case is to put anomaly score generated by vmanomaly into alerting rules with some threshold. Practical use case is to put anomaly score generated by vmanomaly into alerting rules with some threshold.
@ -43,9 +43,10 @@ Practical use case is to put anomaly score generated by vmanomaly into alerting
- Explore data for analysis in [Grafana](https://grafana.com/). - Explore data for analysis in [Grafana](https://grafana.com/).
- Explore vmanomaly results. - Explore vmanomaly results.
- Explore vmalert alerts - Explore vmalert alerts
_____________________________ _____________________________
## Data to analyze ## 4. Data to analyze
Let's talk about data used for anomaly detection in this tutorial. Let's talk about data used for anomaly detection in this tutorial.
We are going to collect our own CPU usage data with [Node Exporter](https://prometheus.io/docs/guides/node-exporter/) into the VictoriaMetrics database. We are going to collect our own CPU usage data with [Node Exporter](https://prometheus.io/docs/guides/node-exporter/) into the VictoriaMetrics database.
@ -73,9 +74,10 @@ Here is how this query may look like in Grafana:
![node_cpu_rate_graph](guide-vmanomaly-node-cpu-rate-graph.png "node_cpu_rate_graph") ![node_cpu_rate_graph](guide-vmanomaly-node-cpu-rate-graph.png "node_cpu_rate_graph")
This query result will generate 8 time series per each cpu, and we will use them as an input for our VM Anomaly Detection. vmanomaly will start learning configured model type separately for each of the time series. This query result will generate 8 time series per each cpu, and we will use them as an input for our VM Anomaly Detection. vmanomaly will start learning configured model type separately for each of the time series.
______________________________ ______________________________
## vmanomaly configuration and parameter description ## 5. vmanomaly configuration and parameter description
**Parameter description**: **Parameter description**:
There are 4 main sections in config file: There are 4 main sections in config file:
@ -141,7 +143,7 @@ writer:
</div> </div>
_____________________________________________ _____________________________________________
## vmanomaly output ## 6. vmanomaly output
As the result of running vmanomaly, it produces the following metrics: As the result of running vmanomaly, it produces the following metrics:
- `anomaly_score` - the main one. Ideally, if it is between 0.0 and 1.0 it is considered to be a non-anomalous value. If it is greater than 1.0, it is considered an anomaly (but you can reconfigure that in alerting config, of course), - `anomaly_score` - the main one. Ideally, if it is between 0.0 and 1.0 it is considered to be a non-anomalous value. If it is greater than 1.0, it is considered an anomaly (but you can reconfigure that in alerting config, of course),
- `yhat` - predicted expected value, - `yhat` - predicted expected value,
@ -154,7 +156,7 @@ Here is an example of how output metric will be written into VictoriaMetrics:
____________________________________________ ____________________________________________
## vmalert configuration ## 7. vmalert configuration
Here we provide an example of the config for vmalert `vmalert_config.yml`. Here we provide an example of the config for vmalert `vmalert_config.yml`.
<div class="with-copy" markdown="1"> <div class="with-copy" markdown="1">
@ -176,7 +178,7 @@ groups:
In the query expression we need to put a condition on the generated anomaly scores. Usually if the anomaly score is between 0.0 and 1.0, the analyzed value is not abnormal. The more anomaly score exceeded 1 the more our model is sure that value is an anomaly. In the query expression we need to put a condition on the generated anomaly scores. Usually if the anomaly score is between 0.0 and 1.0, the analyzed value is not abnormal. The more anomaly score exceeded 1 the more our model is sure that value is an anomaly.
You can choose your threshold value that you consider reasonable based on the anomaly score metric, generated by vmanomaly. One of the best ways is to estimate it visually, by plotting the `anomaly_score` metric, along with predicted "expected" range of `yhat_lower` and `yhat_upper`. Later in this tutorial we will show an example You can choose your threshold value that you consider reasonable based on the anomaly score metric, generated by vmanomaly. One of the best ways is to estimate it visually, by plotting the `anomaly_score` metric, along with predicted "expected" range of `yhat_lower` and `yhat_upper`. Later in this tutorial we will show an example
____________________________________________ ____________________________________________
## Docker Compose configuration ## 8. Docker Compose configuration
Now we are going to configure the `docker-compose.yml` file to run all needed services. Now we are going to configure the `docker-compose.yml` file to run all needed services.
Here are all services we are going to run: Here are all services we are going to run:
@ -375,7 +377,7 @@ docker-compose up -d
___________________________________________________________ ___________________________________________________________
## Model results ## 9. Model results
To look at model results we need to go to grafana on the `localhost:3000`. Data To look at model results we need to go to grafana on the `localhost:3000`. Data
vmanomaly need some time to generate more data to visualize. vmanomaly need some time to generate more data to visualize.
Let's investigate model output visualization in Grafana. Let's investigate model output visualization in Grafana.
@ -410,5 +412,5 @@ On the page `http://localhost:8880/vmalert/groups` you can find our configured A
According to the rule configured for vmalert we will see Alert when anomaly score exceed 1. You will see an alert on Alert tab. `http://localhost:8880/vmalert/alerts` According to the rule configured for vmalert we will see Alert when anomaly score exceed 1. You will see an alert on Alert tab. `http://localhost:8880/vmalert/alerts`
![alerts](guide-vmanomaly-alerts-firing.png "alerts firing") ![alerts](guide-vmanomaly-alerts-firing.png "alerts firing")
## Conclusion ## 10. Conclusion
Now we know how to set up Victoria Metric Anomaly Detection tool and use it together with vmalert. We also discovered core vmanomaly generated metrics and behaviour. Now we know how to set up Victoria Metric Anomaly Detection tool and use it together with vmalert. We also discovered core vmanomaly generated metrics and behaviour.

View file

@ -26,8 +26,9 @@ Operator introduces the following custom resources:
* [VMAlertmanager](#vmalertmanager) * [VMAlertmanager](#vmalertmanager)
* [VMAlertmanagerConfig](#vmalertmanagerconfig) * [VMAlertmanagerConfig](#vmalertmanagerconfig)
* [VMRule](#vmrule) * [VMRule](#vmrule)
* [VMPrometheusConverter](#vmprometheusconverter)
* [VMProbe](#vmprobe) * [VMProbe](#vmprobe)
* [VMNodeScrape](#vmodescrape) * [VMNodeScrape](#vmnodescrape)
* [VMStaticScrape](#vmstaticscrape) * [VMStaticScrape](#vmstaticscrape)
* [VMAuth](#vmauth) * [VMAuth](#vmauth)
* [VMUser](#vmuser) * [VMUser](#vmuser)

View file

@ -509,7 +509,7 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
# match is an optional filter for incoming samples to aggregate. # match is an optional filter for incoming samples to aggregate.
# It can contain arbitrary Prometheus series selector # It can contain arbitrary Prometheus series selector
# according to https://docs.victoriametrics.com/keyConcepts.html#filtering . # according to https://docs.victoriametrics.com/keyConcepts.html#filtering .
# If match is missing, then all the incoming samples are aggregated. # If match isn't set, then all the incoming samples are aggregated.
- match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}' - match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}'
# interval is the interval for the aggregation. # interval is the interval for the aggregation.
@ -545,3 +545,16 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
The file can contain multiple aggregation configs. The aggregation is performed independently The file can contain multiple aggregation configs. The aggregation is performed independently
per each specified config entry. per each specified config entry.
### Configuration update
[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
support the following approaches for hot reloading stream aggregation configs from `-remoteWrite.streamAggr.config` and `-streamAggr.config`:
* By sending `SIGHUP` signal to `vmagent` or `victoria-metrics` process:
```console
kill -SIGHUP `pidof vmagent`
```
* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload).

View file

@ -288,7 +288,8 @@ curl http://<vmselect>:8481/select/0/prometheus/api/v1/labels
</div> </div>
By default VictoriaMetrics returns labels seen during the last 5 minutes. An arbitrary time range can be set via `start` and `end` query args. By default VictoriaMetrics returns labels seen during the last day starting at 00:00 UTC. An arbitrary time range can be set via `start` and `end` query args.
The specified `start..end` time range is rounded to day granularity because of performance optimization concerns.
Additional information: Additional information:
* [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage) * [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage)
@ -317,7 +318,8 @@ curl http://<vmselect>:8481/select/0/prometheus/api/v1/label/job/values
</div> </div>
By default VictoriaMetrics returns label values seen during the last 5 minutes. An arbitrary time range can be set via `start` and `end` query args. By default VictoriaMetrics returns labels values seen during the last day starting at 00:00 UTC. An arbitrary time range can be set via `start` and `end` query args.
The specified `start..end` time range is rounded to day granularity because of performance optimization concerns.
Additional information: Additional information:
* [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage) * [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage)
@ -402,7 +404,8 @@ curl http://<vmselect>:8481/select/0/prometheus/api/v1/series -d 'match[]=vm_htt
</div> </div>
By default VictoriaMetrics returns time series seen during the last 5 minutes. An arbitrary time range can be set via `start` and `end` query args. By default VictoriaMetrics returns time series seen during the last day starting at 00:00 UTC. An arbitrary time range can be set via `start` and `end` query args.
The specified `start..end` time range is rounded to day granularity because of performance optimization concerns.
Additional information: Additional information:
* [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage) * [Prometheus querying API usage](https://docs.victoriametrics.com/#prometheus-querying-api-usage)

View file

@ -108,7 +108,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
`vmagent` should be restarted in order to update config options set via command-line args. `vmagent` should be restarted in order to update config options set via command-line args.
`vmagent` supports multiple approaches for reloading configs from updated config files such as `vmagent` supports multiple approaches for reloading configs from updated config files such as
`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`: `-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
* Sending `SIGHUP` signal to `vmagent` process: * Sending `SIGHUP` signal to `vmagent` process:
@ -1190,7 +1190,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-denyQueryTracing -denyQueryTracing
Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing
-dryRun -dryRun
Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
-enableTCP6 -enableTCP6
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
-envflag.enable -envflag.enable
@ -1523,7 +1523,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
-remoteWrite.relabelConfig string -remoteWrite.relabelConfig string
Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling Optional path to file with relabeling configs, which are applied to all the metrics before sending them to -remoteWrite.url. See also -remoteWrite.urlRelabelConfig. The path can point either to local file or to http url. See https://docs.victoriametrics.com/vmagent.html#relabeling
-remoteWrite.keepDanglingQueues -remoteWrite.keepDanglingQueues
Keep persistent queues contents in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on. Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. Useful when -remoteWrite.url is changed temporarily and persistent queue files will be needed later on.
-remoteWrite.roundDigits array -remoteWrite.roundDigits array
Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics Round metric values to this number of decimal digits after the point before writing them to remote storage. Examples: -remoteWrite.roundDigits=2 would round 1.236 to 1.24, while -remoteWrite.roundDigits=-1 would round 126.78 to 130. By default digits rounding is disabled. Set it to 100 for disabling it for a particular remote storage. This option may be used for improving data compression for the stored metrics
Supports array of values separated by comma or specified via multiple flags. Supports array of values separated by comma or specified via multiple flags.

View file

@ -785,7 +785,25 @@ To avoid such situation try to filter out VM process metrics via `--vm-native-fi
4. `vmctl` doesn't provide relabeling or other types of labels management in this mode. 4. `vmctl` doesn't provide relabeling or other types of labels management in this mode.
Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/vmctl/issues/4#issuecomment-683424375). Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/vmctl/issues/4#issuecomment-683424375).
5. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format) 5. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
and specify `accountID` param. and specify `accountID` param. Example formats:
```console
# Migrating from cluster to single
--vm-native-src-addr=http://<src-vmselect>:8481/select/0/prometheus
--vm-native-dst-addr=http://<dst-vmsingle>:8428
# Migrating from single to cluster
--vm-native-src-addr=http://<src-vmsingle>:8428
--vm-native-src-addr=http://<dst-vminsert>:8480/insert/0/prometheus
# Migrating single to single
--vm-native-src-addr=http://<src-vmsingle>:8428
--vm-native-dst-addr=http://<dst-vmsingle>:8428
# Migrating cluster to cluster
--vm-native-src-addr=http://<src-vmselect>:8481/select/0/prometheus
--vm-native-dst-addr=http://<dst-vminsert>:8480/insert/0/prometheus
```
6. When migrating large volumes of data it might be useful to use `--vm-native-step-interval` flag to split single process into smaller steps. 6. When migrating large volumes of data it might be useful to use `--vm-native-step-interval` flag to split single process into smaller steps.
7. `vmctl` supports `--vm-concurrency` which controls the number of concurrent workers that process the input from source query results. 7. `vmctl` supports `--vm-concurrency` which controls the number of concurrent workers that process the input from source query results.
Please note that each import request can load up to a single vCPU core on VictoriaMetrics. So try to set it according Please note that each import request can load up to a single vCPU core on VictoriaMetrics. So try to set it according

View file

@ -59,16 +59,19 @@ func NewArrayBytes(name, description string) *ArrayBytes {
// -foo=value1 -foo=value2 // -foo=value1 -foo=value2
// -foo=value1,value2 // -foo=value1,value2
// //
// Flag values may be quoted. For instance, the following arg creates an array of ("a", "b, c") items: // Each flag value may contain commas inside single quotes, double quotes, [], () or {} braces.
// For example, -foo=[a,b,c] defines a single command-line flag with `[a,b,c]` value.
// //
// -foo='a,"b, c"' // Flag values may be quoted. For instance, the following arg creates an array of ("a", "b,c") items:
//
// -foo='a,"b,c"'
type ArrayString []string type ArrayString []string
// String implements flag.Value interface // String implements flag.Value interface
func (a *ArrayString) String() string { func (a *ArrayString) String() string {
aEscaped := make([]string, len(*a)) aEscaped := make([]string, len(*a))
for i, v := range *a { for i, v := range *a {
if strings.ContainsAny(v, `", `+"\n") { if strings.ContainsAny(v, `,'"{[(`+"\n") {
v = fmt.Sprintf("%q", v) v = fmt.Sprintf("%q", v)
} }
aEscaped[i] = v aEscaped[i] = v
@ -94,55 +97,105 @@ func parseArrayValues(s string) []string {
if len(tail) == 0 { if len(tail) == 0 {
return values return values
} }
if tail[0] == ',' {
tail = tail[1:]
}
s = tail s = tail
if s[0] == ',' {
s = s[1:]
}
} }
} }
var closeQuotes = map[byte]byte{
'"': '"',
'\'': '\'',
'[': ']',
'{': '}',
'(': ')',
}
func getNextArrayValue(s string) (string, string) { func getNextArrayValue(s string) (string, string) {
if len(s) == 0 { v, tail := getNextArrayValueMaybeQuoted(s)
return "", "" if strings.HasPrefix(v, `"`) && strings.HasSuffix(v, `"`) {
vUnquoted, err := strconv.Unquote(v)
if err == nil {
return vUnquoted, tail
} }
if s[0] != '"' { v = v[1 : len(v)-1]
// Fast path - unquoted string v = strings.ReplaceAll(v, `\"`, `"`)
n := strings.IndexByte(s, ',') v = strings.ReplaceAll(v, `\\`, `\`)
return v, tail
}
if strings.HasPrefix(v, `'`) && strings.HasSuffix(v, `'`) {
v = v[1 : len(v)-1]
v = strings.ReplaceAll(v, `\'`, "'")
v = strings.ReplaceAll(v, `\\`, `\`)
return v, tail
}
return v, tail
}
func getNextArrayValueMaybeQuoted(s string) (string, string) {
idx := 0
for {
n := strings.IndexAny(s[idx:], `,"'[{(`)
if n < 0 { if n < 0 {
// The last item // The last item
return s, "" return s, ""
} }
return s[:n], s[n:] idx += n
ch := s[idx]
if ch == ',' {
// The next item
return s[:idx], s[idx:]
} }
idx++
m := indexCloseQuote(s[idx:], closeQuotes[ch])
idx += m
}
}
// Find the end of quoted string func indexCloseQuote(s string, closeQuote byte) int {
end := 1 if closeQuote == '"' || closeQuote == '\'' {
ss := s[1:] idx := 0
for { for {
n := strings.IndexByte(ss, '"') n := strings.IndexByte(s[idx:], closeQuote)
if n < 0 { if n < 0 {
// Cannot find trailing quote. Return the whole string till the end. return 0
return s, ""
} }
end += n + 1 idx += n
// Verify whether the trailing quote is escaped with backslash. if n := getTrailingBackslashesCount(s[:idx]); n%2 == 1 {
backslashes := 0 // The quote is escaped with backslash. Skip it
for n > backslashes && ss[n-backslashes-1] == '\\' { idx++
backslashes++ continue
} }
if backslashes&1 == 0 { return idx + 1
// The trailing quote isn't escaped.
break
} }
// The trailing quote is escaped. Continue searching for the next quote.
ss = ss[n+1:]
} }
v := s[:end] idx := 0
vUnquoted, err := strconv.Unquote(v) for {
if err == nil { n := strings.IndexAny(s[idx:], `"'[{()}]`)
v = vUnquoted if n < 0 {
return 0
} }
return v, s[end:] idx += n
ch := s[idx]
if ch == closeQuote {
return idx + 1
}
idx++
m := indexCloseQuote(s[idx:], closeQuotes[ch])
if m == 0 {
return 0
}
idx += m
}
}
func getTrailingBackslashesCount(s string) int {
n := len(s)
for n > 0 && s[n-1] == '\\' {
n--
}
return len(s) - n
} }
// GetOptionalArg returns optional arg under the given argIdx. // GetOptionalArg returns optional arg under the given argIdx.

View file

@ -53,15 +53,54 @@ func TestArrayString_Set(t *testing.T) {
t.Fatalf("unexpected values parsed;\ngot\n%q\nwant\n%q", a, expectedValues) t.Fatalf("unexpected values parsed;\ngot\n%q\nwant\n%q", a, expectedValues)
} }
} }
// Zero args
f("", nil) f("", nil)
// Single arg
f(`foo`, []string{`foo`}) f(`foo`, []string{`foo`})
f(`foo,b ar,baz`, []string{`foo`, `b ar`, `baz`}) f(`fo"o`, []string{`fo"o`})
f(`foo,b\"'ar,"baz,d`, []string{`foo`, `b\"'ar`, `"baz,d`}) f(`fo'o`, []string{`fo'o`})
f(`,foo,,ba"r,`, []string{``, `foo`, ``, `ba"r`, ``}) f(`fo{o`, []string{`fo{o`})
f(`fo[o`, []string{`fo[o`})
f(`fo(o`, []string{`fo(o`})
// Single arg with Prometheus label filters
f(`foo{bar="baz",x="y"}`, []string{`foo{bar="baz",x="y"}`})
f(`foo{bar="ba}z",x="y"}`, []string{`foo{bar="ba}z",x="y"}`})
f(`foo{bar='baz',x="y"}`, []string{`foo{bar='baz',x="y"}`})
f(`foo{bar='baz',x='y'}`, []string{`foo{bar='baz',x='y'}`})
f(`foo{bar='ba}z',x='y'}`, []string{`foo{bar='ba}z',x='y'}`})
f(`{foo="ba[r",baz='a'}`, []string{`{foo="ba[r",baz='a'}`})
// Single arg with JSON
f(`[1,2,3]`, []string{`[1,2,3]`})
f(`{"foo":"ba,r",baz:x}`, []string{`{"foo":"ba,r",baz:x}`})
// Single quoted arg
f(`"foo"`, []string{`foo`})
f(`"fo,'o"`, []string{`fo,'o`})
f(`"f\\o,\'\"o"`, []string{`f\o,\'"o`})
f(`"foo{bar='baz',x='y'}"`, []string{`foo{bar='baz',x='y'}`})
f(`'foo'`, []string{`foo`})
f(`'fo,"o'`, []string{`fo,"o`})
f(`'f\\o,\'\"o'`, []string{`f\o,'\"o`})
f(`'foo{bar="baz",x="y"}'`, []string{`foo{bar="baz",x="y"}`})
// Multiple args
f(`foo,bar,baz`, []string{`foo`, `bar`, `baz`})
f(`"foo",'bar',{[(ba'",z"`, []string{`foo`, `bar`, `{[(ba'",z"`})
f(`foo,b"'ar,"baz,d`, []string{`foo`, `b"'ar,"baz`, `d`})
f(`{foo="b,ar"},baz{x="y",z="d"}`, []string{`{foo="b,ar"}`, `baz{x="y",z="d"}`})
// Empty args
f(`""`, []string{``}) f(`""`, []string{``})
f(`"foo,b\nar"`, []string{`foo,b` + "\n" + `ar`}) f(`''`, []string{``})
f(`"foo","bar",baz`, []string{`foo`, `bar`, `baz`}) f(`,`, []string{``, ``})
f(`,fo,"\"b, a'\\",,r,`, []string{``, `fo`, `"b, a'\`, ``, `r`, ``}) f(`,foo,,ba"r,`, []string{``, `foo`, ``, `ba"r`, ``})
// Special chars inside double quotes
f(`"foo,b\nar"`, []string{"foo,b\nar"})
f(`"foo\x23bar"`, []string{"foo\x23bar"})
} }
func TestArrayString_GetOptionalArg(t *testing.T) { func TestArrayString_GetOptionalArg(t *testing.T) {
@ -100,6 +139,7 @@ func TestArrayString_String(t *testing.T) {
f(",foo,") f(",foo,")
f(`", foo","b\"ar",`) f(`", foo","b\"ar",`)
f(`,"\nfoo\\",bar`) f(`,"\nfoo\\",bar`)
f(`"foo{bar=~\"baz\",a!=\"b\"}","{a='b,{[(c'}"`)
} }
func TestArrayDuration(t *testing.T) { func TestArrayDuration(t *testing.T) {

View file

@ -426,6 +426,12 @@ func ReadFileOrHTTP(path string) ([]byte, error) {
} }
data, err := io.ReadAll(resp.Body) data, err := io.ReadAll(resp.Body)
_ = resp.Body.Close() _ = resp.Body.Close()
if resp.StatusCode != http.StatusOK {
if len(data) > 4*1024 {
data = data[:4*1024]
}
return nil, fmt.Errorf("unexpected status code when fetching %q: %d, expecting %d; response: %q", path, resp.StatusCode, http.StatusOK, data)
}
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot read %q: %s", path, err) return nil, fmt.Errorf("cannot read %q: %s", path, err)
} }

View file

@ -50,7 +50,7 @@ var (
// CheckConfig checks -promscrape.config for errors and unsupported options. // CheckConfig checks -promscrape.config for errors and unsupported options.
func CheckConfig() error { func CheckConfig() error {
if *promscrapeConfigFile == "" { if *promscrapeConfigFile == "" {
return fmt.Errorf("missing -promscrape.config option") return nil
} }
_, _, err := loadConfig(*promscrapeConfigFile) _, _, err := loadConfig(*promscrapeConfigFile)
return err return err

View file

@ -1,6 +1,7 @@
package streamaggr package streamaggr
import ( import (
"encoding/json"
"fmt" "fmt"
"math" "math"
"sort" "sort"
@ -63,7 +64,7 @@ func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (
func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) { func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
var cfgs []*Config var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, err return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
} }
return NewAggregators(cfgs, pushFunc, dedupInterval) return NewAggregators(cfgs, pushFunc, dedupInterval)
} }
@ -130,6 +131,10 @@ type Config struct {
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data. // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
type Aggregators struct { type Aggregators struct {
as []*aggregator as []*aggregator
// configData contains marshaled configs passed to NewAggregators().
// It is used in Equal() for comparing Aggregators.
configData []byte
} }
// NewAggregators creates Aggregators from the given cfgs. // NewAggregators creates Aggregators from the given cfgs.
@ -148,12 +153,21 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
for i, cfg := range cfgs { for i, cfg := range cfgs {
a, err := newAggregator(cfg, pushFunc, dedupInterval) a, err := newAggregator(cfg, pushFunc, dedupInterval)
if err != nil { if err != nil {
// Stop already initialized aggregators before returning the error.
for _, a := range as[:i] {
a.MustStop()
}
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err) return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
} }
as[i] = a as[i] = a
} }
configData, err := json.Marshal(cfgs)
if err != nil {
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
}
return &Aggregators{ return &Aggregators{
as: as, as: as,
configData: configData,
}, nil }, nil
} }
@ -167,6 +181,14 @@ func (a *Aggregators) MustStop() {
} }
} }
// Equal returns true if a and b are initialized from identical configs.
func (a *Aggregators) Equal(b *Aggregators) bool {
if a == nil || b == nil {
return a == nil && b == nil
}
return string(a.configData) == string(b.configData)
}
// Push pushes tss to a. // Push pushes tss to a.
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) { func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
if a == nil { if a == nil {
@ -411,7 +433,6 @@ func (a *aggregator) dedupFlush() {
skipAggrSuffix: true, skipAggrSuffix: true,
} }
a.dedupAggr.appendSeriesForFlush(ctx) a.dedupAggr.appendSeriesForFlush(ctx)
logger.Errorf("series after dedup: %v", ctx.tss)
a.push(ctx.tss) a.push(ctx.tss)
} }
@ -450,6 +471,14 @@ func (a *aggregator) flush() {
func (a *aggregator) MustStop() { func (a *aggregator) MustStop() {
close(a.stopCh) close(a.stopCh)
a.wg.Wait() a.wg.Wait()
// Flush the remaining data from the last interval if needed.
flushConcurrencyCh <- struct{}{}
if a.dedupAggr != nil {
a.dedupFlush()
}
a.flush()
<-flushConcurrencyCh
} }
// Push pushes tss to a. // Push pushes tss to a.

View file

@ -118,6 +118,45 @@ func TestAggregatorsFailure(t *testing.T) {
`) `)
} }
func TestAggregatorsEqual(t *testing.T) {
f := func(a, b string, expectedResult bool) {
t.Helper()
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
if err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
result := aa.Equal(ab)
if result != expectedResult {
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
}
}
f("", "", true)
f(`
- outputs: [total]
interval: 5m
`, ``, false)
f(`
- outputs: [total]
interval: 5m
`, `
- outputs: [total]
interval: 5m
`, true)
f(`
- outputs: [total]
interval: 3m
`, `
- outputs: [total]
interval: 5m
`, false)
}
func TestAggregatorsSuccess(t *testing.T) { func TestAggregatorsSuccess(t *testing.T) {
f := func(config, inputMetrics, outputMetricsExpected string) { f := func(config, inputMetrics, outputMetricsExpected string) {
t.Helper() t.Helper()
@ -145,11 +184,6 @@ func TestAggregatorsSuccess(t *testing.T) {
// Push the inputMetrics to Aggregators // Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics) tssInput := mustParsePromMetrics(inputMetrics)
a.Push(tssInput) a.Push(tssInput)
if a != nil {
for _, aggr := range a.as {
aggr.flush()
}
}
a.MustStop() a.MustStop()
// Verify the tssOutput contains the expected metrics // Verify the tssOutput contains the expected metrics