lib/streamaggr: added hot-reload

Author: AndrewChubatiuk
Date:   2024-06-14 21:15:17 +03:00
Parent: 861852f262
Commit: 1a3ad91f09
7 changed files with 210 additions and 148 deletions
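
In short: stream aggregation configs can now be hot-reloaded without restarting the process. A reload is triggered either by SIGHUP or, via the new -streamAggr.configCheckInterval flag, by a periodic timer. The heavy lifting moves into lib/streamaggr: Aggregators gains a Reload() method that re-reads the config file and recreates only the aggregators whose marshaled configs changed, so unchanged aggregators keep running with their in-memory state, and MustStop() now accepts a list of marshaled configs to leave untouched.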

--------------------------------------------------------------------------------

@@ -231,15 +231,23 @@ func Init() {
	// Start config reloader.
	configReloaderWG.Add(1)
	go func() {
+		var streamAggrConfigReloaderCh <-chan time.Time
+		if *streamAggrConfigCheckInterval > 0 {
+			ticker := time.NewTicker(*streamAggrConfigCheckInterval)
+			streamAggrConfigReloaderCh = ticker.C
+			defer ticker.Stop()
+		}
		defer configReloaderWG.Done()
		for {
			select {
			case <-sighupCh:
+				reloadRelabelConfigs()
+				reloadStreamAggrConfigs()
+			case <-streamAggrConfigReloaderCh:
+				reloadStreamAggrConfigs()
			case <-configReloaderStopCh:
				return
			}
-			reloadRelabelConfigs()
-			reloadStreamAggrConfigs()
		}
	}()
}
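
The reloader goroutine relies on a small Go idiom: receiving from a nil channel blocks forever, so when -streamAggr.configCheckInterval is 0 the ticker channel stays nil and that select case is effectively disabled. A minimal standalone sketch of the pattern (illustrative, not part of the commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 0 * time.Second // stands in for -streamAggr.configCheckInterval
	var tickCh <-chan time.Time // stays nil unless periodic checks are enabled
	if interval > 0 {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		tickCh = ticker.C
	}
	stop := time.After(100 * time.Millisecond) // stands in for configReloaderStopCh
	for {
		select {
		case <-tickCh: // never fires while tickCh is nil
			fmt.Println("periodic reload")
		case <-stop:
			fmt.Println("stopped")
			return
		}
	}
}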
@@ -376,11 +384,9 @@ func Stop() {
	close(configReloaderStopCh)
	configReloaderWG.Wait()

-	sasGlobal.Load().MustStop()
-	if deduplicatorGlobal != nil {
-		deduplicatorGlobal.MustStop()
-		deduplicatorGlobal = nil
-	}
+	sasGlobal.Load().MustStop(nil)
+	deduplicatorGlobal.MustStop()
+	deduplicatorGlobal = nil

	for _, rwctx := range rwctxs {
		rwctx.MustStop()
@@ -850,7 +856,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
	// sas and deduplicator must be stopped before rwctx is closed
	// because sas can write pending series to rwctx.pss if there are any
	sas := rwctx.sas.Swap(nil)
-	sas.MustStop()
+	sas.MustStop(nil)
	if rwctx.deduplicator != nil {
		rwctx.deduplicator.MustStop()

--------------------------------------------------------------------------------

@@ -4,12 +4,10 @@ import (
	"flag"
	"fmt"

-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
-	"github.com/VictoriaMetrics/metrics"
)

var (
@@ -17,6 +15,8 @@ var (
	streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
		"See https://docs.victoriametrics.com/stream-aggregation/ . "+
		"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
+	streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
+		"and -remoteWrite.streamAggr.config")
	streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
		"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
		"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
@@ -82,47 +82,31 @@ func HasAnyStreamAggrConfigured() bool {
}

func reloadStreamAggrConfigs() {
-	reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
-	for idx, rwctx := range rwctxs {
-		reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
+	reloadStreamAggrConfig(-1)
+	for idx := range rwctxs {
+		reloadStreamAggrConfig(idx)
	}
}

-func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
-	path, opts := getStreamAggrOpts(idx)
-	logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
-	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
-
-	sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts)
-	if err != nil {
-		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
-		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
-		logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err)
-		return
-	}
-
+func reloadStreamAggrConfig(idx int) {
+	path, _ := getStreamAggrOpts(idx)
	var sas *streamaggr.Aggregators
+	var f string
	if idx < 0 {
+		f = "-streamAggr.config"
		sas = sasGlobal.Load()
	} else {
+		f = "-remoteWrite.streamAggr.config"
		sas = rwctxs[idx].sas.Load()
	}
-
-	if !sasNew.Equal(sas) {
-		var sasOld *streamaggr.Aggregators
-		if idx < 0 {
-			sasOld = sasGlobal.Swap(sasNew)
-		} else {
-			sasOld = rwctxs[idx].sas.Swap(sasNew)
-		}
-		sasOld.MustStop()
-		logger.Infof("successfully reloaded stream aggregation configs at %q", path)
-	} else {
-		sasNew.MustStop()
-		logger.Infof("successfully reloaded stream aggregation configs at %q", path)
+	if sas == nil {
+		return
+	}
+	if err := sas.Reload(); err != nil {
+		logger.Errorf("cannot reload %s=%q; continue using the previously loaded config; error: %s", f, path, err)
+		return
	}
-	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
-	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
}

func getStreamAggrOpts(idx int) (string, streamaggr.Options) {
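
Note the shape of this change: instead of building a second aggregator set, comparing it against the current one with Equal() and swapping, the reloader now calls Reload() on the live Aggregators instance and lets it re-read the file. The vmagent_streamaggr_config_reload* metrics removed here reappear as vm_streamaggr_config_reload* metrics registered inside lib/streamaggr (see the Reload() hunk further below).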

--------------------------------------------------------------------------------

@@ -5,6 +5,7 @@ import (
	"fmt"
	"sync"
	"sync/atomic"
+	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -15,13 +16,14 @@ import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
-	"github.com/VictoriaMetrics/metrics"
)

var (
	streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
		"See https://docs.victoriametrics.com/stream-aggregation/ . "+
		"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
+	streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
+		"and -remoteWrite.streamAggr.config")
	streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation with -streamAggr.config. "+
		"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
		"See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
@@ -42,11 +44,6 @@ var (
	saCfgReloaderStopCh chan struct{}
	saCfgReloaderWG     sync.WaitGroup

-	saCfgReloads   = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
-	saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
-	saCfgSuccess   = metrics.NewGauge(`vminsert_streamagg_config_last_reload_successful`, nil)
-	saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
-
	sasGlobal    atomic.Pointer[streamaggr.Aggregators]
	deduplicator *streamaggr.Deduplicator
)
@@ -68,7 +65,7 @@ func CheckStreamAggrConfig() error {
	if err != nil {
		return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
	}
-	sas.MustStop()
+	sas.MustStop(nil)
	return nil
}
@@ -106,16 +103,21 @@ func InitStreamAggr() {
	}
	sasGlobal.Store(sas)
-	saCfgSuccess.Set(1)
-	saCfgTimestamp.Set(fasttime.UnixTimestamp())

	// Start config reloader.
	saCfgReloaderWG.Add(1)
	go func() {
+		var streamAggrConfigReloaderCh <-chan time.Time
+		if *streamAggrConfigCheckInterval > 0 {
+			ticker := time.NewTicker(*streamAggrConfigCheckInterval)
+			streamAggrConfigReloaderCh = ticker.C
+			defer ticker.Stop()
+		}
		defer saCfgReloaderWG.Done()
		for {
			select {
			case <-sighupCh:
+			case <-streamAggrConfigReloaderCh:
			case <-saCfgReloaderStopCh:
				return
			}
@@ -126,33 +128,12 @@
func reloadStreamAggrConfig() {
	logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
-	saCfgReloads.Inc()
-
-	opts := streamaggr.Options{
-		DedupInterval:        *streamAggrDedupInterval,
-		DropInputLabels:      *streamAggrDropInputLabels,
-		IgnoreOldSamples:     *streamAggrIgnoreOldSamples,
-		IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
-		Alias:                "global",
-	}
-	sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
-	if err != nil {
-		saCfgSuccess.Set(0)
-		saCfgReloadErr.Inc()
+	sas := sasGlobal.Load()
+	if err := sas.Reload(); err != nil {
		logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
		return
	}
-	sas := sasGlobal.Load()
-	if !sasNew.Equal(sas) {
-		sasOld := sasGlobal.Swap(sasNew)
-		sasOld.MustStop()
-		logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
-	} else {
-		logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
-		sasNew.MustStop()
-	}
-	saCfgSuccess.Set(1)
-	saCfgTimestamp.Set(fasttime.UnixTimestamp())
+	logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
}

// MustStopStreamAggr stops stream aggregators.
@@ -161,7 +142,7 @@ func MustStopStreamAggr() {
	saCfgReloaderWG.Wait()

	sas := sasGlobal.Swap(nil)
-	sas.MustStop()
+	sas.MustStop(nil)
	if deduplicator != nil {
		deduplicator.MustStop()

--------------------------------------------------------------------------------

@@ -71,6 +71,9 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels

// MustStop stops d.
func (d *Deduplicator) MustStop() {
+	if d == nil {
+		return
+	}
	metrics.UnregisterSet(d.ms)
	d.ms = nil
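
Guarding against a nil receiver makes (*Deduplicator)(nil).MustStop() a safe no-op, which is what lets the vmagent Stop() hunk above drop its own nil check. A tiny illustration of the idiom with a made-up type:

package main

import "fmt"

type dedup struct{ state map[string]int }

// MustStop is safe to call on a nil *dedup, mirroring the change above.
func (d *dedup) MustStop() {
	if d == nil {
		return
	}
	d.state = nil
	fmt.Println("stopped")
}

func main() {
	var d *dedup // nil: e.g. deduplication was never enabled
	d.MustStop() // no panic, simply a no-op
}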

--------------------------------------------------------------------------------

@@ -1,7 +1,6 @@
package streamaggr

import (
-	"encoding/json"
	"fmt"
	"math"
	"slices"
@@ -16,6 +15,7 @@ import (
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -69,21 +69,28 @@
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, error) {
-	data, err := fscore.ReadFileOrHTTP(path)
-	if err != nil {
-		return nil, fmt.Errorf("cannot load aggregators: %w", err)
+	a := &Aggregators{
+		path:     path,
+		pushFunc: pushFunc,
+		opts:     opts,
	}
-	data, err = envtemplate.ReplaceBytes(data)
-	if err != nil {
-		return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
+	if err := a.load(); err != nil {
+		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
	}
+	return a, nil
+}

-	as, err := newAggregatorsFromData(data, pushFunc, opts)
-	if err != nil {
-		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
+// Reload reads config file and updates aggregators if there're any changes
+func (a *Aggregators) Reload() error {
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_total{path=%q}`, a.path)).Inc()
+	if err := a.load(); err != nil {
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_errors_total{path=%q}`, a.path)).Inc()
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(0)
+		return fmt.Errorf("cannot load stream aggregation config %q: %w", a.path, err)
	}
-
-	return as, nil
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(1)
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, a.path)).Set(fasttime.UnixTimestamp())
+	return nil
}

// Options contains optional settings for the Aggregators.
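
For orientation, here is a hypothetical caller of the new API, wiring Reload() to SIGHUP roughly the way vmagent and vminsert do; the config path and push function are illustrative assumptions:

package main

import (
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)

func main() {
	push := func(tss []prompbmarshal.TimeSeries) {
		// forward aggregated series to storage
	}
	a, err := streamaggr.LoadFromFile("aggr.yaml", push, streamaggr.Options{})
	if err != nil {
		log.Fatalf("cannot load config: %s", err)
	}
	defer a.MustStop(nil)

	hup := make(chan os.Signal, 1)
	signal.Notify(hup, syscall.SIGHUP)
	for range hup {
		// Reload keeps serving the old config if the new one is broken.
		if err := a.Reload(); err != nil {
			log.Printf("keeping previous config: %s", err)
		}
	}
}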
@@ -243,44 +250,86 @@ type Config struct {
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
type Aggregators struct {
	as []*aggregator
+	ms *metrics.Set

-	// configData contains marshaled configs.
-	// It is used in Equal() for comparing Aggregators.
-	configData []byte
-
-	ms *metrics.Set
+	path     string
+	pushFunc PushFunc
+	opts     Options
}

-func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) {
+func (a *Aggregators) load() error {
+	data, err := fscore.ReadFileOrHTTP(a.path)
+	if err != nil {
+		return fmt.Errorf("cannot load aggregators: %w", err)
+	}
+	data, err = envtemplate.ReplaceBytes(data)
+	if err != nil {
+		return fmt.Errorf("cannot expand environment variables in %q: %w", a.path, err)
+	}
+	if err = a.loadAggregatorsFromData(data); err != nil {
+		return fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
+	}
+	return nil
+}
+
+func (a *Aggregators) getAggregator(aggr *aggregator) *aggregator {
+	if a == nil {
+		return nil
+	}
+	idx := slices.IndexFunc(a.as, func(ac *aggregator) bool {
+		return ac.configData == aggr.configData
+	})
+	if idx >= 0 {
+		return a.as[idx]
+	}
+	return nil
+}
+
+func (a *Aggregators) loadAggregatorsFromData(data []byte) error {
	var cfgs []*Config
	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
-		return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
+		return fmt.Errorf("cannot parse stream aggregation config: %w", err)
	}

	ms := metrics.NewSet()
-	as := make([]*aggregator, len(cfgs))
+	var unchanged, ac []*aggregator
+	var ignoreAggrConfigs []string
	for i, cfg := range cfgs {
-		opts.aggrID = i + 1
-		a, err := newAggregator(cfg, pushFunc, ms, opts)
+		a.opts.aggrID = i + 1
+		aggr, err := newAggregator(cfg, a.pushFunc, ms, a.opts)
		if err != nil {
			// Stop already initialized aggregators before returning the error.
-			for _, a := range as[:i] {
-				a.MustStop()
+			for _, c := range ac[:i] {
+				c.MustStop()
			}
-			return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
+			return fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
		}
-		as[i] = a
-	}
-	configData, err := json.Marshal(cfgs)
-	if err != nil {
-		logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
+		if oldAggr := a.getAggregator(aggr); oldAggr != nil {
+			aggr.MustStop()
+			if !slices.Contains(ignoreAggrConfigs, oldAggr.configData) {
+				unchanged = append(unchanged, oldAggr)
+				ignoreAggrConfigs = append(ignoreAggrConfigs, oldAggr.configData)
+			}
+			continue
+		}
+		if slices.ContainsFunc(ac, func(x *aggregator) bool {
+			return x.configData == aggr.configData
+		}) {
+			aggr.MustStop()
+			continue
+		}
+		ac = append(ac, aggr)
	}

-	metricLabels := fmt.Sprintf("url=%q", opts.Alias)
+	metricLabels := fmt.Sprintf("url=%q", a.opts.Alias)
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_stopped_total{path=%q}`, a.path)).Add(len(a.as) - len(ignoreAggrConfigs))
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_created_total{path=%q}`, a.path)).Add(len(ac))
+	a.MustStop(ignoreAggrConfigs)
+	a.as = slices.Concat(unchanged, ac)
	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
		n := uint64(0)
-		for _, aggr := range as {
+		for _, aggr := range a.as {
			if aggr.da != nil {
				n += aggr.da.sizeBytes()
			}
@@ -289,7 +338,7 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggr
	})
	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
		n := uint64(0)
-		for _, aggr := range as {
+		for _, aggr := range a.as {
			if aggr.da != nil {
				n += aggr.da.itemsCount()
			}
@@ -298,11 +347,8 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggr
	})
	metrics.RegisterSet(ms)

-	return &Aggregators{
-		as:         as,
-		configData: configData,
-		ms:         ms,
-	}, nil
+	a.ms = ms
+	return nil
}

// IsEnabled returns true if Aggregators has at least one configured aggregator
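
The reload diff above works on config identity: each aggregator carries its config marshaled to YAML (configData), and an existing aggregator survives a reload only if the new file produces an identical string. A self-contained sketch of that idea, using a made-up Config type and gopkg.in/yaml.v2 (assumptions, not the commit's actual types):

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

type config struct {
	Match    string   `yaml:"match"`
	Interval string   `yaml:"interval"`
	Outputs  []string `yaml:"outputs"`
}

// key returns the canonical identity of a config: its YAML marshaling.
func key(c config) string {
	b, err := yaml.Marshal(&c)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	old := []config{{Match: "a", Interval: "1m", Outputs: []string{"total"}}}
	cur := []config{
		{Match: "a", Interval: "1m", Outputs: []string{"total"}}, // unchanged: keep running
		{Match: "b", Interval: "5m", Outputs: []string{"max"}},   // new: create
	}
	oldKeys := make(map[string]bool)
	for _, c := range old {
		oldKeys[key(c)] = true
	}
	for _, c := range cur {
		fmt.Printf("reuse=%v for match=%q\n", oldKeys[key(c)], c.Match)
	}
}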
@@ -317,7 +363,7 @@ func (a *Aggregators) IsEnabled() bool {
}

// MustStop stops a.
-func (a *Aggregators) MustStop() {
+func (a *Aggregators) MustStop(ignoreAggrConfigs []string) {
	if a == nil {
		return
	}
@@ -326,7 +372,9 @@
	a.ms = nil

	for _, aggr := range a.as {
-		aggr.MustStop()
+		if ignoreAggrConfigs == nil || !slices.Contains(ignoreAggrConfigs, aggr.configData) {
+			aggr.MustStop()
+		}
	}
	a.as = nil
}
@@ -336,7 +384,16 @@
	if a == nil || b == nil {
		return a == nil && b == nil
	}
-	return string(a.configData) == string(b.configData)
+	return slices.Compare(a.configData(), b.configData()) == 0
+}
+
+func (a *Aggregators) configData() []string {
+	result := make([]string, len(a.as))
+	for i := range result {
+		result[i] = a.as[i].configData
+	}
+	slices.Sort(result)
+	return result
}

// Push pushes tss to a.
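
Because configData() sorts the per-aggregator config strings before comparing, Equal() is now insensitive to the order of entries in the config file; the previous implementation compared a single JSON marshaling of the whole config list, so any reordering counted as a change.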
@@ -374,6 +431,7 @@ type aggregator struct {
	keepMetricNames  bool
	ignoreOldSamples bool

+	configData string
	by      []string
	without []string
@@ -474,6 +532,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
	dropInputLabels := opts.DropInputLabels
	if v := cfg.DropInputLabels; v != nil {
		dropInputLabels = *v
+	} else {
+		cfg.DropInputLabels = &dropInputLabels
	}

	// initialize input_relabel_configs and output_relabel_configs
@@ -501,6 +561,8 @@
	keepMetricNames := opts.KeepMetricNames
	if v := cfg.KeepMetricNames; v != nil {
		keepMetricNames = *v
+	} else {
+		cfg.KeepMetricNames = &keepMetricNames
	}
	if keepMetricNames {
		if len(cfg.Outputs) != 1 {
@@ -515,12 +577,16 @@
	ignoreOldSamples := opts.IgnoreOldSamples
	if v := cfg.IgnoreOldSamples; v != nil {
		ignoreOldSamples = *v
+	} else {
+		cfg.IgnoreOldSamples = &ignoreOldSamples
	}

	// check cfg.IgnoreFirstIntervals
	ignoreFirstIntervals := opts.IgnoreFirstIntervals
	if v := cfg.IgnoreFirstIntervals; v != nil {
		ignoreFirstIntervals = *v
+	} else {
+		cfg.IgnoreFirstIntervals = &ignoreFirstIntervals
	}

	// initialize outputs list
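
Each of these else branches writes the resolved default back into cfg. Because an aggregator's identity is the YAML marshaling of its cfg (the configData field introduced above), folding the effective global flag values into cfg means a changed global option changes configData too, forcing the affected aggregators to be recreated on reload rather than silently kept with stale settings. A minimal sketch of the write-back pattern with a hypothetical field:

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

type config struct {
	KeepMetricNames *bool `yaml:"keep_metric_names,omitempty"`
}

func main() {
	globalDefault := true // stands in for a command-line flag
	var c config          // field omitted in the YAML file

	keep := globalDefault
	if c.KeepMetricNames != nil {
		keep = *c.KeepMetricNames
	} else {
		c.KeepMetricNames = &keep // write back so Marshal captures the effective value
	}
	out, _ := yaml.Marshal(&c)
	fmt.Print(string(out)) // prints: keep_metric_names: true
}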
@@ -528,6 +594,8 @@
		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
			"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
	}
+	slices.Sort(cfg.Outputs)
+	cfg.Outputs = slices.Compact(cfg.Outputs)
	aggrStates := make([]aggrState, len(cfg.Outputs))
	for i, output := range cfg.Outputs {
		if strings.HasPrefix(output, "quantiles(") {
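
Sorting and compacting cfg.Outputs canonicalizes the outputs list before it is marshaled into configData further below, so a merely reordered or duplicated outputs entry presumably no longer makes an aggregator look changed across reloads.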
@@ -664,19 +732,29 @@
		a.da = newDedupAggr()
	}

-	alignFlushToInterval := !opts.NoAlignFlushToInterval
+	noAlignFlushToInterval := opts.NoAlignFlushToInterval
	if v := cfg.NoAlignFlushToInterval; v != nil {
-		alignFlushToInterval = !*v
+		noAlignFlushToInterval = *v
+	} else {
+		cfg.NoAlignFlushToInterval = &noAlignFlushToInterval
	}

-	skipIncompleteFlush := !opts.FlushOnShutdown
+	flushOnShutdown := opts.FlushOnShutdown
	if v := cfg.FlushOnShutdown; v != nil {
-		skipIncompleteFlush = !*v
+		flushOnShutdown = *v
+	} else {
+		cfg.FlushOnShutdown = &flushOnShutdown
	}

+	configData, err := yaml.Marshal(&cfg)
+	if err != nil {
+		return nil, fmt.Errorf("Failed to marshal config: %w", err)
+	}
+	a.configData = string(configData)
+
	a.wg.Add(1)
	go func() {
-		a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
+		a.runFlusher(pushFunc, !noAlignFlushToInterval, !flushOnShutdown, interval, dedupInterval, ignoreFirstIntervals)
		a.wg.Done()
	}()

--------------------------------------------------------------------------------

@@ -20,12 +20,11 @@ func TestAggregatorsFailure(t *testing.T) {
	pushFunc := func(_ []prompbmarshal.TimeSeries) {
		panic(fmt.Errorf("pushFunc shouldn't be called"))
	}
-	a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{})
-	if err == nil {
-		t.Fatalf("expecting non-nil error")
+	a := &Aggregators{
+		pushFunc: pushFunc,
	}
-	if a != nil {
-		t.Fatalf("expecting nil a")
+	if err := a.loadAggregatorsFromData([]byte(config)); err == nil {
+		t.Fatalf("expecting non-nil error")
	}
}
@@ -158,12 +157,17 @@
		t.Helper()

		pushFunc := func(_ []prompbmarshal.TimeSeries) {}
-		aa, err := newAggregatorsFromData([]byte(a), pushFunc, Options{})
-		if err != nil {
+		aa := &Aggregators{
+			pushFunc: pushFunc,
+		}
+		if err := aa.loadAggregatorsFromData([]byte(a)); err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}
-		ab, err := newAggregatorsFromData([]byte(b), pushFunc, Options{})
-		if err != nil {
+
+		ab := &Aggregators{
+			pushFunc: pushFunc,
+		}
+		if err := ab.loadAggregatorsFromData([]byte(b)); err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}
		result := aa.Equal(ab)
@@ -221,19 +225,21 @@
			tssOutput = appendClonedTimeseries(tssOutput, tss)
			tssOutputLock.Unlock()
		}
-		opts := Options{
-			FlushOnShutdown:        true,
-			NoAlignFlushToInterval: true,
+		a := &Aggregators{
+			opts: Options{
+				FlushOnShutdown:        true,
+				NoAlignFlushToInterval: true,
+			},
+			pushFunc: pushFunc,
		}
-		a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
-		if err != nil {
+		if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
			t.Fatalf("cannot initialize aggregators: %s", err)
		}

		// Push the inputMetrics to Aggregators
		tssInput := mustParsePromMetrics(inputMetrics)
		matchIdxs := a.Push(tssInput, nil)
-		a.MustStop()
+		a.MustStop(nil)

		// Verify matchIdxs equals to matchIdxsExpected
		matchIdxsStr := ""
@@ -917,19 +923,21 @@
		}
		tssOutputLock.Unlock()
	}
-	opts := Options{
-		DedupInterval:   30 * time.Second,
-		FlushOnShutdown: true,
+	a := &Aggregators{
+		opts: Options{
+			DedupInterval:   30 * time.Second,
+			FlushOnShutdown: true,
+		},
+		pushFunc: pushFunc,
	}
-	a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
-	if err != nil {
+	if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
		t.Fatalf("cannot initialize aggregators: %s", err)
	}

	// Push the inputMetrics to Aggregators
	tssInput := mustParsePromMetrics(inputMetrics)
	matchIdxs := a.Push(tssInput, nil)
-	a.MustStop()
+	a.MustStop(nil)

	// Verify matchIdxs equals to matchIdxsExpected
	matchIdxsStr := ""

--------------------------------------------------------------------------------

@@ -47,7 +47,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
	}
	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
	a := newBenchAggregators(outputs, pushFunc)
-	defer a.MustStop()
+	defer a.MustStop(nil)

	_ = a.Push(benchSeries, nil)
	b.ResetTimer()
@@ -63,7 +63,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
func benchmarkAggregatorsPush(b *testing.B, output string) {
	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
	a := newBenchAggregators([]string{output}, pushFunc)
-	defer a.MustStop()
+	defer a.MustStop(nil)

	const loops = 100
@@ -92,8 +92,10 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
      outputs: [%s]
`, strings.Join(outputsQuoted, ","))

-	a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{})
-	if err != nil {
+	a := &Aggregators{
+		pushFunc: pushFunc,
+	}
+	if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
		panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
	}
	return a