From 1a3ad91f0951ff580b37164c82f263d3138a00e1 Mon Sep 17 00:00:00 2001
From: AndrewChubatiuk
Date: Fri, 14 Jun 2024 21:15:17 +0300
Subject: [PATCH] lib/streamaggr: add hot-reload of stream aggregation configs
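
Stream aggregation configs can now be hot-reloaded without restarting
vmagent or vminsert. Aggregators remember their config path, push
function and options, so Reload() can re-read the file and re-create
only those aggregators whose configuration actually changed; unchanged
aggregators keep running and preserve their state. Change detection
compares per-entry configs marshaled to YAML after the effective
defaults from command-line flags are written back into each entry, so
the comparison does not depend on entry order or on how defaults are
supplied.

Reload is triggered by SIGHUP and, optionally, by the new
-streamAggr.configCheckInterval flag. The old per-app reload metrics
are replaced by shared per-path metrics:

    vm_streamaggr_config_reloads_total
    vm_streamaggr_config_reloads_errors_total
    vm_streamaggr_config_reload_successful
    vm_streamaggr_config_reload_success_timestamp_seconds
    vm_streamaggr_aggregators_created_total
    vm_streamaggr_aggregators_stopped_total

Aggregators.MustStop() now accepts a list of marshaled configs to skip,
so the reload path can stop obsolete aggregators only, and
Deduplicator.MustStop() is nil-safe, which simplifies shutdown paths.

Usage example (hypothetical deployment, not part of this diff): given
-streamAggr.config=aggr.yml with

    - match: 'http_requests_total'
      interval: 1m
      outputs: [total]

editing aggr.yml and sending SIGHUP (or waiting for
-streamAggr.configCheckInterval=1m to elapse) applies the change in
place, while aggregators for untouched entries keep their state.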
---
 app/vmagent/remotewrite/remotewrite.go   |  22 +--
 app/vmagent/remotewrite/streamaggr.go    |  50 +++----
 app/vminsert/common/streamaggr.go        |  49 ++-----
 lib/streamaggr/deduplicator.go           |   3 +
 lib/streamaggr/streamaggr.go             | 174 ++++++++++++++++-------
 lib/streamaggr/streamaggr_test.go        |  50 ++++---
 lib/streamaggr/streamaggr_timing_test.go |  10 +-
 7 files changed, 210 insertions(+), 148 deletions(-)

diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index f79d4b42f..751fdc705 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -231,15 +231,23 @@ func Init() {
 	// Start config reloader.
 	configReloaderWG.Add(1)
 	go func() {
+		// When -streamAggr.configCheckInterval is 0 (the default),
+		// streamAggrConfigReloaderCh stays nil; receiving from a nil channel
+		// blocks forever, so the periodic reload case below never fires and
+		// reloads happen on SIGHUP only.
+		var streamAggrConfigReloaderCh <-chan time.Time
+		if *streamAggrConfigCheckInterval > 0 {
+			ticker := time.NewTicker(*streamAggrConfigCheckInterval)
+			streamAggrConfigReloaderCh = ticker.C
+			defer ticker.Stop()
+		}
 		defer configReloaderWG.Done()
 		for {
 			select {
 			case <-sighupCh:
+				reloadRelabelConfigs()
+				reloadStreamAggrConfigs()
+			case <-streamAggrConfigReloaderCh:
+				reloadStreamAggrConfigs()
 			case <-configReloaderStopCh:
 				return
 			}
-			reloadRelabelConfigs()
-			reloadStreamAggrConfigs()
 		}
 	}()
 }
@@ -376,11 +384,9 @@ func Stop() {
 	close(configReloaderStopCh)
 	configReloaderWG.Wait()
 
-	sasGlobal.Load().MustStop()
-	if deduplicatorGlobal != nil {
-		deduplicatorGlobal.MustStop()
-		deduplicatorGlobal = nil
-	}
+	sasGlobal.Load().MustStop(nil)
+	deduplicatorGlobal.MustStop()
+	deduplicatorGlobal = nil
 
 	for _, rwctx := range rwctxs {
 		rwctx.MustStop()
@@ -850,7 +856,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
 	// sas and deduplicator must be stopped before rwctx is closed
 	// because sas can write pending series to rwctx.pss if there are any
 	sas := rwctx.sas.Swap(nil)
-	sas.MustStop()
+	sas.MustStop(nil)
 
 	if rwctx.deduplicator != nil {
 		rwctx.deduplicator.MustStop()
diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go
index b8ea11a16..09131bce5 100644
--- a/app/vmagent/remotewrite/streamaggr.go
+++ b/app/vmagent/remotewrite/streamaggr.go
@@ -4,12 +4,10 @@ import (
 	"flag"
 	"fmt"
 
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
-	"github.com/VictoriaMetrics/metrics"
 )
 
 var (
@@ -17,6 +15,8 @@ var (
 	streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
 		"See https://docs.victoriametrics.com/stream-aggregation/ . "+
 		"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
+	streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
+		"and -remoteWrite.streamAggr.config")
 	streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
 		"with -streamAggr.config. By default, only aggregated samples are dropped, while the remaining samples "+
 		"are written to remote storages. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
@@ -82,47 +82,31 @@ func HasAnyStreamAggrConfigured() bool {
 }
 
 func reloadStreamAggrConfigs() {
-	reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
-	for idx, rwctx := range rwctxs {
-		reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
+	reloadStreamAggrConfig(-1)
+	for idx := range rwctxs {
+		reloadStreamAggrConfig(idx)
 	}
 }
 
-func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
-	path, opts := getStreamAggrOpts(idx)
-	logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
-	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
-
-	sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts)
-	if err != nil {
-		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
-		metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
-		logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err)
-		return
-	}
-
+func reloadStreamAggrConfig(idx int) {
+	path, _ := getStreamAggrOpts(idx)
 	var sas *streamaggr.Aggregators
+	var f string
 	if idx < 0 {
+		f = "-streamAggr.config"
 		sas = sasGlobal.Load()
 	} else {
+		f = "-remoteWrite.streamAggr.config"
 		sas = rwctxs[idx].sas.Load()
 	}
-
-	if !sasNew.Equal(sas) {
-		var sasOld *streamaggr.Aggregators
-		if idx < 0 {
-			sasOld = sasGlobal.Swap(sasNew)
-		} else {
-			sasOld = rwctxs[idx].sas.Swap(sasNew)
-		}
-		sasOld.MustStop()
-		logger.Infof("successfully reloaded stream aggregation configs at %q", path)
-	} else {
-		sasNew.MustStop()
-		logger.Infof("successfully reloaded stream aggregation configs at %q", path)
+	if sas == nil {
+		return
+	}
+	if err := sas.Reload(); err != nil {
+		logger.Errorf("cannot reload %s=%q; continue using the previously loaded config; error: %s", f, path, err)
+		return
 	}
-	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
-	metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
 }
 
 func getStreamAggrOpts(idx int) (string, streamaggr.Options) {
diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go
index c5ec26ce9..7914806f2 100644
--- a/app/vminsert/common/streamaggr.go
+++ b/app/vminsert/common/streamaggr.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -15,13 +16,14 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
-	"github.com/VictoriaMetrics/metrics"
 )
 
 var (
 	streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
 		"See https://docs.victoriametrics.com/stream-aggregation/ . "+
 		"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
+	streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes "+
+		"in -streamAggr.config")
 	streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation with -streamAggr.config. "+
 		"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
 		"See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
@@ -42,11 +44,6 @@ var (
 	saCfgReloaderStopCh chan struct{}
 	saCfgReloaderWG     sync.WaitGroup
 
-	saCfgReloads   = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
-	saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
-	saCfgSuccess   = metrics.NewGauge(`vminsert_streamagg_config_last_reload_successful`, nil)
-	saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
-
 	sasGlobal    atomic.Pointer[streamaggr.Aggregators]
 	deduplicator *streamaggr.Deduplicator
 )
@@ -68,7 +65,7 @@ func CheckStreamAggrConfig() error {
 	if err != nil {
 		return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
 	}
-	sas.MustStop()
+	sas.MustStop(nil)
 	return nil
 }
 
@@ -106,16 +103,21 @@ func InitStreamAggr() {
 	}
 	sasGlobal.Store(sas)
-	saCfgSuccess.Set(1)
-	saCfgTimestamp.Set(fasttime.UnixTimestamp())
 
 	// Start config reloader.
 	saCfgReloaderWG.Add(1)
 	go func() {
+		// As in vmagent: when -streamAggr.configCheckInterval is 0,
+		// streamAggrConfigReloaderCh stays nil and receiving from a nil
+		// channel blocks forever, so the periodic reload case never fires.
+		var streamAggrConfigReloaderCh <-chan time.Time
+		if *streamAggrConfigCheckInterval > 0 {
+			ticker := time.NewTicker(*streamAggrConfigCheckInterval)
+			streamAggrConfigReloaderCh = ticker.C
+			defer ticker.Stop()
+		}
 		defer saCfgReloaderWG.Done()
 		for {
 			select {
 			case <-sighupCh:
+			case <-streamAggrConfigReloaderCh:
 			case <-saCfgReloaderStopCh:
 				return
 			}
@@ -126,33 +128,12 @@ func InitStreamAggr() {
 
 func reloadStreamAggrConfig() {
 	logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
-	saCfgReloads.Inc()
-
-	opts := streamaggr.Options{
-		DedupInterval:        *streamAggrDedupInterval,
-		DropInputLabels:      *streamAggrDropInputLabels,
-		IgnoreOldSamples:     *streamAggrIgnoreOldSamples,
-		IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
-		Alias:                "global",
-	}
-	sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
-	if err != nil {
-		saCfgSuccess.Set(0)
-		saCfgReloadErr.Inc()
+	sas := sasGlobal.Load()
+	if err := sas.Reload(); err != nil {
 		logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
 		return
 	}
-	sas := sasGlobal.Load()
-	if !sasNew.Equal(sas) {
-		sasOld := sasGlobal.Swap(sasNew)
-		sasOld.MustStop()
-		logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
-	} else {
-		logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
-		sasNew.MustStop()
-	}
-	saCfgSuccess.Set(1)
-	saCfgTimestamp.Set(fasttime.UnixTimestamp())
+	logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
 }
 
 // MustStopStreamAggr stops stream aggregators.
@@ -161,7 +142,7 @@ func MustStopStreamAggr() {
 	saCfgReloaderWG.Wait()
 
 	sas := sasGlobal.Swap(nil)
-	sas.MustStop()
+	sas.MustStop(nil)
 
 	if deduplicator != nil {
 		deduplicator.MustStop()
diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go
index c49580dcc..0386ad0ce 100644
--- a/lib/streamaggr/deduplicator.go
+++ b/lib/streamaggr/deduplicator.go
@@ -71,6 +71,9 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
 
 // MustStop stops d.
 func (d *Deduplicator) MustStop() {
+	if d == nil {
+		return
+	}
 	metrics.UnregisterSet(d.ms)
 	d.ms = nil
diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go
index 831cd500f..dd7f0de83 100644
--- a/lib/streamaggr/streamaggr.go
+++ b/lib/streamaggr/streamaggr.go
@@ -1,7 +1,6 @@
 package streamaggr
 
 import (
-	"encoding/json"
 	"fmt"
 	"math"
 	"slices"
@@ -16,6 +15,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -69,21 +69,28 @@ var (
 //
 // The returned Aggregators must be stopped with MustStop() when no longer needed.
 func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, error) {
-	data, err := fscore.ReadFileOrHTTP(path)
-	if err != nil {
-		return nil, fmt.Errorf("cannot load aggregators: %w", err)
+	a := &Aggregators{
+		path:     path,
+		pushFunc: pushFunc,
+		opts:     opts,
 	}
-	data, err = envtemplate.ReplaceBytes(data)
-	if err != nil {
-		return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
+	if err := a.load(); err != nil {
+		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
 	}
+	return a, nil
+}
 
-	as, err := newAggregatorsFromData(data, pushFunc, opts)
-	if err != nil {
-		return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
+// Reload re-reads the config file and updates the aggregators if there are any changes.
+func (a *Aggregators) Reload() error {
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_total{path=%q}`, a.path)).Inc()
+	if err := a.load(); err != nil {
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_errors_total{path=%q}`, a.path)).Inc()
+		metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(0)
+		return fmt.Errorf("cannot load stream aggregation config %q: %w", a.path, err)
 	}
-
-	return as, nil
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(1)
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, a.path)).Set(fasttime.UnixTimestamp())
+	return nil
 }
 
 // Options contains optional settings for the Aggregators.
@@ -243,44 +250,86 @@ type Config struct {
 
 // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
 type Aggregators struct {
-	as []*aggregator
-
-	// configData contains marshaled configs.
-	// It is used in Equal() for comparing Aggregators.
-	configData []byte
-
-	ms *metrics.Set
+	as       []*aggregator
+	ms       *metrics.Set
+	path     string
+	pushFunc PushFunc
+	opts     Options
 }
 
-func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) {
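+// load reads the stream aggregation config from a.path (a local file or an
+// HTTP url), expands environment variables in it and rebuilds the aggregators
+// from the result.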
+func (a *Aggregators) load() error {
+	data, err := fscore.ReadFileOrHTTP(a.path)
+	if err != nil {
+		return fmt.Errorf("cannot load aggregators: %w", err)
+	}
+	data, err = envtemplate.ReplaceBytes(data)
+	if err != nil {
+		return fmt.Errorf("cannot expand environment variables in %q: %w", a.path, err)
+	}
+	if err = a.loadAggregatorsFromData(data); err != nil {
+		return fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
+	}
+	return nil
+}
+
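+// getAggregator returns the existing aggregator from a.as whose marshaled
+// config matches aggr, or nil if there is no such aggregator.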
+func (a *Aggregators) getAggregator(aggr *aggregator) *aggregator {
+	if a == nil {
+		return nil
+	}
+	idx := slices.IndexFunc(a.as, func(ac *aggregator) bool {
+		return ac.configData == aggr.configData
+	})
+	if idx >= 0 {
+		return a.as[idx]
+	}
+	return nil
+}
+
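+// loadAggregatorsFromData parses data, re-uses aggregators whose configs
+// didn't change, stops the remaining old aggregators and creates new ones
+// for added or changed entries. Duplicate entries are collapsed into a
+// single aggregator.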
+func (a *Aggregators) loadAggregatorsFromData(data []byte) error {
 	var cfgs []*Config
 	if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
-		return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
+		return fmt.Errorf("cannot parse stream aggregation config: %w", err)
 	}
 
 	ms := metrics.NewSet()
-	as := make([]*aggregator, len(cfgs))
+	var unchanged, ac []*aggregator
+	var ignoreAggrConfigs []string
 	for i, cfg := range cfgs {
-		opts.aggrID = i + 1
-		a, err := newAggregator(cfg, pushFunc, ms, opts)
+		a.opts.aggrID = i + 1
+		aggr, err := newAggregator(cfg, a.pushFunc, ms, a.opts)
 		if err != nil {
 			// Stop already initialized aggregators before returning the error.
-			for _, a := range as[:i] {
-				a.MustStop()
+			for _, c := range ac {
+				c.MustStop()
 			}
-			return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
+			return fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
 		}
-		as[i] = a
-	}
-	configData, err := json.Marshal(cfgs)
-	if err != nil {
-		logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
+		if oldAggr := a.getAggregator(aggr); oldAggr != nil {
+			aggr.MustStop()
+			if !slices.Contains(ignoreAggrConfigs, oldAggr.configData) {
+				unchanged = append(unchanged, oldAggr)
+				ignoreAggrConfigs = append(ignoreAggrConfigs, oldAggr.configData)
+			}
+			continue
+		}
+		if slices.ContainsFunc(ac, func(x *aggregator) bool {
+			return x.configData == aggr.configData
+		}) {
+			aggr.MustStop()
+			continue
+		}
+		ac = append(ac, aggr)
 	}
 
-	metricLabels := fmt.Sprintf("url=%q", opts.Alias)
+	metricLabels := fmt.Sprintf("url=%q", a.opts.Alias)
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_stopped_total{path=%q}`, a.path)).Add(len(a.as) - len(ignoreAggrConfigs))
+	metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_created_total{path=%q}`, a.path)).Add(len(ac))
+	a.MustStop(ignoreAggrConfigs)
+	a.as = slices.Concat(unchanged, ac)
+
 	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
 		n := uint64(0)
-		for _, aggr := range as {
+		for _, aggr := range a.as {
 			if aggr.da != nil {
 				n += aggr.da.sizeBytes()
 			}
@@ -289,7 +338,7 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggr
 	})
 	_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
 		n := uint64(0)
-		for _, aggr := range as {
+		for _, aggr := range a.as {
 			if aggr.da != nil {
 				n += aggr.da.itemsCount()
 			}
@@ -298,11 +347,8 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggr
 	})
 	metrics.RegisterSet(ms)
 
-	return &Aggregators{
-		as:         as,
-		configData: configData,
-		ms:         ms,
-	}, nil
+	a.ms = ms
+	return nil
 }
 
 // IsEnabled returns true if Aggregators has at least one configured aggregator
@@ -317,7 +363,7 @@ func (a *Aggregators) IsEnabled() bool {
 }
 
-// MustStop stops a.
-func (a *Aggregators) MustStop() {
+// MustStop stops the aggregators, except those whose marshaled configs
+// are listed in ignoreAggrConfigs.
+func (a *Aggregators) MustStop(ignoreAggrConfigs []string) {
 	if a == nil {
 		return
 	}
@@ -326,7 +372,9 @@ func (a *Aggregators) MustStop() {
 	a.ms = nil
 
 	for _, aggr := range a.as {
-		aggr.MustStop()
+		if ignoreAggrConfigs == nil || !slices.Contains(ignoreAggrConfigs, aggr.configData) {
+			aggr.MustStop()
+		}
 	}
 	a.as = nil
 }
@@ -336,7 +384,16 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
 	if a == nil || b == nil {
 		return a == nil && b == nil
 	}
-	return string(a.configData) == string(b.configData)
+	return slices.Compare(a.configData(), b.configData()) == 0
+}
+
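+// configData returns the sorted list of marshaled per-aggregator configs.
+// It is used in Equal() for comparing Aggregators.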
+func (a *Aggregators) configData() []string {
+	result := make([]string, len(a.as))
+	for i := range result {
+		result[i] = a.as[i].configData
+	}
+	slices.Sort(result)
+	return result
 }
 
 // Push pushes tss to a.
@@ -374,6 +431,7 @@ type aggregator struct {
 	keepMetricNames  bool
 	ignoreOldSamples bool
+	configData       string
 
 	by      []string
 	without []string
@@ -474,6 +532,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
 	dropInputLabels := opts.DropInputLabels
 	if v := cfg.DropInputLabels; v != nil {
 		dropInputLabels = *v
+	} else {
+		cfg.DropInputLabels = &dropInputLabels
 	}
 
 	// initialize input_relabel_configs and output_relabel_configs
@@ -501,6 +561,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
 	keepMetricNames := opts.KeepMetricNames
 	if v := cfg.KeepMetricNames; v != nil {
 		keepMetricNames = *v
+	} else {
+		cfg.KeepMetricNames = &keepMetricNames
 	}
 	if keepMetricNames {
 		if len(cfg.Outputs) != 1 {
@@ -515,12 +577,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
 	ignoreOldSamples := opts.IgnoreOldSamples
 	if v := cfg.IgnoreOldSamples; v != nil {
 		ignoreOldSamples = *v
+	} else {
+		cfg.IgnoreOldSamples = &ignoreOldSamples
 	}
 
 	// check cfg.IgnoreFirstIntervals
 	ignoreFirstIntervals := opts.IgnoreFirstIntervals
 	if v := cfg.IgnoreFirstIntervals; v != nil {
 		ignoreFirstIntervals = *v
+	} else {
+		cfg.IgnoreFirstIntervals = &ignoreFirstIntervals
 	}
 
 	// initialize outputs list
@@ -528,6 +594,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
 		return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
 			"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
 	}
+	slices.Sort(cfg.Outputs)
+	cfg.Outputs = slices.Compact(cfg.Outputs)
 	aggrStates := make([]aggrState, len(cfg.Outputs))
 	for i, output := range cfg.Outputs {
 		if strings.HasPrefix(output, "quantiles(") {
@@ -664,19 +732,29 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
 		a.da = newDedupAggr()
 	}
 
-	alignFlushToInterval := !opts.NoAlignFlushToInterval
+	noAlignFlushToInterval := opts.NoAlignFlushToInterval
 	if v := cfg.NoAlignFlushToInterval; v != nil {
-		alignFlushToInterval = !*v
+		noAlignFlushToInterval = *v
+	} else {
+		cfg.NoAlignFlushToInterval = &noAlignFlushToInterval
 	}
 
-	skipIncompleteFlush := !opts.FlushOnShutdown
+	flushOnShutdown := opts.FlushOnShutdown
 	if v := cfg.FlushOnShutdown; v != nil {
-		skipIncompleteFlush = !*v
+		flushOnShutdown = *v
+	} else {
+		cfg.FlushOnShutdown = &flushOnShutdown
 	}
 
+	configData, err := yaml.Marshal(&cfg)
+	if err != nil {
+		return nil, fmt.Errorf("cannot marshal config: %w", err)
+	}
+	a.configData = string(configData)
+
 	a.wg.Add(1)
 	go func() {
-		a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
+		a.runFlusher(pushFunc, !noAlignFlushToInterval, !flushOnShutdown, interval, dedupInterval, ignoreFirstIntervals)
 		a.wg.Done()
 	}()
diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go
index c6d17b832..046cf2a55 100644
--- a/lib/streamaggr/streamaggr_test.go
+++ b/lib/streamaggr/streamaggr_test.go
@@ -20,12 +20,11 @@ func TestAggregatorsFailure(t *testing.T) {
 		pushFunc := func(_ []prompbmarshal.TimeSeries) {
 			panic(fmt.Errorf("pushFunc shouldn't be called"))
 		}
-		a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{})
-		if err == nil {
-			t.Fatalf("expecting non-nil error")
+		a := &Aggregators{
+			pushFunc: pushFunc,
 		}
-		if a != nil {
-			t.Fatalf("expecting nil a")
+		if err := a.loadAggregatorsFromData([]byte(config)); err == nil {
+			t.Fatalf("expecting non-nil error")
 		}
 	}
 
@@ -158,12 +157,17 @@ func TestAggregatorsEqual(t *testing.T) {
 		t.Helper()
 
 		pushFunc := func(_ []prompbmarshal.TimeSeries) {}
-		aa, err := newAggregatorsFromData([]byte(a), pushFunc, Options{})
-		if err != nil {
+		aa := &Aggregators{
+			pushFunc: pushFunc,
+		}
+		if err := aa.loadAggregatorsFromData([]byte(a)); err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
-		ab, err := newAggregatorsFromData([]byte(b), pushFunc, Options{})
-		if err != nil {
+
+		ab := &Aggregators{
+			pushFunc: pushFunc,
+		}
+		if err := ab.loadAggregatorsFromData([]byte(b)); err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
 		result := aa.Equal(ab)
@@ -221,19 +225,21 @@ func TestAggregatorsSuccess(t *testing.T) {
 			tssOutput = appendClonedTimeseries(tssOutput, tss)
 			tssOutputLock.Unlock()
 		}
-		opts := Options{
-			FlushOnShutdown:        true,
-			NoAlignFlushToInterval: true,
+		a := &Aggregators{
+			opts: Options{
+				FlushOnShutdown:        true,
+				NoAlignFlushToInterval: true,
+			},
+			pushFunc: pushFunc,
 		}
-		a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
-		if err != nil {
+		if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
 
 		// Push the inputMetrics to Aggregators
 		tssInput := mustParsePromMetrics(inputMetrics)
 		matchIdxs := a.Push(tssInput, nil)
-		a.MustStop()
+		a.MustStop(nil)
 
 		// Verify matchIdxs equals to matchIdxsExpected
 		matchIdxsStr := ""
@@ -917,19 +923,21 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
 			}
 			tssOutputLock.Unlock()
 		}
-		opts := Options{
-			DedupInterval:   30 * time.Second,
-			FlushOnShutdown: true,
+		a := &Aggregators{
+			opts: Options{
+				DedupInterval:   30 * time.Second,
+				FlushOnShutdown: true,
+			},
+			pushFunc: pushFunc,
 		}
-		a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
-		if err != nil {
+		if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
 			t.Fatalf("cannot initialize aggregators: %s", err)
 		}
 
 		// Push the inputMetrics to Aggregators
 		tssInput := mustParsePromMetrics(inputMetrics)
 		matchIdxs := a.Push(tssInput, nil)
-		a.MustStop()
+		a.MustStop(nil)
 
 		// Verify matchIdxs equals to matchIdxsExpected
 		matchIdxsStr := ""
diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go
index b328a61a0..470de0206 100644
--- a/lib/streamaggr/streamaggr_timing_test.go
+++ b/lib/streamaggr/streamaggr_timing_test.go
@@ -47,7 +47,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
 	}
 	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
 	a := newBenchAggregators(outputs, pushFunc)
-	defer a.MustStop()
+	defer a.MustStop(nil)
 
 	_ = a.Push(benchSeries, nil)
 	b.ResetTimer()
@@ -63,7 +63,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
 func benchmarkAggregatorsPush(b *testing.B, output string) {
 	pushFunc := func(_ []prompbmarshal.TimeSeries) {}
 	a := newBenchAggregators([]string{output}, pushFunc)
-	defer a.MustStop()
+	defer a.MustStop(nil)
 
 	const loops = 100
 
@@ -92,8 +92,10 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
   outputs: [%s]
 `, strings.Join(outputsQuoted, ","))
 
-	a, err := newAggregatorsFromData([]byte(config), pushFunc, Options{})
-	if err != nil {
+	a := &Aggregators{
+		pushFunc: pushFunc,
+	}
+	if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
 		panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
 	}
 	return a
 }