package remotewrite

import (
	"flag"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
	"github.com/VictoriaMetrics/metrics"
)

// Command-line flags controlling stream aggregation.
//
// The -streamAggr.* flags configure a single global aggregator, while
// the -remoteWrite.streamAggr.* flags configure aggregators bound to
// individual -remoteWrite.url entries.
var (
	// Global config
	streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
		"See https://docs.victoriametrics.com/stream-aggregation/ . "+
		"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
	streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
		"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
		"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
	// The description refers to -streamAggr.config (not the per-URL flag),
	// consistently with -streamAggr.keepInput above.
	streamAggrGlobalDropInput = flag.Bool("streamAggr.dropInput", false, "Whether to drop all the input samples after the aggregation "+
		"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
		"are written to remote storages write. See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
	streamAggrGlobalDedupInterval = flagutil.NewDuration("streamAggr.dedupInterval", "0s", "Input samples are de-duplicated with this interval on "+
		"aggregator before optional aggregation with -streamAggr.config . "+
		"See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
	streamAggrGlobalIgnoreOldSamples = flag.Bool("streamAggr.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the "+
		"current aggregation interval for aggregator. "+
		"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
	streamAggrGlobalIgnoreFirstIntervals = flag.Int("streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start for "+
		"aggregator. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from "+
		"clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
	streamAggrGlobalDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples for aggregator "+
		"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")

	// Per URL config
	streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config for the corresponding -remoteWrite.url. "+
		"See https://docs.victoriametrics.com/stream-aggregation/ . "+
		"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
	streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
		"with -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. By default, only aggregates samples are dropped, while the remaining samples "+
		"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
	streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
		"with -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. By default, only aggregates samples are dropped, while the remaining samples "+
		"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
	streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
		"with -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
	streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+
		"aggregation interval for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. "+
		"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
	// NOTE(review): the description talks about "the corresponding -remoteWrite.url",
	// but this is a single flag.Int value which is read without an index,
	// so the same value is used for every per-URL aggregator. Confirm whether
	// a per-URL array flag was intended.
	streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start "+
		"for the corresponding -remoteWrite.streamAggr.config at the corresponding -remoteWrite.url. Increase this value if "+
		"you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving buffered delayed data from clients pushing data into the vmagent. "+
		"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
	// The whole list is passed as-is to every per-URL aggregator and deduplicator;
	// it isn't indexed by -remoteWrite.url position.
	streamAggrDropInputLabels = flagutil.NewArrayString("remoteWrite.streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
		"before stream de-duplication and aggregation with -remoteWrite.streamAggr.config and -remoteWrite.streamAggr.dedupInterval at the corresponding -remoteWrite.url. "+
		"See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
)

// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
func CheckStreamAggrConfigs() error { // Check global config sas, err := newStreamAggrConfigGlobal() if err != nil { return err } sas.MustStop() if len(*streamAggrConfig) > len(*remoteWriteURLs) { return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d", len(*streamAggrConfig), len(*remoteWriteURLs)) } pushNoop := func(_ []prompbmarshal.TimeSeries) {} for idx := range *streamAggrConfig { sas, err := newStreamAggrConfigPerURL(idx, pushNoop) if err != nil { return err } sas.MustStop() } return nil } func reloadStreamAggrConfigs() { reloadStreamAggrConfigGlobal() for _, rwctx := range rwctxsGlobal { rwctx.reloadStreamAggrConfig() } } func reloadStreamAggrConfigGlobal() { path := *streamAggrGlobalConfig if path == "" { return } logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc() sasNew, err := newStreamAggrConfigGlobal() if err != nil { metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0) logger.Errorf("cannot reload -streamAggr.config=%q; continue using the previously loaded config; error: %s", path, err) return } sas := sasGlobal.Load() if !sasNew.Equal(sas) { sasOld := sasGlobal.Swap(sasNew) sasOld.MustStop() logger.Infof("successfully reloaded -streamAggr.config=%q", path) } else { sasNew.MustStop() logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path) } metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp()) } func initStreamAggrConfigGlobal() { sas, err := 
newStreamAggrConfigGlobal() if err != nil { logger.Fatalf("cannot initialize gloabl stream aggregators: %s", err) } if sas != nil { filePath := sas.FilePath() sasGlobal.Store(sas) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp()) } else { dedupInterval := streamAggrGlobalDedupInterval.Duration() if dedupInterval > 0 { deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesTrackDropped, dedupInterval, *streamAggrGlobalDropInputLabels, "dedup-global") } } } func (rwctx *remoteWriteCtx) initStreamAggrConfig() { idx := rwctx.idx sas, err := rwctx.newStreamAggrConfig() if err != nil { logger.Fatalf("cannot initialize stream aggregators: %s", err) } if sas != nil { filePath := sas.FilePath() rwctx.sas.Store(sas) rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(idx) rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(idx) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, filePath)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, filePath)).Set(fasttime.UnixTimestamp()) } else { dedupInterval := streamAggrDedupInterval.GetOptionalArg(idx) if dedupInterval > 0 { alias := fmt.Sprintf("dedup-%d", idx+1) rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels, alias) } } } func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() { path := streamAggrConfig.GetOptionalArg(rwctx.idx) if path == "" { return } logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc() sasNew, err := rwctx.newStreamAggrConfig() if 
err != nil { metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0) logger.Errorf("cannot reload -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", path, err) return } sas := rwctx.sas.Load() if !sasNew.Equal(sas) { sasOld := rwctx.sas.Swap(sasNew) sasOld.MustStop() logger.Infof("successfully reloaded -remoteWrite.streamAggr.config=%q", path) } else { sasNew.MustStop() logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path) } metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1) metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp()) } func newStreamAggrConfigGlobal() (*streamaggr.Aggregators, error) { path := *streamAggrGlobalConfig if path == "" { return nil, nil } opts := &streamaggr.Options{ DedupInterval: streamAggrGlobalDedupInterval.Duration(), DropInputLabels: *streamAggrGlobalDropInputLabels, IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples, IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals, } sas, err := streamaggr.LoadFromFile(path, pushToRemoteStoragesTrackDropped, opts, "global") if err != nil { return nil, fmt.Errorf("cannot load -streamAggr.config=%q: %w", *streamAggrGlobalConfig, err) } return sas, nil } func (rwctx *remoteWriteCtx) newStreamAggrConfig() (*streamaggr.Aggregators, error) { return newStreamAggrConfigPerURL(rwctx.idx, rwctx.pushInternalTrackDropped) } func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamaggr.Aggregators, error) { path := streamAggrConfig.GetOptionalArg(idx) if path == "" { return nil, nil } alias := fmt.Sprintf("%d:secret-url", idx+1) if *showRemoteWriteURL { alias = fmt.Sprintf("%d:%s", idx+1, 
remoteWriteURLs.GetOptionalArg(idx)) } opts := &streamaggr.Options{ DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx), DropInputLabels: *streamAggrDropInputLabels, IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx), IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals, } sas, err := streamaggr.LoadFromFile(path, pushFunc, opts, alias) if err != nil { return nil, fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", path, err) } return sas, nil }