mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/streamaggr: follow-up for ff72ca14b9
- Make sure that the last successfully loaded config is used on hot-reload failure - Properly cleanup resources occupied by already initialized aggregators when the current aggregator fails to be initialized - Expose distinct vmagent_streamaggr_config_reload* metrics per each -remoteWrite.streamAggr.config This should simplify monitoring and debugging failed reloads - Remove race condition at app/vminsert/common.MustStopStreamAggr when calling sa.MustStop() while sa could be in use at realoadSaConfig() - Remove lib/streamaggr.aggregator.hasState global variable, since it may negatively impact scalability on system with big number of CPU cores at hasState.Store(true) call inside aggregator.Push(). - Remove fine-grained aggregator reload - reload all the aggregators on config change instead. This simplifies the code a bit. The fine-grained aggregator reload may be returned back if there will be demand from real users for it. - Check -relabelConfig and -streamAggr.config files when single-node VictoriaMetrics runs with -dryRun flag - Return back accidentally removed changelog for v1.87.4 at docs/CHANGELOG.md Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639
This commit is contained in:
parent
a62f3034db
commit
dad13c0a91
13 changed files with 223 additions and 451 deletions
|
@ -104,7 +104,7 @@ additionally to pull-based Prometheus-compatible targets' scraping:
|
|||
|
||||
`vmagent` should be restarted in order to update config options set via command-line args.
|
||||
`vmagent` supports multiple approaches for reloading configs from updated config files such as
|
||||
`-promscrape.config`, `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig`:
|
||||
`-promscrape.config`, `-remoteWrite.relabelConfig`, `-remoteWrite.urlRelabelConfig` and `-remoteWrite.streamAggr.config`:
|
||||
|
||||
* Sending `SIGHUP` signal to `vmagent` process:
|
||||
|
||||
|
@ -1186,7 +1186,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-denyQueryTracing
|
||||
Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing
|
||||
-dryRun
|
||||
Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
|
||||
Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
|
||||
-enableTCP6
|
||||
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
|
||||
-envflag.enable
|
||||
|
|
|
@ -67,7 +67,7 @@ var (
|
|||
opentsdbHTTPUseProxyProtocol = flag.Bool("opentsdbHTTPListenAddr.useProxyProtocol", false, "Whether to use proxy protocol for connections accepted "+
|
||||
"at -opentsdbHTTPListenAddr . See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt")
|
||||
configAuthKey = flag.String("configAuthKey", "", "Authorization key for accessing /config page. It must be passed via authKey query arg")
|
||||
dryRun = flag.Bool("dryRun", false, "Whether to check only config files without running vmagent. The following files are checked: "+
|
||||
dryRun = flag.Bool("dryRun", false, "Whether to check config files without running vmagent. The following files are checked: "+
|
||||
"-promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . "+
|
||||
"Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag")
|
||||
)
|
||||
|
@ -103,13 +103,13 @@ func main() {
|
|||
return
|
||||
}
|
||||
if *dryRun {
|
||||
if err := remotewrite.CheckRelabelConfigs(); err != nil {
|
||||
logger.Fatalf("error when checking relabel configs: %s", err)
|
||||
}
|
||||
if err := promscrape.CheckConfig(); err != nil {
|
||||
logger.Fatalf("error when checking -promscrape.config: %s", err)
|
||||
}
|
||||
if err := remotewrite.CheckStreamAggConfigs(); err != nil {
|
||||
if err := remotewrite.CheckRelabelConfigs(); err != nil {
|
||||
logger.Fatalf("error when checking relabel configs: %s", err)
|
||||
}
|
||||
if err := remotewrite.CheckStreamAggrConfigs(); err != nil {
|
||||
logger.Fatalf("error when checking -remoteWrite.streamAggr.config: %s", err)
|
||||
}
|
||||
logger.Infof("all the configs are ok; exiting with 0 status code")
|
||||
|
|
|
@ -65,6 +65,15 @@ var (
|
|||
"Excess series are logged and dropped. This can be useful for limiting series cardinality. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
|
||||
maxDailySeries = flag.Int("remoteWrite.maxDailySeries", 0, "The maximum number of unique series vmagent can send to remote storage systems during the last 24 hours. "+
|
||||
"Excess series are logged and dropped. This can be useful for limiting series churn rate. See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter")
|
||||
|
||||
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
|
||||
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
|
||||
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
|
||||
"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html")
|
||||
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
|
||||
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -87,9 +96,6 @@ func MultitenancyEnabled() bool {
|
|||
// Contains the current relabelConfigs.
|
||||
var allRelabelConfigs atomic.Value
|
||||
|
||||
// Contains the loader for stream aggregation configs.
|
||||
var saCfgLoader *saConfigsLoader
|
||||
|
||||
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
|
||||
// since it may lead to high memory usage due to big number of buffers.
|
||||
var maxQueues = cgroup.AvailableCPUs() * 16
|
||||
|
@ -152,15 +158,9 @@ func Init() {
|
|||
logger.Fatalf("cannot load relabel configs: %s", err)
|
||||
}
|
||||
allRelabelConfigs.Store(rcs)
|
||||
|
||||
relabelConfigSuccess.Set(1)
|
||||
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
||||
|
||||
saCfgLoader, err = newSaConfigsLoader(*streamAggrConfig)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot load stream aggregation config: %s", err)
|
||||
}
|
||||
|
||||
if len(*remoteWriteURLs) > 0 {
|
||||
rwctxsDefault = newRemoteWriteCtxs(nil, *remoteWriteURLs)
|
||||
}
|
||||
|
@ -172,46 +172,31 @@ func Init() {
|
|||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
case <-stopCh:
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
}
|
||||
relabelConfigReloads.Inc()
|
||||
logger.Infof("SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
||||
rcs, err := loadRelabelConfigs()
|
||||
if err != nil {
|
||||
relabelConfigReloadErrors.Inc()
|
||||
relabelConfigSuccess.Set(0)
|
||||
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
|
||||
continue
|
||||
}
|
||||
allRelabelConfigs.Store(rcs)
|
||||
relabelConfigSuccess.Set(1)
|
||||
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
||||
logger.Infof("Successfully reloaded relabel configs")
|
||||
|
||||
logger.Infof("reloading stream agg configs pointed by -remoteWrite.streamAggr.config")
|
||||
err = saCfgLoader.reloadConfigs()
|
||||
if err != nil {
|
||||
logger.Errorf("Cannot reload stream aggregation configs: %s", err)
|
||||
}
|
||||
if len(*remoteWriteMultitenantURLs) > 0 {
|
||||
rwctxsMapLock.Lock()
|
||||
for _, rwctxs := range rwctxsMap {
|
||||
for _, rwctx := range rwctxs {
|
||||
rwctx.reinitStreamAggr()
|
||||
}
|
||||
}
|
||||
rwctxsMapLock.Unlock()
|
||||
} else {
|
||||
for _, rwctx := range rwctxsDefault {
|
||||
rwctx.reinitStreamAggr()
|
||||
}
|
||||
}
|
||||
logger.Infof("Successfully reloaded stream aggregation configs")
|
||||
reloadRelabelConfigs()
|
||||
reloadStreamAggrConfigs()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func reloadRelabelConfigs() {
|
||||
relabelConfigReloads.Inc()
|
||||
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
||||
rcs, err := loadRelabelConfigs()
|
||||
if err != nil {
|
||||
relabelConfigReloadErrors.Inc()
|
||||
relabelConfigSuccess.Set(0)
|
||||
logger.Errorf("cannot reload relabel configs; preserving the previous configs; error: %s", err)
|
||||
return
|
||||
}
|
||||
allRelabelConfigs.Store(rcs)
|
||||
relabelConfigSuccess.Set(1)
|
||||
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
|
||||
logger.Infof("successfully reloaded relabel configs")
|
||||
}
|
||||
|
||||
var (
|
||||
relabelConfigReloads = metrics.NewCounter(`vmagent_relabel_config_reloads_total`)
|
||||
relabelConfigReloadErrors = metrics.NewCounter(`vmagent_relabel_config_reloads_errors_total`)
|
||||
|
@ -219,6 +204,24 @@ var (
|
|||
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
|
||||
)
|
||||
|
||||
func reloadStreamAggrConfigs() {
|
||||
if len(*remoteWriteMultitenantURLs) > 0 {
|
||||
rwctxsMapLock.Lock()
|
||||
for _, rwctxs := range rwctxsMap {
|
||||
reinitStreamAggr(rwctxs)
|
||||
}
|
||||
rwctxsMapLock.Unlock()
|
||||
} else {
|
||||
reinitStreamAggr(rwctxsDefault)
|
||||
}
|
||||
}
|
||||
|
||||
func reinitStreamAggr(rwctxs []*remoteWriteCtx) {
|
||||
for _, rwctx := range rwctxs {
|
||||
rwctx.reinitStreamAggr()
|
||||
}
|
||||
}
|
||||
|
||||
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
||||
if len(urls) == 0 {
|
||||
logger.Panicf("BUG: urls must be non-empty")
|
||||
|
@ -284,14 +287,14 @@ func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
|
|||
return rwctxs
|
||||
}
|
||||
|
||||
var stopCh = make(chan struct{})
|
||||
var configReloaderStopCh = make(chan struct{})
|
||||
var configReloaderWG sync.WaitGroup
|
||||
|
||||
// Stop stops remotewrite.
|
||||
//
|
||||
// It is expected that nobody calls Push during and after the call to this func.
|
||||
func Stop() {
|
||||
close(stopCh)
|
||||
close(configReloaderStopCh)
|
||||
configReloaderWG.Wait()
|
||||
|
||||
for _, rwctx := range rwctxsDefault {
|
||||
|
@ -506,8 +509,7 @@ type remoteWriteCtx struct {
|
|||
fq *persistentqueue.FastQueue
|
||||
c *client
|
||||
|
||||
sas *streamaggr.Aggregators
|
||||
saHash uint64
|
||||
sas atomic.Pointer[streamaggr.Aggregators]
|
||||
streamAggrKeepInput bool
|
||||
|
||||
pss []*pendingSeries
|
||||
|
@ -567,17 +569,17 @@ func newRemoteWriteCtx(argIdx int, at *auth.Token, remoteWriteURL *url.URL, maxI
|
|||
}
|
||||
|
||||
// Initialize sas
|
||||
saCfg, saHash := saCfgLoader.getCurrentConfig(argIdx)
|
||||
if len(saCfg) > 0 {
|
||||
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
|
||||
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
|
||||
if sasFile != "" {
|
||||
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(argIdx, 0)
|
||||
sas, err := streamaggr.NewAggregators(saCfg, rwctx.pushInternal, dedupInterval)
|
||||
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
|
||||
if err != nil {
|
||||
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggrFile=%q: %s", sasFile, err)
|
||||
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
|
||||
}
|
||||
rwctx.sas = sas
|
||||
rwctx.saHash = saHash
|
||||
rwctx.sas.Store(sas)
|
||||
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
|
||||
}
|
||||
|
||||
return rwctx
|
||||
|
@ -592,8 +594,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
|||
rwctx.fq.UnblockAllReaders()
|
||||
rwctx.c.MustStop()
|
||||
rwctx.c = nil
|
||||
rwctx.sas.MustStop()
|
||||
rwctx.sas = nil
|
||||
|
||||
sas := rwctx.sas.Swap(nil)
|
||||
sas.MustStop()
|
||||
|
||||
rwctx.fq.MustClose()
|
||||
rwctx.fq = nil
|
||||
|
||||
|
@ -624,8 +628,9 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
|
|||
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
|
||||
|
||||
// Apply stream aggregation if any
|
||||
rwctx.sas.Push(tss)
|
||||
if rwctx.sas == nil || rwctx.streamAggrKeepInput {
|
||||
sas := rwctx.sas.Load()
|
||||
sas.Push(tss)
|
||||
if sas == nil || rwctx.streamAggrKeepInput {
|
||||
// Push samples to the remote storage
|
||||
rwctx.pushInternal(tss)
|
||||
}
|
||||
|
@ -645,17 +650,33 @@ func (rwctx *remoteWriteCtx) pushInternal(tss []prompbmarshal.TimeSeries) {
|
|||
}
|
||||
|
||||
func (rwctx *remoteWriteCtx) reinitStreamAggr() {
|
||||
if rwctx.sas == nil {
|
||||
sas := rwctx.sas.Load()
|
||||
if sas == nil {
|
||||
// There is no stream aggregation for rwctx
|
||||
return
|
||||
}
|
||||
saCfg, saHash := saCfgLoader.getCurrentConfig(rwctx.idx)
|
||||
if rwctx.saHash == saHash {
|
||||
|
||||
sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx)
|
||||
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
|
||||
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(rwctx.idx, 0)
|
||||
sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternal, dedupInterval)
|
||||
if err != nil {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
|
||||
logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
|
||||
return
|
||||
}
|
||||
if err := rwctx.sas.ReInitConfigs(saCfg); err != nil {
|
||||
logger.Errorf("Cannot apply stream aggregation configs %d: %s", rwctx.idx, err)
|
||||
if !sasNew.Equal(sas) {
|
||||
sasOld := rwctx.sas.Swap(sasNew)
|
||||
sasOld.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
|
||||
} else {
|
||||
sasNew.MustStop()
|
||||
logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
|
||||
}
|
||||
rwctx.saHash = saHash
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
|
||||
}
|
||||
|
||||
var tssRelabelPool = &sync.Pool{
|
||||
|
@ -672,3 +693,20 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int {
|
|||
}
|
||||
return rowsCount
|
||||
}
|
||||
|
||||
// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
|
||||
func CheckStreamAggrConfigs() error {
|
||||
pushNoop := func(tss []prompbmarshal.TimeSeries) {}
|
||||
for idx, sasFile := range *streamAggrConfig {
|
||||
if sasFile == "" {
|
||||
continue
|
||||
}
|
||||
dedupInterval := streamAggrDedupInterval.GetOptionalArgOrDefault(idx, 0)
|
||||
sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, dedupInterval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
|
||||
}
|
||||
sas.MustStop()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,118 +0,0 @@
|
|||
package remotewrite
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html . "+
|
||||
"See also -remoteWrite.streamAggr.keepInput and -remoteWrite.streamAggr.dedupInterval")
|
||||
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep input samples after the aggregation with -remoteWrite.streamAggr.config. "+
|
||||
"By default the input is dropped after the aggregation, so only the aggregate data is sent to the -remoteWrite.url. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation.html")
|
||||
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", "Input samples are de-duplicated with this interval before being aggregated. "+
|
||||
"Only the last sample per each time series per each interval is aggregated if the interval is greater than zero")
|
||||
)
|
||||
|
||||
var (
|
||||
saCfgReloads = metrics.NewCounter(`vmagent_streamaggr_config_reloads_total`)
|
||||
saCfgReloadErr = metrics.NewCounter(`vmagent_streamaggr_config_reloads_errors_total`)
|
||||
saCfgSuccess = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_successful`)
|
||||
saCfgTimestamp = metrics.NewCounter(`vmagent_streamaggr_config_last_reload_success_timestamp_seconds`)
|
||||
)
|
||||
|
||||
// saConfigRules - type alias for unmarshalled stream aggregation config
|
||||
type saConfigRules = []*streamaggr.Config
|
||||
|
||||
// saConfigsLoader loads stream aggregation configs from the given files.
|
||||
type saConfigsLoader struct {
|
||||
files []string
|
||||
configs atomic.Pointer[[]saConfig]
|
||||
}
|
||||
|
||||
// newSaConfigsLoader creates new saConfigsLoader for the given config files.
|
||||
func newSaConfigsLoader(configFiles []string) (*saConfigsLoader, error) {
|
||||
result := &saConfigsLoader{
|
||||
files: configFiles,
|
||||
}
|
||||
// Initial load of configs.
|
||||
if err := result.reloadConfigs(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// reloadConfigs reloads stream aggregation configs from the files given in constructor.
|
||||
func (r *saConfigsLoader) reloadConfigs() error {
|
||||
// Increment reloads counter if it is not the initial load.
|
||||
if r.configs.Load() != nil {
|
||||
saCfgReloads.Inc()
|
||||
}
|
||||
|
||||
// Load all configs from files.
|
||||
var configs = make([]saConfig, len(r.files))
|
||||
for i, path := range r.files {
|
||||
if len(path) == 0 {
|
||||
// Skip empty stream aggregation config.
|
||||
continue
|
||||
}
|
||||
rules, hash, err := streamaggr.LoadConfigsFromFile(path)
|
||||
if err != nil {
|
||||
saCfgSuccess.Set(0)
|
||||
saCfgReloadErr.Inc()
|
||||
return fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
|
||||
}
|
||||
configs[i] = saConfig{
|
||||
path: path,
|
||||
hash: hash,
|
||||
rules: rules,
|
||||
}
|
||||
}
|
||||
|
||||
// Update configs.
|
||||
r.configs.Store(&configs)
|
||||
|
||||
saCfgSuccess.Set(1)
|
||||
saCfgTimestamp.Set(fasttime.UnixTimestamp())
|
||||
return nil
|
||||
}
|
||||
|
||||
// getCurrentConfig returns the current stream aggregation config with the given idx.
|
||||
func (r *saConfigsLoader) getCurrentConfig(idx int) (saConfigRules, uint64) {
|
||||
all := r.configs.Load()
|
||||
if all == nil {
|
||||
return nil, 0
|
||||
}
|
||||
cfgs := *all
|
||||
if len(cfgs) == 0 {
|
||||
return nil, 0
|
||||
}
|
||||
if idx >= len(cfgs) {
|
||||
if len(cfgs) == 1 {
|
||||
cfg := cfgs[0]
|
||||
return cfg.rules, cfg.hash
|
||||
}
|
||||
return nil, 0
|
||||
}
|
||||
cfg := cfgs[idx]
|
||||
return cfg.rules, cfg.hash
|
||||
}
|
||||
|
||||
type saConfig struct {
|
||||
path string
|
||||
hash uint64
|
||||
rules saConfigRules
|
||||
}
|
||||
|
||||
// CheckStreamAggConfigs checks -remoteWrite.streamAggr.config.
|
||||
func CheckStreamAggConfigs() error {
|
||||
_, err := newSaConfigsLoader(*streamAggrConfig)
|
||||
return err
|
||||
}
|
|
@ -71,6 +71,12 @@ var (
|
|||
|
||||
var pcsGlobal atomic.Value
|
||||
|
||||
// CheckRelabelConfig checks config pointed by -relabelConfig
|
||||
func CheckRelabelConfig() error {
|
||||
_, err := loadRelabelConfig()
|
||||
return err
|
||||
}
|
||||
|
||||
func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) {
|
||||
if len(*relabelConfig) == 0 {
|
||||
return nil, nil
|
||||
|
|
|
@ -26,7 +26,8 @@ created by v1.90.0 or newer versions. The solution is to upgrade to v1.90.0 or n
|
|||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add `-kafka.consumer.topic.concurrency` command-line flag. It controls the number of Kafka consumer workers to use by `vmagent`. It should eliminate the need to start multiple `vmagent` instances to improve data transfer rate. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1957).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for [Kafka producer and consumer](https://docs.victoriametrics.com/vmagent.html#kafka-integration) on `arm64` machines. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2271).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): delete unused buffered data at `-remoteWrite.tmpDataPath` directory when there is no matching `-remoteWrite.url` to send this data to. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4014).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for hot reload of stream aggregation configs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add the ability for hot reloading of [stream aggregation](https://docs.victoriametrics.com/stream-aggregation.html) configs. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html#configuration-update) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3639).
|
||||
* FEATURE: check the contents of `-relabelConfig` and `-streamAggr.config` files additionally to `-promscrape.config` when single-node VictoriaMetrics runs with `-dryRun` command-line flag. This aligns the behaviour of single-node VictoriaMetrics with [vmagent](https://docs.victoriametrics.com/vmagent.html) behaviour for `-dryRun` command-line flag.
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): automatically draw a heatmap graph when the query selects a single [histogram](https://docs.victoriametrics.com/keyConcepts.html#histogram). This simplifies analyzing histograms. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3384).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add support for drag'n'drop and paste from clipboard in the "Trace analyzer" page. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): hide messages longer than 3 lines in the trace. You can view the full message by clicking on the `show more` button. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3971).
|
||||
|
@ -138,6 +139,19 @@ Released at 2023-02-24
|
|||
* BUGFIX: properly parse timestamps in milliseconds when [ingesting data via OpenTSDB telnet put protocol](https://docs.victoriametrics.com/#sending-data-via-telnet-put-protocol). Previously timestamps in milliseconds were mistakenly multiplied by 1000. Thanks to @Droxenator for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3810).
|
||||
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): do not add extrapolated points outside the real points when using [interpolate()](https://docs.victoriametrics.com/MetricsQL.html#interpolate) function. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3816).
|
||||
|
||||
## [v1.87.4](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.4)
|
||||
|
||||
Released at 2023-03-25
|
||||
|
||||
**v1.87.x is a line of LTS releases (e.g. long-time support). It contains important up-to-date bugfixes.
|
||||
The v1.87.x line will be supported for at least 12 months since [v1.87.0](https://docs.victoriametrics.com/CHANGELOG.html#v1870) release**
|
||||
|
||||
* BUGFIX: prevent from slow [snapshot creating](https://docs.victoriametrics.com/#how-to-work-with-snapshots) under high data ingestion rate. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3551).
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): suppress [proxy protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) parsing errors in case of `EOF`. Usually, the error is caused by health checks and is not a sign of an actual error.
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): fix snapshot not being deleted in case of error during backup. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2055).
|
||||
* BUGFIX: allow using dashes and dots in environment variables names referred in config files via `%{ENV-VAR.SYNTAX}`. See [these docs](https://docs.victoriametrics.com/#environment-variables) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3999).
|
||||
* BUGFIX: return back query performance scalability on hosts with big number of CPU cores. The scalability has been reduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3966).
|
||||
|
||||
## [v1.87.3](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.87.3)
|
||||
|
||||
Released at 2023-03-12
|
||||
|
|
|
@ -2194,7 +2194,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
-dryRun
|
||||
Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
|
||||
Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
|
||||
-enableTCP6
|
||||
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
|
||||
-envflag.enable
|
||||
|
|
|
@ -2197,7 +2197,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
|||
Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details. This flag is available only in VictoriaMetrics enterprise. See https://docs.victoriametrics.com/enterprise.html
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
-dryRun
|
||||
Whether to check only -promscrape.config and then exit. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
|
||||
Whether to check config files without running VictoriaMetrics. The following config files are checked: -promscrape.config, -relabelConfig and -streamAggr.config. Unknown config entries aren't allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse=false command-line flag
|
||||
-enableTCP6
|
||||
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
|
||||
-envflag.enable
|
||||
|
|
|
@ -509,7 +509,7 @@ at [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-
|
|||
# match is an optional filter for incoming samples to aggregate.
|
||||
# It can contain arbitrary Prometheus series selector
|
||||
# according to https://docs.victoriametrics.com/keyConcepts.html#filtering .
|
||||
# If match is missing, then all the incoming samples are aggregated.
|
||||
# If match isn't set, then all the incoming samples are aggregated.
|
||||
- match: 'http_request_duration_seconds_bucket{env=~"prod|staging"}'
|
||||
|
||||
# interval is the interval for the aggregation.
|
||||
|
@ -548,17 +548,13 @@ per each specified config entry.
|
|||
|
||||
### Configuration update
|
||||
|
||||
[vmagent](https://docs.victoriametrics.com/vmagent.html) and
|
||||
[single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html) support two
|
||||
approaches for reloading stream aggregation configs from updated config files such as
|
||||
`-remoteWrite.streamAggr.config` and `-streamAggr.config` without restart.
|
||||
[vmagent](https://docs.victoriametrics.com/vmagent.html) and [single-node VictoriaMetrics](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html)
|
||||
support the following approaches for hot reloading stream aggregation configs from `-remoteWrite.streamAggr.config` and `-streamAggr.config`:
|
||||
|
||||
* Sending `SIGHUP` signal to `vmagent` process:
|
||||
* By sending `SIGHUP` signal to `vmagent` or `victoria-metrics` process:
|
||||
|
||||
```console
|
||||
kill -SIGHUP `pidof vmagent`
|
||||
```
|
||||
|
||||
* Sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload`).
|
||||
|
||||
It will reset the aggregation state only for changed rules in the configuration files.
|
||||
* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload).
|
||||
|
|
|
@ -1190,7 +1190,7 @@ See the docs at https://docs.victoriametrics.com/vmagent.html .
|
|||
-denyQueryTracing
|
||||
Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing
|
||||
-dryRun
|
||||
Whether to check only config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
|
||||
Whether to check config files without running vmagent. The following files are checked: -promscrape.config, -remoteWrite.relabelConfig, -remoteWrite.urlRelabelConfig, -remoteWrite.streamAggr.config . Unknown config entries aren't allowed in -promscrape.config by default. This can be changed by passing -promscrape.config.strictParse=false command-line flag
|
||||
-enableTCP6
|
||||
Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP and UDP is used
|
||||
-envflag.enable
|
||||
|
|
|
@ -50,7 +50,7 @@ var (
|
|||
// CheckConfig checks -promscrape.config for errors and unsupported options.
|
||||
func CheckConfig() error {
|
||||
if *promscrapeConfigFile == "" {
|
||||
return fmt.Errorf("missing -promscrape.config option")
|
||||
return nil
|
||||
}
|
||||
_, _, err := loadConfig(*promscrapeConfigFile)
|
||||
return err
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -19,7 +18,6 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
|
@ -39,40 +37,22 @@ var supportedOutputs = []string{
|
|||
"quantiles(phi1, ..., phiN)",
|
||||
}
|
||||
|
||||
// ParseConfig loads array of stream aggregation configs from the given path.
|
||||
func ParseConfig(data []byte) ([]*Config, uint64, error) {
|
||||
var cfgs []*Config
|
||||
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot parse stream aggregation config %w", err)
|
||||
}
|
||||
return cfgs, xxhash.Sum64(data), nil
|
||||
}
|
||||
|
||||
// LoadConfigsFromFile loads array of stream aggregation configs from the given path.
|
||||
func LoadConfigsFromFile(path string) ([]*Config, uint64, error) {
|
||||
data, err := fs.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot load stream aggregation config from %q: %w", path, err)
|
||||
}
|
||||
return ParseConfig(data)
|
||||
}
|
||||
|
||||
// LoadAggregatorsFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
|
||||
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
|
||||
//
|
||||
// If dedupInterval > 0, then the input samples are de-duplicated before being aggregated,
|
||||
// e.g. only the last sample per each time series per each dedupInterval is aggregated.
|
||||
//
|
||||
// The returned Aggregators must be stopped with MustStop() when no longer needed.
|
||||
func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, uint64, error) {
|
||||
cfgs, configHash, err := LoadConfigsFromFile(path)
|
||||
func LoadFromFile(path string, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
|
||||
data, err := fs.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot load stream aggregation config: %w", err)
|
||||
return nil, fmt.Errorf("cannot load aggregators: %w", err)
|
||||
}
|
||||
as, err := NewAggregators(cfgs, pushFunc, dedupInterval)
|
||||
as, err := NewAggregatorsFromData(data, pushFunc, dedupInterval)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
|
||||
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w", path, err)
|
||||
}
|
||||
return as, configHash, nil
|
||||
return as, nil
|
||||
}
|
||||
|
||||
// NewAggregatorsFromData initializes Aggregators from the given data and uses the given pushFunc for pushing the aggregated data.
|
||||
|
@ -84,7 +64,7 @@ func LoadAggregatorsFromFile(path string, pushFunc PushFunc, dedupInterval time.
|
|||
func NewAggregatorsFromData(data []byte, pushFunc PushFunc, dedupInterval time.Duration) (*Aggregators, error) {
|
||||
var cfgs []*Config
|
||||
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
|
||||
}
|
||||
return NewAggregators(cfgs, pushFunc, dedupInterval)
|
||||
}
|
||||
|
@ -148,22 +128,13 @@ type Config struct {
|
|||
OutputRelabelConfigs []promrelabel.RelabelConfig `yaml:"output_relabel_configs,omitempty"`
|
||||
}
|
||||
|
||||
func (cfg *Config) hash() (uint64, error) {
|
||||
if cfg == nil {
|
||||
return 0, nil
|
||||
}
|
||||
data, err := json.Marshal(cfg)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("cannot marshal stream aggregation rule %+v: %w", cfg, err)
|
||||
}
|
||||
return xxhash.Sum64(data), nil
|
||||
}
|
||||
|
||||
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregate data.
|
||||
type Aggregators struct {
|
||||
as atomic.Pointer[[]*aggregator]
|
||||
pushFunc PushFunc
|
||||
dedupInterval time.Duration
|
||||
as []*aggregator
|
||||
|
||||
// configData contains marshaled configs passed to NewAggregators().
|
||||
// It is used in Equal() for comparing Aggregators.
|
||||
configData []byte
|
||||
}
|
||||
|
||||
// NewAggregators creates Aggregators from the given cfgs.
|
||||
|
@ -182,17 +153,22 @@ func NewAggregators(cfgs []*Config, pushFunc PushFunc, dedupInterval time.Durati
|
|||
for i, cfg := range cfgs {
|
||||
a, err := newAggregator(cfg, pushFunc, dedupInterval)
|
||||
if err != nil {
|
||||
// Stop already initialized aggregators before returning the error.
|
||||
for _, a := range as[:i] {
|
||||
a.MustStop()
|
||||
}
|
||||
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
|
||||
}
|
||||
as[i] = a
|
||||
}
|
||||
result := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
dedupInterval: dedupInterval,
|
||||
configData, err := json.Marshal(cfgs)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
|
||||
}
|
||||
result.as.Store(&as)
|
||||
|
||||
return result, nil
|
||||
return &Aggregators{
|
||||
as: as,
|
||||
configData: configData,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// MustStop stops a.
|
||||
|
@ -200,84 +176,29 @@ func (a *Aggregators) MustStop() {
|
|||
if a == nil {
|
||||
return
|
||||
}
|
||||
for _, aggr := range *a.as.Load() {
|
||||
for _, aggr := range a.as {
|
||||
aggr.MustStop()
|
||||
}
|
||||
}
|
||||
|
||||
// Equal returns true if a and b are initialized from identical configs.
|
||||
func (a *Aggregators) Equal(b *Aggregators) bool {
|
||||
if a == nil || b == nil {
|
||||
return a == nil && b == nil
|
||||
}
|
||||
return string(a.configData) == string(b.configData)
|
||||
}
|
||||
|
||||
// Push pushes tss to a.
|
||||
func (a *Aggregators) Push(tss []prompbmarshal.TimeSeries) {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
for _, aggr := range *a.as.Load() {
|
||||
for _, aggr := range a.as {
|
||||
aggr.Push(tss)
|
||||
}
|
||||
}
|
||||
|
||||
// ReInitConfigs reinits state of Aggregators a with the given new stream aggregation config
|
||||
func (a *Aggregators) ReInitConfigs(cfgs []*Config) error {
|
||||
if a == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
keys := make(map[uint64]struct{}) // set of all keys (configs and aggregators)
|
||||
cfgsMap := make(map[uint64]*Config) // map of config keys to their indices in cfgs
|
||||
aggrsMap := make(map[uint64]*aggregator) // map of aggregator keys to their indices in a.as
|
||||
|
||||
for _, cfg := range cfgs {
|
||||
key, err := cfg.hash()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to calculate hash for config '%+v': %w", cfg, err)
|
||||
}
|
||||
keys[key] = struct{}{}
|
||||
cfgsMap[key] = cfg
|
||||
}
|
||||
for _, aggr := range *a.as.Load() {
|
||||
keys[aggr.cfgHash] = struct{}{}
|
||||
aggrsMap[aggr.cfgHash] = aggr
|
||||
}
|
||||
|
||||
asNew := make([]*aggregator, 0, len(aggrsMap))
|
||||
asDel := make([]*aggregator, 0, len(aggrsMap))
|
||||
for key := range keys {
|
||||
cfg, hasCfg := cfgsMap[key]
|
||||
agg, hasAggr := aggrsMap[key]
|
||||
|
||||
// if config for aggregator was changed or removed
|
||||
// then we need to stop aggregator and remove it
|
||||
if !hasCfg && hasAggr {
|
||||
asDel = append(asDel, agg)
|
||||
continue
|
||||
}
|
||||
|
||||
// if there is no aggregator for config (new config),
|
||||
// then we need to create it
|
||||
if hasCfg && !hasAggr {
|
||||
newAgg, err := newAggregator(cfg, a.pushFunc, a.dedupInterval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot initialize aggregator for config '%+v': %w", cfg, err)
|
||||
}
|
||||
asNew = append(asNew, newAgg)
|
||||
continue
|
||||
}
|
||||
|
||||
// if aggregator config was not changed, then we can just keep it
|
||||
if hasCfg && hasAggr {
|
||||
asNew = append(asNew, agg)
|
||||
}
|
||||
}
|
||||
|
||||
// Atomically replace aggregators array.
|
||||
a.as.Store(&asNew)
|
||||
// and stop old aggregators
|
||||
for _, aggr := range asDel {
|
||||
aggr.MustStop()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// aggregator aggregates input series according to the config passed to NewAggregator
|
||||
type aggregator struct {
|
||||
match *promrelabel.IfExpression
|
||||
|
@ -295,7 +216,6 @@ type aggregator struct {
|
|||
|
||||
// aggrStates contains aggregate states for the given outputs
|
||||
aggrStates []aggrState
|
||||
hasState atomic.Bool
|
||||
|
||||
pushFunc PushFunc
|
||||
|
||||
|
@ -304,8 +224,7 @@ type aggregator struct {
|
|||
// It contains the interval, labels in (by, without), plus output name.
|
||||
// For example, foo_bar metric name is transformed to foo_bar:1m_by_job
|
||||
// for `interval: 1m`, `by: [job]`
|
||||
suffix string
|
||||
cfgHash uint64
|
||||
suffix string
|
||||
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
|
@ -433,11 +352,6 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
|
|||
dedupAggr = newLastAggrState()
|
||||
}
|
||||
|
||||
cfgHash, err := cfg.hash()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot calculate config hash for config %+v: %w", cfg, err)
|
||||
}
|
||||
|
||||
// initialize the aggregator
|
||||
a := &aggregator{
|
||||
match: cfg.Match,
|
||||
|
@ -453,8 +367,7 @@ func newAggregator(cfg *Config, pushFunc PushFunc, dedupInterval time.Duration)
|
|||
aggrStates: aggrStates,
|
||||
pushFunc: pushFunc,
|
||||
|
||||
suffix: suffix,
|
||||
cfgHash: cfgHash,
|
||||
suffix: suffix,
|
||||
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
@ -521,8 +434,6 @@ func (a *aggregator) dedupFlush() {
|
|||
}
|
||||
a.dedupAggr.appendSeriesForFlush(ctx)
|
||||
a.push(ctx.tss)
|
||||
|
||||
a.hasState.Store(false)
|
||||
}
|
||||
|
||||
func (a *aggregator) flush() {
|
||||
|
@ -552,8 +463,6 @@ func (a *aggregator) flush() {
|
|||
// Push the output metrics.
|
||||
a.pushFunc(tss)
|
||||
}
|
||||
|
||||
a.hasState.Store(false)
|
||||
}
|
||||
|
||||
// MustStop stops the aggregator.
|
||||
|
@ -561,26 +470,19 @@ func (a *aggregator) flush() {
|
|||
// The aggregator stops pushing the aggregated metrics after this call.
|
||||
func (a *aggregator) MustStop() {
|
||||
close(a.stopCh)
|
||||
|
||||
if a.hasState.Load() {
|
||||
if a.dedupAggr != nil {
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
a.dedupFlush()
|
||||
<-flushConcurrencyCh
|
||||
}
|
||||
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
a.flush()
|
||||
<-flushConcurrencyCh
|
||||
}
|
||||
|
||||
a.wg.Wait()
|
||||
|
||||
// Flush the remaining data from the last interval if needed.
|
||||
flushConcurrencyCh <- struct{}{}
|
||||
if a.dedupAggr != nil {
|
||||
a.dedupFlush()
|
||||
}
|
||||
a.flush()
|
||||
<-flushConcurrencyCh
|
||||
}
|
||||
|
||||
// Push pushes tss to a.
|
||||
func (a *aggregator) Push(tss []prompbmarshal.TimeSeries) {
|
||||
a.hasState.Store(true)
|
||||
|
||||
if a.dedupAggr == nil {
|
||||
a.push(tss)
|
||||
return
|
||||
|
|
|
@ -118,6 +118,45 @@ func TestAggregatorsFailure(t *testing.T) {
|
|||
`)
|
||||
}
|
||||
|
||||
func TestAggregatorsEqual(t *testing.T) {
|
||||
f := func(a, b string, expectedResult bool) {
|
||||
t.Helper()
|
||||
|
||||
pushFunc := func(tss []prompbmarshal.TimeSeries) {}
|
||||
aa, err := NewAggregatorsFromData([]byte(a), pushFunc, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
ab, err := NewAggregatorsFromData([]byte(b), pushFunc, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
result := aa.Equal(ab)
|
||||
if result != expectedResult {
|
||||
t.Fatalf("unexpected result; got %v; want %v", result, expectedResult)
|
||||
}
|
||||
}
|
||||
f("", "", true)
|
||||
f(`
|
||||
- outputs: [total]
|
||||
interval: 5m
|
||||
`, ``, false)
|
||||
f(`
|
||||
- outputs: [total]
|
||||
interval: 5m
|
||||
`, `
|
||||
- outputs: [total]
|
||||
interval: 5m
|
||||
`, true)
|
||||
f(`
|
||||
- outputs: [total]
|
||||
interval: 3m
|
||||
`, `
|
||||
- outputs: [total]
|
||||
interval: 5m
|
||||
`, false)
|
||||
}
|
||||
|
||||
func TestAggregatorsSuccess(t *testing.T) {
|
||||
f := func(config, inputMetrics, outputMetricsExpected string) {
|
||||
t.Helper()
|
||||
|
@ -145,11 +184,6 @@ func TestAggregatorsSuccess(t *testing.T) {
|
|||
// Push the inputMetrics to Aggregators
|
||||
tssInput := mustParsePromMetrics(inputMetrics)
|
||||
a.Push(tssInput)
|
||||
if a != nil {
|
||||
for _, aggr := range *a.as.Load() {
|
||||
aggr.flush()
|
||||
}
|
||||
}
|
||||
a.MustStop()
|
||||
|
||||
// Verify the tssOutput contains the expected metrics
|
||||
|
@ -671,7 +705,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
|
|||
tssInput := mustParsePromMetrics(inputMetrics)
|
||||
a.Push(tssInput)
|
||||
if a != nil {
|
||||
for _, aggr := range *a.as.Load() {
|
||||
for _, aggr := range a.as {
|
||||
aggr.dedupFlush()
|
||||
aggr.flush()
|
||||
}
|
||||
|
@ -719,106 +753,6 @@ foo:1m_sum_samples{baz="qwe"} 10
|
|||
`)
|
||||
}
|
||||
|
||||
func TestAggregatorsReinit(t *testing.T) {
|
||||
f := func(config, newConfig, inputMetrics, outputMetricsExpected string) {
|
||||
t.Helper()
|
||||
|
||||
// Initialize Aggregators
|
||||
var tssOutput []prompbmarshal.TimeSeries
|
||||
var tssOutputLock sync.Mutex
|
||||
pushFunc := func(tss []prompbmarshal.TimeSeries) {
|
||||
tssOutputLock.Lock()
|
||||
for _, ts := range tss {
|
||||
labelsCopy := append([]prompbmarshal.Label{}, ts.Labels...)
|
||||
samplesCopy := append([]prompbmarshal.Sample{}, ts.Samples...)
|
||||
tssOutput = append(tssOutput, prompbmarshal.TimeSeries{
|
||||
Labels: labelsCopy,
|
||||
Samples: samplesCopy,
|
||||
})
|
||||
}
|
||||
tssOutputLock.Unlock()
|
||||
}
|
||||
|
||||
a, err := NewAggregatorsFromData([]byte(config), pushFunc, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
|
||||
// Push the inputMetrics to Aggregators
|
||||
tssInput := mustParsePromMetrics(inputMetrics)
|
||||
a.Push(tssInput)
|
||||
|
||||
// Reinitialize Aggregators
|
||||
nc, _, err := ParseConfig([]byte(newConfig))
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse new config: %s", err)
|
||||
}
|
||||
err = a.ReInitConfigs(nc)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot reinit aggregators: %s", err)
|
||||
}
|
||||
|
||||
// Push the inputMetrics to Aggregators
|
||||
a.Push(tssInput)
|
||||
if a != nil {
|
||||
for _, aggr := range *a.as.Load() {
|
||||
aggr.flush()
|
||||
}
|
||||
}
|
||||
|
||||
a.MustStop()
|
||||
|
||||
// Verify the tssOutput contains the expected metrics
|
||||
tsStrings := make([]string, len(tssOutput))
|
||||
for i, ts := range tssOutput {
|
||||
tsStrings[i] = timeSeriesToString(ts)
|
||||
}
|
||||
sort.Strings(tsStrings)
|
||||
outputMetrics := strings.Join(tsStrings, "")
|
||||
if outputMetrics != outputMetricsExpected {
|
||||
t.Fatalf("unexpected output metrics;\ngot\n%s\nwant\n%s", outputMetrics, outputMetricsExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [count_samples]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [sum_samples]
|
||||
`, `
|
||||
foo 123
|
||||
bar 567
|
||||
foo 234
|
||||
`, `bar:1m_count_samples 1
|
||||
bar:1m_sum_samples 567
|
||||
foo:1m_count_samples 2
|
||||
foo:1m_sum_samples 357
|
||||
`)
|
||||
|
||||
f(`
|
||||
- interval: 1m
|
||||
outputs: [total]
|
||||
- interval: 2m
|
||||
outputs: [count_samples]
|
||||
`, `
|
||||
- interval: 1m
|
||||
outputs: [sum_samples]
|
||||
- interval: 2m
|
||||
outputs: [count_samples]
|
||||
`, `
|
||||
foo 123
|
||||
bar 567
|
||||
foo 234
|
||||
`, `bar:1m_sum_samples 567
|
||||
bar:1m_total 0
|
||||
bar:2m_count_samples 2
|
||||
foo:1m_sum_samples 357
|
||||
foo:1m_total 111
|
||||
foo:2m_count_samples 4
|
||||
`)
|
||||
}
|
||||
|
||||
func timeSeriesToString(ts prompbmarshal.TimeSeries) string {
|
||||
labelsString := promrelabel.LabelsToString(ts.Labels)
|
||||
if len(ts.Samples) != 1 {
|
||||
|
|
Loading…
Reference in a new issue