diff --git a/app/vmagent/remotewrite/relabel.go b/app/vmagent/remotewrite/relabel.go index 547f0982a..619ff17fa 100644 --- a/app/vmagent/remotewrite/relabel.go +++ b/app/vmagent/remotewrite/relabel.go @@ -48,9 +48,9 @@ func loadRelabelConfigs() (*relabelConfigs, error) { } if len(*relabelConfigPaths) > len(*remoteWriteURLs) { return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", - len(*relabelConfigPaths), (len(*remoteWriteURLs))) + len(*relabelConfigPaths), len(*remoteWriteURLs)) } - rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs)) + rcs.perCtx = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs)) for i, path := range *relabelConfigPaths { if len(path) == 0 { // Skip empty relabel config. @@ -60,14 +60,14 @@ func loadRelabelConfigs() (*relabelConfigs, error) { if err != nil { return nil, fmt.Errorf("cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %w", path, err) } - rcs.perURL[i] = prc + rcs.perCtx[i] = prc } return &rcs, nil } type relabelConfigs struct { global *promrelabel.ParsedConfigs - perURL []*promrelabel.ParsedConfigs + perCtx []*promrelabel.ParsedConfigs } // initLabelsGlobal must be called after parsing command-line flags. diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 78dfad8fb..cb9856c3d 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -87,24 +87,6 @@ var ( maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+ "By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit") - streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ - "See https://docs.victoriametrics.com/stream-aggregation/ . "+ - "See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval") - streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+ - "with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+ - "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/") - streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+ - "with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+ - "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/") - streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+ - "with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") - streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+ - "for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") - streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+ - "See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start") - streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ - "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") - disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+ "when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence ."+ "See also -remoteWrite.dropSamplesOnOverload") @@ -139,6 +121,9 @@ func MultitenancyEnabled() bool { // Contains the current relabelConfigs. var allRelabelConfigs atomic.Pointer[relabelConfigs] +// Contains the current streamAggrConfigs. +var allStreamAggrConfigs atomic.Pointer[streamAggrConfigs] + // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value, // since it may lead to high memory usage due to big number of buffers. var maxQueues = cgroup.AvailableCPUs() * 16 @@ -215,6 +200,12 @@ func Init() { relabelConfigSuccess.Set(1) relabelConfigTimestamp.Set(fasttime.UnixTimestamp()) + sac := &streamAggrConfigs{} + if err := sac.loadStreamAggrGlobal(pushToRemoteStoragesDropFailed); err != nil { + logger.Fatalf("cannot load stream aggregation configs: %s", err) + } + allStreamAggrConfigs.Store(sac) + if len(*remoteWriteURLs) > 0 { rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs) } @@ -240,7 +231,9 @@ func Init() { return } reloadRelabelConfigs() - reloadStreamAggrConfigs() + if err := allStreamAggrConfigs.Load().reloadStreamAggrConfigs(); err != nil { + logger.Fatalf("Failed to reload stream aggregation configs: %s", err) + } } }() } @@ -301,12 +294,6 @@ var ( relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`) ) -func reloadStreamAggrConfigs() { - for _, rwctx := range rwctxs { - rwctx.reinitStreamAggr() - } -} - func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx { if len(urls) == 0 { logger.Panicf("BUG: urls must be non-empty") @@ -453,6 +440,7 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF defer putRelabelCtx(rctx) } globalRowsPushedBeforeRelabel.Add(rowsCount) + maxSamplesPerBlock := *maxRowsPerBlock // Allow up to 10x of labels per each block on average. maxLabelsPerBlock := 10 * maxSamplesPerBlock @@ -491,6 +479,16 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF } sortLabelsIfNeeded(tssBlock) tssBlock = limitSeriesCardinality(tssBlock) + sac := allStreamAggrConfigs.Load() + if sac.global != nil { + matchIdxs := matchIdxsPool.Get() + matchIdxs.B = sac.global.Push(tssBlock, matchIdxs.B) + if !*streamAggrGlobalKeepInput { + tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput) + } + matchIdxsPool.Put(matchIdxs) + } + if !tryPushBlockToRemoteStorages(tssBlock, forceDropSamplesOnFailure) { return false } @@ -498,6 +496,12 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF return true } +func pushToRemoteStoragesDropFailed(tss []prompbmarshal.TimeSeries) { + if tryPushBlockToRemoteStorages(tss, true) { + return + } +} + func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool { if len(tssBlock) == 0 { // Nothing to push @@ -722,7 +726,6 @@ type remoteWriteCtx struct { fq *persistentqueue.FastQueue c *client - sas atomic.Pointer[streamaggr.Aggregators] deduplicator *streamaggr.Deduplicator streamAggrKeepInput bool @@ -807,27 +810,17 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)), } - // Initialize sas - sasFile := streamAggrConfig.GetOptionalArg(argIdx) - dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx) - ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx) - if sasFile != "" { - opts := &streamaggr.Options{ - DedupInterval: dedupInterval, - DropInputLabels: *streamAggrDropInputLabels, - IgnoreOldSamples: ignoreOldSamples, - IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, - } - sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) - if err != nil { - logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err) - } - rwctx.sas.Store(sas) + sac := allStreamAggrConfigs.Load() + if err := sac.loadStreamAggrPerCtx(argIdx, rwctx.pushInternalTrackDropped); err != nil { + logger.Fatalf("cannot load stream aggregation config: %s", err) + } + if sac.perCtx[argIdx] != nil { rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx) rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx) - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) - } else if dedupInterval > 0 { + } + + dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx) + if dedupInterval > 0 { rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels) } @@ -837,8 +830,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in func (rwctx *remoteWriteCtx) MustStop() { // sas and deduplicator must be stopped before rwctx is closed // because sas can write pending series to rwctx.pss if there are any - sas := rwctx.sas.Swap(nil) - sas.MustStop() + sas := allStreamAggrConfigs.Load().perCtx[rwctx.idx] + if sas != nil { + sas.MustStop(nil) + } if rwctx.deduplicator != nil { rwctx.deduplicator.MustStop() @@ -870,7 +865,7 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa var rctx *relabelCtx var v *[]prompbmarshal.TimeSeries rcs := allRelabelConfigs.Load() - pcs := rcs.perURL[rwctx.idx] + pcs := rcs.perCtx[rwctx.idx] if pcs.Len() > 0 { rctx = getRelabelCtx() // Make a copy of tss before applying relabeling in order to prevent @@ -888,7 +883,8 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa rwctx.rowsPushedAfterRelabel.Add(rowsCount) // Apply stream aggregation or deduplication if they are configured - sas := rwctx.sas.Load() + sac := allStreamAggrConfigs.Load() + sas := sac.perCtx[rwctx.idx] if sas != nil { matchIdxs := matchIdxsPool.Get() matchIdxs.B = sas.Push(tss, matchIdxs.B) @@ -985,40 +981,6 @@ func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) boo return ok } -func (rwctx *remoteWriteCtx) reinitStreamAggr() { - sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx) - if sasFile == "" { - // There is no stream aggregation for rwctx - return - } - - logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile) - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc() - opts := &streamaggr.Options{ - DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx), - DropInputLabels: *streamAggrDropInputLabels, - IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(rwctx.idx), - } - sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts) - if err != nil { - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc() - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0) - logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err) - return - } - sas := rwctx.sas.Load() - if !sasNew.Equal(sas) { - sasOld := rwctx.sas.Swap(sasNew) - sasOld.MustStop() - logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile) - } else { - sasNew.MustStop() - logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile) - } - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1) - metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp()) -} - var tssPool = &sync.Pool{ New: func() interface{} { a := []prompbmarshal.TimeSeries{} @@ -1034,27 +996,6 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int { return rowsCount } -// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config -func CheckStreamAggrConfigs() error { - pushNoop := func(_ []prompbmarshal.TimeSeries) {} - for idx, sasFile := range *streamAggrConfig { - if sasFile == "" { - continue - } - opts := &streamaggr.Options{ - DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), - DropInputLabels: *streamAggrDropInputLabels, - IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx), - } - sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts) - if err != nil { - return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err) - } - sas.MustStop() - } - return nil -} - func newMapFromStrings(a []string) map[string]struct{} { m := make(map[string]struct{}, len(a)) for _, s := range a { diff --git a/app/vmagent/remotewrite/remotewrite_test.go b/app/vmagent/remotewrite/remotewrite_test.go index 6c384ba2e..6bc9bea72 100644 --- a/app/vmagent/remotewrite/remotewrite_test.go +++ b/app/vmagent/remotewrite/remotewrite_test.go @@ -57,13 +57,13 @@ func TestGetLabelsHash_Distribution(t *testing.T) { func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { f := func(streamAggrConfig, relabelConfig string, dedupInterval time.Duration, keepInput, dropInput bool, input string) { t.Helper() - perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig)) + perCtxRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig)) if err != nil { t.Fatalf("cannot load relabel configs: %s", err) } rcs := &relabelConfigs{ - perURL: []*promrelabel.ParsedConfigs{ - perURLRelabel, + perCtx: []*promrelabel.ParsedConfigs{ + perCtxRelabel, }, } allRelabelConfigs.Store(rcs) @@ -84,11 +84,16 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) { if len(streamAggrConfig) > 0 { f := createFile(t, []byte(streamAggrConfig)) - sas, err := streamaggr.LoadFromFile(f.Name(), nil, nil) - if err != nil { + sas := streamaggr.NewAggregators(f.Name(), nil, nil) + if err := sas.Load(); err != nil { t.Fatalf("cannot load streamaggr configs: %s", err) } - rwctx.sas.Store(sas) + sac := &streamAggrConfigs{ + perCtx: []*streamaggr.Aggregators{ + sas, + }, + } + allStreamAggrConfigs.Store(sac) } inputTss := mustParsePromMetrics(input) diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go new file mode 100644 index 000000000..98b5740f0 --- /dev/null +++ b/app/vmagent/remotewrite/streamaggr.go @@ -0,0 +1,127 @@ +package remotewrite + +import ( + "flag" + "fmt" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" +) + +var ( + // Global config + streamAggrGlobalConfig = flag.String("remoteWrite.streamAggr.global.config", "", "Optional path to file with stream aggregation global config. "+ + "See https://docs.victoriametrics.com/stream-aggregation/ . "+ + "See also -remoteWrite.streamAggr.global.keepInput, -remoteWrite.streamAggr.global.dropInput and -remoteWrite.streamAggr.global.dedupInterval") + streamAggrGlobalKeepInput = flag.Bool("remoteWrite.streamAggr.global.keepInput", false, "Whether to keep all the input samples after the global aggregation "+ + "with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+ + "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/") + streamAggrGlobalDropInput = flag.Bool("remoteWrite.streamAggr.global.dropInput", false, "Whether to drop all the input samples after the global aggregation "+ + "with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+ + "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.global.keepInput and https://docs.victoriametrics.com/stream-aggregation/") + streamAggrGlobalDedupInterval = flagutil.NewDuration("remoteWrite.streamAggr.global.dedupInterval", "0s", "Input samples are de-duplicated with this interval on global "+ + "aggregator before optional aggregation with -remoteWrite.streamAggr.config . "+ + "See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") + streamAggrGlobalIgnoreOldSamples = flag.Bool("remoteWrite.streamAggr.global.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the "+ + "current aggregation interval for global aggregator for the corresponding -remoteWrite.streamAggr.config . "+ + "See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") + streamAggrGlobalIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.global.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start for global "+ + "aggregator. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from "+ + "clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start") + streamAggrGlobalDropInputLabels = flagutil.NewArrayString("streamAggr.global.dropInputLabels", "An optional list of labels to drop from samples for global aggregator "+ + "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") + + // Per URL config + streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+ + "See https://docs.victoriametrics.com/stream-aggregation/ . "+ + "See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval") + streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+ + "with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+ + "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/") + streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+ + "with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+ + "are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/") + streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+ + "with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication") + streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+ + "aggregation interval for the corresponding -remoteWrite.streamAggr.config . "+ + "See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples") + streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if "+ + "you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+ + "See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start") + streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+ + "before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels") +) + +// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -remoteWrite.streamAggr.globalConfig. +func CheckStreamAggrConfigs() error { + pushNoop := func(_ []prompbmarshal.TimeSeries) {} + sac := &streamAggrConfigs{} + if err := sac.loadStreamAggrGlobal(pushNoop); err != nil { + return fmt.Errorf("could not load global stream aggregation config: %w", err) + } + if len(*streamAggrConfig) > len(*remoteWriteURLs) { + return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", + len(*streamAggrConfig), len(*remoteWriteURLs)) + } + for i := range *streamAggrConfig { + if err := sac.loadStreamAggrPerCtx(i, pushNoop); err != nil { + return err + } + } + return nil +} + +func (sac *streamAggrConfigs) reloadStreamAggrConfigs() error { + if err := sac.global.Reload(); err != nil { + return fmt.Errorf("failed to reload global config: %w", err) + } + for _, perCtx := range sac.perCtx { + if err := perCtx.Reload(); err != nil { + return fmt.Errorf("failed to reload config at location %q: %w", perCtx.ConfigPath(), err) + } + } + return nil +} + +func (sac *streamAggrConfigs) loadStreamAggrGlobal(pushFunc streamaggr.PushFunc) error { + sac.perCtx = make([]*streamaggr.Aggregators, len(*remoteWriteURLs)) + if *streamAggrGlobalConfig != "" { + path := *streamAggrGlobalConfig + opts := &streamaggr.Options{ + DedupInterval: streamAggrGlobalDedupInterval.Duration(), + DropInputLabels: *streamAggrGlobalDropInputLabels, + IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples, + IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals, + } + sac.global = streamaggr.NewAggregators(path, pushFunc, opts) + return sac.global.Load() + } + return nil +} + +func (sac *streamAggrConfigs) loadStreamAggrPerCtx(idx int, pushFunc streamaggr.PushFunc) error { + if len(*streamAggrConfig) == 0 { + return nil + } + paths := *streamAggrConfig + path := paths[idx] + if len(path) == 0 { + // Skip empty stream aggregation config. + return nil + } + opts := &streamaggr.Options{ + DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), + DropInputLabels: *streamAggrDropInputLabels, + IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx), + IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, + } + sac.perCtx[idx] = streamaggr.NewAggregators(path, pushFunc, opts) + return sac.perCtx[idx].Load() +} + +type streamAggrConfigs struct { + global *streamaggr.Aggregators + perCtx []*streamaggr.Aggregators +} diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 7a8ae0e11..c6bf8b295 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -15,7 +15,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" - "github.com/VictoriaMetrics/metrics" ) var ( @@ -42,11 +41,6 @@ var ( saCfgReloaderStopCh chan struct{} saCfgReloaderWG sync.WaitGroup - saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`) - saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`) - saCfgSuccess = metrics.NewGauge(`vminsert_streamagg_config_last_reload_successful`, nil) - saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`) - sasGlobal atomic.Pointer[streamaggr.Aggregators] deduplicator *streamaggr.Deduplicator ) @@ -63,11 +57,11 @@ func CheckStreamAggrConfig() error { IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, } - sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts) - if err != nil { + sas := streamaggr.NewAggregators(*streamAggrConfig, pushNoop, opts) + if err := sas.Load(); err != nil { return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err) } - sas.MustStop() + sas.MustStop(nil) return nil } @@ -92,14 +86,12 @@ func InitStreamAggr() { IgnoreOldSamples: *streamAggrIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, } - sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) - if err != nil { + sas := streamaggr.NewAggregators(*streamAggrConfig, pushAggregateSeries, opts) + if err := sas.Load(); err != nil { logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err) } sasGlobal.Store(sas) - saCfgSuccess.Set(1) - saCfgTimestamp.Set(fasttime.UnixTimestamp()) // Start config reloader. saCfgReloaderWG.Add(1) @@ -118,32 +110,12 @@ func InitStreamAggr() { func reloadStreamAggrConfig() { logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig) - saCfgReloads.Inc() - - opts := &streamaggr.Options{ - DedupInterval: *streamAggrDedupInterval, - DropInputLabels: *streamAggrDropInputLabels, - IgnoreOldSamples: *streamAggrIgnoreOldSamples, - IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, - } - sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts) - if err != nil { - saCfgSuccess.Set(0) - saCfgReloadErr.Inc() - logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err) - return - } sas := sasGlobal.Load() - if !sasNew.Equal(sas) { - sasOld := sasGlobal.Swap(sasNew) - sasOld.MustStop() - logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig) + if err := sas.Reload(); err != nil { + logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err) } else { - logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig) - sasNew.MustStop() + logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig) } - saCfgSuccess.Set(1) - saCfgTimestamp.Set(fasttime.UnixTimestamp()) } // MustStopStreamAggr stops stream aggregators. @@ -152,7 +124,7 @@ func MustStopStreamAggr() { saCfgReloaderWG.Wait() sas := sasGlobal.Swap(nil) - sas.MustStop() + sas.MustStop(nil) if deduplicator != nil { deduplicator.MustStop() diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index f7110290e..2db5394b5 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -16,6 +16,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -23,6 +24,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" + "github.com/cespare/xxhash/v2" "gopkg.in/yaml.v2" ) @@ -58,27 +60,54 @@ var ( }) ) -// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data. +// NewAggregators initializes Aggregators struct with shared data for all aggregators +func NewAggregators(path string, pushFunc PushFunc, opts *Options) *Aggregators { + return &Aggregators{ + path: path, + pushFunc: pushFunc, + opts: opts, + } +} + +// Reload reads config file and updates aggregators if there're any changes +func (a *Aggregators) Reload() error { + if a == nil { + return nil + } + logger.Infof("reloading stream aggregation config at %q", a.path) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_total{path=%q}`, a.path)).Inc() + if err := a.Load(); err != nil { + logger.Errorf("cannot reload stream aggregation config from %q; continue using the previously loaded config; error: %s", a.path, err) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_errors_total{path=%q}`, a.path)).Inc() + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(0) + return fmt.Errorf("cannot load stream aggregation config %q: %w", a.path, err) + } + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(1) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, a.path)).Set(fasttime.UnixTimestamp()) + logger.Infof("reloaded stream aggregation config at %q", a.path) + return nil +} + +// Load loads Aggregators from predefined path and uses pushFunc for pushing the aggregated data. // // opts can contain additional options. If opts is nil, then default options are used. -// -// The returned Aggregators must be stopped with MustStop() when no longer needed. -func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators, error) { - data, err := fscore.ReadFileOrHTTP(path) +func (a *Aggregators) Load() error { + data, err := fscore.ReadFileOrHTTP(a.path) if err != nil { - return nil, fmt.Errorf("cannot load aggregators: %w", err) + return fmt.Errorf("cannot load aggregators: %w", err) } + data, err = envtemplate.ReplaceBytes(data) if err != nil { - return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err) + return fmt.Errorf("cannot expand environment variables in %q: %w", a.path, err) } - as, err := newAggregatorsFromData(data, pushFunc, opts) + err = a.loadAggregatorsFromData(data) if err != nil { - return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err) + return fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err) } - return as, nil + return nil } // Options contains optional settings for the Aggregators. @@ -232,42 +261,73 @@ type Config struct { // Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data. type Aggregators struct { - as []*aggregator - - // configData contains marshaled configs. - // It is used in Equal() for comparing Aggregators. - configData []byte - - ms *metrics.Set + as []*aggregator + ms *metrics.Set + path string + pushFunc PushFunc + opts *Options } -func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) { +func (a *Aggregators) getAggregator(aggr *aggregator) *aggregator { + if a == nil { + return nil + } + if idx := slices.IndexFunc(a.as, func(ac *aggregator) bool { + return ac.configHash == aggr.configHash + }); idx >= 0 { + return a.as[idx] + } + return nil +} + +// ConfigPath returns path of aggregators config file +func (a *Aggregators) ConfigPath() string { + return a.path +} + +func (a *Aggregators) loadAggregatorsFromData(data []byte) error { var cfgs []*Config if err := yaml.UnmarshalStrict(data, &cfgs); err != nil { - return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err) + return fmt.Errorf("cannot parse stream aggregation config: %w", err) } ms := metrics.NewSet() - as := make([]*aggregator, len(cfgs)) + unchanged := make([]*aggregator, len(cfgs)) + unchanged = unchanged[:0] + ac := make([]*aggregator, len(cfgs)) + ac = ac[:0] + ignoreAggrHashes := make([]uint64, len(a.as)) + ignoreAggrHashes = ignoreAggrHashes[:0] for i, cfg := range cfgs { - a, err := newAggregator(cfg, pushFunc, ms, opts) + aggr, err := newAggregator(cfg, a.pushFunc, ms, a.opts) if err != nil { // Stop already initialized aggregators before returning the error. - for _, a := range as[:i] { - a.MustStop() + for _, s := range ac[:i] { + s.MustStop() } - return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err) + return fmt.Errorf("cannot initialize aggregator #%d: %w", i, err) } - as[i] = a + if oldAgg := a.getAggregator(aggr); oldAgg != nil { + aggr.MustStop() + unchanged = append(unchanged, oldAgg) + ignoreAggrHashes = append(ignoreAggrHashes, oldAgg.configHash) + continue + } + if slices.ContainsFunc(ac, func(x *aggregator) bool { + return x.configHash == aggr.configHash + }) { + aggr.MustStop() + continue + } + ac = append(ac, aggr) } - configData, err := json.Marshal(cfgs) - if err != nil { - logger.Panicf("BUG: cannot marshal the provided configs: %s", err) - } - + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_stopped_total{path=%q}`, a.path)).Add(len(a.as) - len(ignoreAggrHashes)) + metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_created_total{path=%q}`, a.path)).Add(len(ac)) + a.MustStop(ignoreAggrHashes) + a.as = slices.Concat(unchanged, ac) _ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 { n := uint64(0) - for _, aggr := range as { + for _, aggr := range a.as { if aggr.da != nil { n += aggr.da.sizeBytes() } @@ -276,24 +336,20 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg }) _ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 { n := uint64(0) - for _, aggr := range as { + for _, aggr := range a.as { if aggr.da != nil { n += aggr.da.itemsCount() } } return float64(n) }) - metrics.RegisterSet(ms) - return &Aggregators{ - as: as, - configData: configData, - ms: ms, - }, nil + a.ms = ms + return nil } // MustStop stops a. -func (a *Aggregators) MustStop() { +func (a *Aggregators) MustStop(ignoreAggrHashes []uint64) { if a == nil { return } @@ -302,7 +358,9 @@ func (a *Aggregators) MustStop() { a.ms = nil for _, aggr := range a.as { - aggr.MustStop() + if ignoreAggrHashes == nil || !slices.Contains(ignoreAggrHashes, aggr.configHash) { + aggr.MustStop() + } } a.as = nil } @@ -312,7 +370,16 @@ func (a *Aggregators) Equal(b *Aggregators) bool { if a == nil || b == nil { return a == nil && b == nil } - return string(a.configData) == string(b.configData) + return slices.Compare(a.getConfigHashes(), b.getConfigHashes()) == 0 +} + +func (a *Aggregators) getConfigHashes() []uint64 { + result := make([]uint64, len(a.as)) + for i := range result { + result[i] = a.as[i].configHash + } + slices.Sort(result) + return result } // Push pushes tss to a. @@ -350,6 +417,7 @@ type aggregator struct { keepMetricNames bool ignoreOldSamples bool + configHash uint64 by []string without []string @@ -443,6 +511,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option dropInputLabels := opts.DropInputLabels if v := cfg.DropInputLabels; v != nil { dropInputLabels = *v + } else { + cfg.DropInputLabels = &dropInputLabels } // initialize input_relabel_configs and output_relabel_configs @@ -470,6 +540,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option keepMetricNames := opts.KeepMetricNames if v := cfg.KeepMetricNames; v != nil { keepMetricNames = *v + } else { + cfg.KeepMetricNames = &keepMetricNames } if keepMetricNames { if len(cfg.Outputs) != 1 { @@ -484,12 +556,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option ignoreOldSamples := opts.IgnoreOldSamples if v := cfg.IgnoreOldSamples; v != nil { ignoreOldSamples = *v + } else { + cfg.IgnoreOldSamples = &ignoreOldSamples } // check cfg.IgnoreFirstIntervals ignoreFirstIntervals := opts.IgnoreFirstIntervals if v := cfg.IgnoreFirstIntervals; v != nil { ignoreFirstIntervals = *v + } else { + cfg.IgnoreFirstIntervals = &ignoreFirstIntervals } // initialize outputs list @@ -497,6 +573,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+ "see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs) } + slices.Sort(cfg.Outputs) + cfg.Outputs = slices.Compact(cfg.Outputs) aggrStates := make([]aggrState, len(cfg.Outputs)) for i, output := range cfg.Outputs { if strings.HasPrefix(output, "quantiles(") { @@ -601,23 +679,34 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option flushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_flush_timeouts_total`), dedupFlushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`), } + if dedupInterval > 0 { a.da = newDedupAggr() } - alignFlushToInterval := !opts.NoAlignFlushToInterval + noAlignFlushToInterval := opts.NoAlignFlushToInterval if v := cfg.NoAlignFlushToInterval; v != nil { - alignFlushToInterval = !*v + noAlignFlushToInterval = *v + } else { + cfg.NoAlignFlushToInterval = &noAlignFlushToInterval } - skipIncompleteFlush := !opts.FlushOnShutdown + flushOnShutdown := opts.FlushOnShutdown if v := cfg.FlushOnShutdown; v != nil { - skipIncompleteFlush = !*v + flushOnShutdown = !*v + } else { + cfg.FlushOnShutdown = &flushOnShutdown } + data, err := json.Marshal(&cfg) + if err != nil { + return nil, fmt.Errorf("Failed to marshal config: %w", err) + } + a.configHash = xxhash.Sum64(data) + a.wg.Add(1) go func() { - a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals) + a.runFlusher(pushFunc, !noAlignFlushToInterval, !flushOnShutdown, interval, dedupInterval, ignoreFirstIntervals) a.wg.Done() }() diff --git a/lib/streamaggr/streamaggr_test.go b/lib/streamaggr/streamaggr_test.go index f3e73ba51..a70c3f1fa 100644 --- a/lib/streamaggr/streamaggr_test.go +++ b/lib/streamaggr/streamaggr_test.go @@ -17,16 +17,11 @@ import ( func TestAggregatorsFailure(t *testing.T) { f := func(config string) { t.Helper() - pushFunc := func(_ []prompbmarshal.TimeSeries) { - panic(fmt.Errorf("pushFunc shouldn't be called")) - } - a, err := newAggregatorsFromData([]byte(config), pushFunc, nil) + a := &Aggregators{} + err := a.loadAggregatorsFromData([]byte(config)) if err == nil { t.Fatalf("expecting non-nil error") } - if a != nil { - t.Fatalf("expecting nil a") - } } // Invalid config @@ -158,12 +153,16 @@ func TestAggregatorsEqual(t *testing.T) { t.Helper() pushFunc := func(_ []prompbmarshal.TimeSeries) {} - aa, err := newAggregatorsFromData([]byte(a), pushFunc, nil) - if err != nil { + aa := &Aggregators{ + pushFunc: pushFunc, + } + if err := aa.loadAggregatorsFromData([]byte(a)); err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } - ab, err := newAggregatorsFromData([]byte(b), pushFunc, nil) - if err != nil { + ab := &Aggregators{ + pushFunc: pushFunc, + } + if err := ab.loadAggregatorsFromData([]byte(b)); err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } result := aa.Equal(ab) @@ -221,19 +220,21 @@ func TestAggregatorsSuccess(t *testing.T) { tssOutput = appendClonedTimeseries(tssOutput, tss) tssOutputLock.Unlock() } - opts := &Options{ - FlushOnShutdown: true, - NoAlignFlushToInterval: true, + a := &Aggregators{ + opts: &Options{ + FlushOnShutdown: true, + NoAlignFlushToInterval: true, + }, + pushFunc: pushFunc, } - a, err := newAggregatorsFromData([]byte(config), pushFunc, opts) - if err != nil { + if err := a.loadAggregatorsFromData([]byte(config)); err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics) matchIdxs := a.Push(tssInput, nil) - a.MustStop() + a.MustStop(nil) // Verify matchIdxs equals to matchIdxsExpected matchIdxsStr := "" @@ -905,19 +906,21 @@ func TestAggregatorsWithDedupInterval(t *testing.T) { } tssOutputLock.Unlock() } - opts := &Options{ - DedupInterval: 30 * time.Second, - FlushOnShutdown: true, + a := &Aggregators{ + pushFunc: pushFunc, + opts: &Options{ + DedupInterval: 30 * time.Second, + FlushOnShutdown: true, + }, } - a, err := newAggregatorsFromData([]byte(config), pushFunc, opts) - if err != nil { + if err := a.loadAggregatorsFromData([]byte(config)); err != nil { t.Fatalf("cannot initialize aggregators: %s", err) } // Push the inputMetrics to Aggregators tssInput := mustParsePromMetrics(inputMetrics) matchIdxs := a.Push(tssInput, nil) - a.MustStop() + a.MustStop(nil) // Verify matchIdxs equals to matchIdxsExpected matchIdxsStr := "" diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index f06da26cc..470de0206 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -47,7 +47,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) { } pushFunc := func(_ []prompbmarshal.TimeSeries) {} a := newBenchAggregators(outputs, pushFunc) - defer a.MustStop() + defer a.MustStop(nil) _ = a.Push(benchSeries, nil) b.ResetTimer() @@ -63,7 +63,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) { func benchmarkAggregatorsPush(b *testing.B, output string) { pushFunc := func(_ []prompbmarshal.TimeSeries) {} a := newBenchAggregators([]string{output}, pushFunc) - defer a.MustStop() + defer a.MustStop(nil) const loops = 100 @@ -92,8 +92,10 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { outputs: [%s] `, strings.Join(outputsQuoted, ",")) - a, err := newAggregatorsFromData([]byte(config), pushFunc, nil) - if err != nil { + a := &Aggregators{ + pushFunc: pushFunc, + } + if err := a.loadAggregatorsFromData([]byte(config)); err != nil { panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) } return a