lib/streamaggr: added aggregator reload and global aggregator for vmagent

This commit is contained in:
AndrewChubatiuk 2024-05-13 16:33:59 +03:00 committed by Andrii Chubatiuk
parent c90e6de13b
commit bf12b1c907
8 changed files with 363 additions and 224 deletions

View file

@ -48,9 +48,9 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
}
if len(*relabelConfigPaths) > len(*remoteWriteURLs) {
return nil, fmt.Errorf("too many -remoteWrite.urlRelabelConfig args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*relabelConfigPaths), (len(*remoteWriteURLs)))
len(*relabelConfigPaths), len(*remoteWriteURLs))
}
rcs.perURL = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs))
rcs.perCtx = make([]*promrelabel.ParsedConfigs, len(*remoteWriteURLs))
for i, path := range *relabelConfigPaths {
if len(path) == 0 {
// Skip empty relabel config.
@ -60,14 +60,14 @@ func loadRelabelConfigs() (*relabelConfigs, error) {
if err != nil {
return nil, fmt.Errorf("cannot load relabel configs from -remoteWrite.urlRelabelConfig=%q: %w", path, err)
}
rcs.perURL[i] = prc
rcs.perCtx[i] = prc
}
return &rcs, nil
}
type relabelConfigs struct {
global *promrelabel.ParsedConfigs
perURL []*promrelabel.ParsedConfigs
perCtx []*promrelabel.ParsedConfigs
}
// initLabelsGlobal must be called after parsing command-line flags.

View file

@ -87,24 +87,6 @@ var (
maxIngestionRate = flag.Int("maxIngestionRate", 0, "The maximum number of samples vmagent can receive per second. Data ingestion is paused when the limit is exceeded. "+
"By default there are no limits on samples ingestion rate. See also -remoteWrite.rateLimit")
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current aggregation interval "+
"for the corresponding -remoteWrite.streamAggr.config . See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
disableOnDiskQueue = flagutil.NewArrayBool("remoteWrite.disableOnDiskQueue", "Whether to disable storing pending data to -remoteWrite.tmpDataPath "+
"when the configured remote storage systems cannot keep up with the data ingestion rate. See https://docs.victoriametrics.com/vmagent#disabling-on-disk-persistence ."+
"See also -remoteWrite.dropSamplesOnOverload")
@ -139,6 +121,9 @@ func MultitenancyEnabled() bool {
// Contains the current relabelConfigs.
var allRelabelConfigs atomic.Pointer[relabelConfigs]
// Contains the current streamAggrConfigs.
var allStreamAggrConfigs atomic.Pointer[streamAggrConfigs]
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
var maxQueues = cgroup.AvailableCPUs() * 16
@ -215,6 +200,12 @@ func Init() {
relabelConfigSuccess.Set(1)
relabelConfigTimestamp.Set(fasttime.UnixTimestamp())
sac := &streamAggrConfigs{}
if err := sac.loadStreamAggrGlobal(pushToRemoteStoragesDropFailed); err != nil {
logger.Fatalf("cannot load stream aggregation configs: %s", err)
}
allStreamAggrConfigs.Store(sac)
if len(*remoteWriteURLs) > 0 {
rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
}
@ -240,7 +231,9 @@ func Init() {
return
}
reloadRelabelConfigs()
reloadStreamAggrConfigs()
if err := allStreamAggrConfigs.Load().reloadStreamAggrConfigs(); err != nil {
logger.Fatalf("Failed to reload stream aggregation configs: %s", err)
}
}
}()
}
@ -301,12 +294,6 @@ var (
relabelConfigTimestamp = metrics.NewCounter(`vmagent_relabel_config_last_reload_success_timestamp_seconds`)
)
func reloadStreamAggrConfigs() {
for _, rwctx := range rwctxs {
rwctx.reinitStreamAggr()
}
}
func newRemoteWriteCtxs(at *auth.Token, urls []string) []*remoteWriteCtx {
if len(urls) == 0 {
logger.Panicf("BUG: urls must be non-empty")
@ -453,6 +440,7 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
defer putRelabelCtx(rctx)
}
globalRowsPushedBeforeRelabel.Add(rowsCount)
maxSamplesPerBlock := *maxRowsPerBlock
// Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock
@ -491,6 +479,16 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
}
sortLabelsIfNeeded(tssBlock)
tssBlock = limitSeriesCardinality(tssBlock)
sac := allStreamAggrConfigs.Load()
if sac.global != nil {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sac.global.Push(tssBlock, matchIdxs.B)
if !*streamAggrGlobalKeepInput {
tssBlock = dropAggregatedSeries(tssBlock, matchIdxs.B, *streamAggrGlobalDropInput)
}
matchIdxsPool.Put(matchIdxs)
}
if !tryPushBlockToRemoteStorages(tssBlock, forceDropSamplesOnFailure) {
return false
}
@ -498,6 +496,12 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
return true
}
func pushToRemoteStoragesDropFailed(tss []prompbmarshal.TimeSeries) {
if tryPushBlockToRemoteStorages(tss, true) {
return
}
}
func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
if len(tssBlock) == 0 {
// Nothing to push
@ -722,7 +726,6 @@ type remoteWriteCtx struct {
fq *persistentqueue.FastQueue
c *client
sas atomic.Pointer[streamaggr.Aggregators]
deduplicator *streamaggr.Deduplicator
streamAggrKeepInput bool
@ -807,27 +810,17 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
rowsDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_samples_dropped_total{path=%q, url=%q}`, queuePath, sanitizedURL)),
}
// Initialize sas
sasFile := streamAggrConfig.GetOptionalArg(argIdx)
dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
ignoreOldSamples := streamAggrIgnoreOldSamples.GetOptionalArg(argIdx)
if sasFile != "" {
opts := &streamaggr.Options{
DedupInterval: dedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: ignoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
sas, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
if err != nil {
logger.Fatalf("cannot initialize stream aggregators from -remoteWrite.streamAggr.config=%q: %s", sasFile, err)
}
rwctx.sas.Store(sas)
sac := allStreamAggrConfigs.Load()
if err := sac.loadStreamAggrPerCtx(argIdx, rwctx.pushInternalTrackDropped); err != nil {
logger.Fatalf("cannot load stream aggregation config: %s", err)
}
if sac.perCtx[argIdx] != nil {
rwctx.streamAggrKeepInput = streamAggrKeepInput.GetOptionalArg(argIdx)
rwctx.streamAggrDropInput = streamAggrDropInput.GetOptionalArg(argIdx)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
} else if dedupInterval > 0 {
}
dedupInterval := streamAggrDedupInterval.GetOptionalArg(argIdx)
if dedupInterval > 0 {
rwctx.deduplicator = streamaggr.NewDeduplicator(rwctx.pushInternalTrackDropped, dedupInterval, *streamAggrDropInputLabels)
}
@ -837,8 +830,10 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, maxInmemoryBlocks in
func (rwctx *remoteWriteCtx) MustStop() {
// sas and deduplicator must be stopped before rwctx is closed
// because sas can write pending series to rwctx.pss if there are any
sas := rwctx.sas.Swap(nil)
sas.MustStop()
sas := allStreamAggrConfigs.Load().perCtx[rwctx.idx]
if sas != nil {
sas.MustStop(nil)
}
if rwctx.deduplicator != nil {
rwctx.deduplicator.MustStop()
@ -870,7 +865,7 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa
var rctx *relabelCtx
var v *[]prompbmarshal.TimeSeries
rcs := allRelabelConfigs.Load()
pcs := rcs.perURL[rwctx.idx]
pcs := rcs.perCtx[rwctx.idx]
if pcs.Len() > 0 {
rctx = getRelabelCtx()
// Make a copy of tss before applying relabeling in order to prevent
@ -888,7 +883,8 @@ func (rwctx *remoteWriteCtx) TryPush(tss []prompbmarshal.TimeSeries, forceDropSa
rwctx.rowsPushedAfterRelabel.Add(rowsCount)
// Apply stream aggregation or deduplication if they are configured
sas := rwctx.sas.Load()
sac := allStreamAggrConfigs.Load()
sas := sac.perCtx[rwctx.idx]
if sas != nil {
matchIdxs := matchIdxsPool.Get()
matchIdxs.B = sas.Push(tss, matchIdxs.B)
@ -985,40 +981,6 @@ func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) boo
return ok
}
func (rwctx *remoteWriteCtx) reinitStreamAggr() {
sasFile := streamAggrConfig.GetOptionalArg(rwctx.idx)
if sasFile == "" {
// There is no stream aggregation for rwctx
return
}
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", sasFile)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, sasFile)).Inc()
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(rwctx.idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(rwctx.idx),
}
sasNew, err := streamaggr.LoadFromFile(sasFile, rwctx.pushInternalTrackDropped, opts)
if err != nil {
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, sasFile)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(0)
logger.Errorf("cannot reload stream aggregation config from -remoteWrite.streamAggr.config=%q; continue using the previously loaded config; error: %s", sasFile, err)
return
}
sas := rwctx.sas.Load()
if !sasNew.Equal(sas) {
sasOld := rwctx.sas.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation configs at -remoteWrite.streamAggr.config=%q", sasFile)
} else {
sasNew.MustStop()
logger.Infof("the config at -remoteWrite.streamAggr.config=%q wasn't changed", sasFile)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, sasFile)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, sasFile)).Set(fasttime.UnixTimestamp())
}
var tssPool = &sync.Pool{
New: func() interface{} {
a := []prompbmarshal.TimeSeries{}
@ -1034,27 +996,6 @@ func getRowsCount(tss []prompbmarshal.TimeSeries) int {
return rowsCount
}
// CheckStreamAggrConfigs checks configs pointed by -remoteWrite.streamAggr.config
func CheckStreamAggrConfigs() error {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
for idx, sasFile := range *streamAggrConfig {
if sasFile == "" {
continue
}
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
}
sas, err := streamaggr.LoadFromFile(sasFile, pushNoop, opts)
if err != nil {
return fmt.Errorf("cannot load -remoteWrite.streamAggr.config=%q: %w", sasFile, err)
}
sas.MustStop()
}
return nil
}
func newMapFromStrings(a []string) map[string]struct{} {
m := make(map[string]struct{}, len(a))
for _, s := range a {

View file

@ -57,13 +57,13 @@ func TestGetLabelsHash_Distribution(t *testing.T) {
func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
f := func(streamAggrConfig, relabelConfig string, dedupInterval time.Duration, keepInput, dropInput bool, input string) {
t.Helper()
perURLRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig))
perCtxRelabel, err := promrelabel.ParseRelabelConfigsData([]byte(relabelConfig))
if err != nil {
t.Fatalf("cannot load relabel configs: %s", err)
}
rcs := &relabelConfigs{
perURL: []*promrelabel.ParsedConfigs{
perURLRelabel,
perCtx: []*promrelabel.ParsedConfigs{
perCtxRelabel,
},
}
allRelabelConfigs.Store(rcs)
@ -84,11 +84,16 @@ func TestRemoteWriteContext_TryPush_ImmutableTimeseries(t *testing.T) {
if len(streamAggrConfig) > 0 {
f := createFile(t, []byte(streamAggrConfig))
sas, err := streamaggr.LoadFromFile(f.Name(), nil, nil)
if err != nil {
sas := streamaggr.NewAggregators(f.Name(), nil, nil)
if err := sas.Load(); err != nil {
t.Fatalf("cannot load streamaggr configs: %s", err)
}
rwctx.sas.Store(sas)
sac := &streamAggrConfigs{
perCtx: []*streamaggr.Aggregators{
sas,
},
}
allStreamAggrConfigs.Store(sac)
}
inputTss := mustParsePromMetrics(input)

View file

@ -0,0 +1,127 @@
package remotewrite
import (
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
)
var (
// Global config
streamAggrGlobalConfig = flag.String("remoteWrite.streamAggr.global.config", "", "Optional path to file with stream aggregation global config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.global.keepInput, -remoteWrite.streamAggr.global.dropInput and -remoteWrite.streamAggr.global.dedupInterval")
streamAggrGlobalKeepInput = flag.Bool("remoteWrite.streamAggr.global.keepInput", false, "Whether to keep all the input samples after the global aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrGlobalDropInput = flag.Bool("remoteWrite.streamAggr.global.dropInput", false, "Whether to drop all the input samples after the global aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.global.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrGlobalDedupInterval = flagutil.NewDuration("remoteWrite.streamAggr.global.dedupInterval", "0s", "Input samples are de-duplicated with this interval on global "+
"aggregator before optional aggregation with -remoteWrite.streamAggr.config . "+
"See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrGlobalIgnoreOldSamples = flag.Bool("remoteWrite.streamAggr.global.ignoreOldSamples", false, "Whether to ignore input samples with old timestamps outside the "+
"current aggregation interval for global aggregator for the corresponding -remoteWrite.streamAggr.config . "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrGlobalIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.global.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start for global "+
"aggregator. Increase this value if you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from "+
"clients pushing data into the vmagent. See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrGlobalDropInputLabels = flagutil.NewArrayString("streamAggr.global.dropInputLabels", "An optional list of labels to drop from samples for global aggregator "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
// Per URL config
streamAggrConfig = flagutil.NewArrayString("remoteWrite.streamAggr.config", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
"See also -remoteWrite.streamAggr.keepInput, -remoteWrite.streamAggr.dropInput and -remoteWrite.streamAggr.dedupInterval")
streamAggrDropInput = flagutil.NewArrayBool("remoteWrite.streamAggr.dropInput", "Whether to drop all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrKeepInput = flagutil.NewArrayBool("remoteWrite.streamAggr.keepInput", "Whether to keep all the input samples after the aggregation "+
"with -remoteWrite.streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
"are written to the corresponding -remoteWrite.url . See also -remoteWrite.streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
streamAggrDedupInterval = flagutil.NewArrayDuration("remoteWrite.streamAggr.dedupInterval", 0, "Input samples are de-duplicated with this interval before optional aggregation "+
"with -remoteWrite.streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation/#deduplication")
streamAggrIgnoreOldSamples = flagutil.NewArrayBool("remoteWrite.streamAggr.ignoreOldSamples", "Whether to ignore input samples with old timestamps outside the current "+
"aggregation interval for the corresponding -remoteWrite.streamAggr.config . "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignoring-old-samples")
streamAggrIgnoreFirstIntervals = flag.Int("remoteWrite.streamAggr.ignoreFirstIntervals", 0, "Number of aggregation intervals to skip after the start. Increase this value if "+
"you observe incorrect aggregation results after vmagent restarts. It could be caused by receiving unordered delayed data from clients pushing data into the vmagent. "+
"See https://docs.victoriametrics.com/stream-aggregation/#ignore-aggregation-intervals-on-start")
streamAggrDropInputLabels = flagutil.NewArrayString("streamAggr.dropInputLabels", "An optional list of labels to drop from samples "+
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation/#dropping-unneeded-labels")
)
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -remoteWrite.streamAggr.globalConfig.
func CheckStreamAggrConfigs() error {
pushNoop := func(_ []prompbmarshal.TimeSeries) {}
sac := &streamAggrConfigs{}
if err := sac.loadStreamAggrGlobal(pushNoop); err != nil {
return fmt.Errorf("could not load global stream aggregation config: %w", err)
}
if len(*streamAggrConfig) > len(*remoteWriteURLs) {
return fmt.Errorf("too many -remoteWrite.streamAggr.config args: %d; it mustn't exceed the number of -remoteWrite.url args: %d",
len(*streamAggrConfig), len(*remoteWriteURLs))
}
for i := range *streamAggrConfig {
if err := sac.loadStreamAggrPerCtx(i, pushNoop); err != nil {
return err
}
}
return nil
}
func (sac *streamAggrConfigs) reloadStreamAggrConfigs() error {
if err := sac.global.Reload(); err != nil {
return fmt.Errorf("failed to reload global config: %w", err)
}
for _, perCtx := range sac.perCtx {
if err := perCtx.Reload(); err != nil {
return fmt.Errorf("failed to reload config at location %q: %w", perCtx.ConfigPath(), err)
}
}
return nil
}
func (sac *streamAggrConfigs) loadStreamAggrGlobal(pushFunc streamaggr.PushFunc) error {
sac.perCtx = make([]*streamaggr.Aggregators, len(*remoteWriteURLs))
if *streamAggrGlobalConfig != "" {
path := *streamAggrGlobalConfig
opts := &streamaggr.Options{
DedupInterval: streamAggrGlobalDedupInterval.Duration(),
DropInputLabels: *streamAggrGlobalDropInputLabels,
IgnoreOldSamples: *streamAggrGlobalIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrGlobalIgnoreFirstIntervals,
}
sac.global = streamaggr.NewAggregators(path, pushFunc, opts)
return sac.global.Load()
}
return nil
}
func (sac *streamAggrConfigs) loadStreamAggrPerCtx(idx int, pushFunc streamaggr.PushFunc) error {
if len(*streamAggrConfig) == 0 {
return nil
}
paths := *streamAggrConfig
path := paths[idx]
if len(path) == 0 {
// Skip empty stream aggregation config.
return nil
}
opts := &streamaggr.Options{
DedupInterval: streamAggrDedupInterval.GetOptionalArg(idx),
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: streamAggrIgnoreOldSamples.GetOptionalArg(idx),
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
sac.perCtx[idx] = streamaggr.NewAggregators(path, pushFunc, opts)
return sac.perCtx[idx].Load()
}
type streamAggrConfigs struct {
global *streamaggr.Aggregators
perCtx []*streamaggr.Aggregators
}

View file

@ -15,7 +15,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
)
var (
@ -42,11 +41,6 @@ var (
saCfgReloaderStopCh chan struct{}
saCfgReloaderWG sync.WaitGroup
saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
saCfgSuccess = metrics.NewGauge(`vminsert_streamagg_config_last_reload_successful`, nil)
saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
sasGlobal atomic.Pointer[streamaggr.Aggregators]
deduplicator *streamaggr.Deduplicator
)
@ -63,11 +57,11 @@ func CheckStreamAggrConfig() error {
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushNoop, opts)
if err != nil {
sas := streamaggr.NewAggregators(*streamAggrConfig, pushNoop, opts)
if err := sas.Load(); err != nil {
return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
}
sas.MustStop()
sas.MustStop(nil)
return nil
}
@ -92,14 +86,12 @@ func InitStreamAggr() {
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
sas, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
if err != nil {
sas := streamaggr.NewAggregators(*streamAggrConfig, pushAggregateSeries, opts)
if err := sas.Load(); err != nil {
logger.Fatalf("cannot load -streamAggr.config=%q: %s", *streamAggrConfig, err)
}
sasGlobal.Store(sas)
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
// Start config reloader.
saCfgReloaderWG.Add(1)
@ -118,32 +110,12 @@ func InitStreamAggr() {
func reloadStreamAggrConfig() {
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
saCfgReloads.Inc()
opts := &streamaggr.Options{
DedupInterval: *streamAggrDedupInterval,
DropInputLabels: *streamAggrDropInputLabels,
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
}
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
if err != nil {
saCfgSuccess.Set(0)
saCfgReloadErr.Inc()
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
return
}
sas := sasGlobal.Load()
if !sasNew.Equal(sas) {
sasOld := sasGlobal.Swap(sasNew)
sasOld.MustStop()
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
if err := sas.Reload(); err != nil {
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
} else {
logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
sasNew.MustStop()
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
}
saCfgSuccess.Set(1)
saCfgTimestamp.Set(fasttime.UnixTimestamp())
}
// MustStopStreamAggr stops stream aggregators.
@ -152,7 +124,7 @@ func MustStopStreamAggr() {
saCfgReloaderWG.Wait()
sas := sasGlobal.Swap(nil)
sas.MustStop()
sas.MustStop(nil)
if deduplicator != nil {
deduplicator.MustStop()

View file

@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@ -23,6 +24,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"gopkg.in/yaml.v2"
)
@ -58,27 +60,54 @@ var (
})
)
// LoadFromFile loads Aggregators from the given path and uses the given pushFunc for pushing the aggregated data.
// NewAggregators initializes Aggregators struct with shared data for all aggregators
func NewAggregators(path string, pushFunc PushFunc, opts *Options) *Aggregators {
return &Aggregators{
path: path,
pushFunc: pushFunc,
opts: opts,
}
}
// Reload reads config file and updates aggregators if there're any changes
func (a *Aggregators) Reload() error {
if a == nil {
return nil
}
logger.Infof("reloading stream aggregation config at %q", a.path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_total{path=%q}`, a.path)).Inc()
if err := a.Load(); err != nil {
logger.Errorf("cannot reload stream aggregation config from %q; continue using the previously loaded config; error: %s", a.path, err)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_errors_total{path=%q}`, a.path)).Inc()
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(0)
return fmt.Errorf("cannot load stream aggregation config %q: %w", a.path, err)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, a.path)).Set(fasttime.UnixTimestamp())
logger.Infof("reloaded stream aggregation config at %q", a.path)
return nil
}
// Load loads Aggregators from predefined path and uses pushFunc for pushing the aggregated data.
//
// opts can contain additional options. If opts is nil, then default options are used.
//
// The returned Aggregators must be stopped with MustStop() when no longer needed.
func LoadFromFile(path string, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
data, err := fscore.ReadFileOrHTTP(path)
func (a *Aggregators) Load() error {
data, err := fscore.ReadFileOrHTTP(a.path)
if err != nil {
return nil, fmt.Errorf("cannot load aggregators: %w", err)
return fmt.Errorf("cannot load aggregators: %w", err)
}
data, err = envtemplate.ReplaceBytes(data)
if err != nil {
return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
return fmt.Errorf("cannot expand environment variables in %q: %w", a.path, err)
}
as, err := newAggregatorsFromData(data, pushFunc, opts)
err = a.loadAggregatorsFromData(data)
if err != nil {
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
return fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
}
return as, nil
return nil
}
// Options contains optional settings for the Aggregators.
@ -232,42 +261,73 @@ type Config struct {
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
type Aggregators struct {
as []*aggregator
// configData contains marshaled configs.
// It is used in Equal() for comparing Aggregators.
configData []byte
ms *metrics.Set
as []*aggregator
ms *metrics.Set
path string
pushFunc PushFunc
opts *Options
}
func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Aggregators, error) {
func (a *Aggregators) getAggregator(aggr *aggregator) *aggregator {
if a == nil {
return nil
}
if idx := slices.IndexFunc(a.as, func(ac *aggregator) bool {
return ac.configHash == aggr.configHash
}); idx >= 0 {
return a.as[idx]
}
return nil
}
// ConfigPath returns path of aggregators config file
func (a *Aggregators) ConfigPath() string {
return a.path
}
func (a *Aggregators) loadAggregatorsFromData(data []byte) error {
var cfgs []*Config
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
return fmt.Errorf("cannot parse stream aggregation config: %w", err)
}
ms := metrics.NewSet()
as := make([]*aggregator, len(cfgs))
unchanged := make([]*aggregator, len(cfgs))
unchanged = unchanged[:0]
ac := make([]*aggregator, len(cfgs))
ac = ac[:0]
ignoreAggrHashes := make([]uint64, len(a.as))
ignoreAggrHashes = ignoreAggrHashes[:0]
for i, cfg := range cfgs {
a, err := newAggregator(cfg, pushFunc, ms, opts)
aggr, err := newAggregator(cfg, a.pushFunc, ms, a.opts)
if err != nil {
// Stop already initialized aggregators before returning the error.
for _, a := range as[:i] {
a.MustStop()
for _, s := range ac[:i] {
s.MustStop()
}
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
return fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
}
as[i] = a
if oldAgg := a.getAggregator(aggr); oldAgg != nil {
aggr.MustStop()
unchanged = append(unchanged, oldAgg)
ignoreAggrHashes = append(ignoreAggrHashes, oldAgg.configHash)
continue
}
if slices.ContainsFunc(ac, func(x *aggregator) bool {
return x.configHash == aggr.configHash
}) {
aggr.MustStop()
continue
}
ac = append(ac, aggr)
}
configData, err := json.Marshal(cfgs)
if err != nil {
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_stopped_total{path=%q}`, a.path)).Add(len(a.as) - len(ignoreAggrHashes))
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_created_total{path=%q}`, a.path)).Add(len(ac))
a.MustStop(ignoreAggrHashes)
a.as = slices.Concat(unchanged, ac)
_ = ms.NewGauge(`vm_streamaggr_dedup_state_size_bytes`, func() float64 {
n := uint64(0)
for _, aggr := range as {
for _, aggr := range a.as {
if aggr.da != nil {
n += aggr.da.sizeBytes()
}
@ -276,24 +336,20 @@ func newAggregatorsFromData(data []byte, pushFunc PushFunc, opts *Options) (*Agg
})
_ = ms.NewGauge(`vm_streamaggr_dedup_state_items_count`, func() float64 {
n := uint64(0)
for _, aggr := range as {
for _, aggr := range a.as {
if aggr.da != nil {
n += aggr.da.itemsCount()
}
}
return float64(n)
})
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
configData: configData,
ms: ms,
}, nil
a.ms = ms
return nil
}
// MustStop stops a.
func (a *Aggregators) MustStop() {
func (a *Aggregators) MustStop(ignoreAggrHashes []uint64) {
if a == nil {
return
}
@ -302,7 +358,9 @@ func (a *Aggregators) MustStop() {
a.ms = nil
for _, aggr := range a.as {
aggr.MustStop()
if ignoreAggrHashes == nil || !slices.Contains(ignoreAggrHashes, aggr.configHash) {
aggr.MustStop()
}
}
a.as = nil
}
@ -312,7 +370,16 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
if a == nil || b == nil {
return a == nil && b == nil
}
return string(a.configData) == string(b.configData)
return slices.Compare(a.getConfigHashes(), b.getConfigHashes()) == 0
}
func (a *Aggregators) getConfigHashes() []uint64 {
result := make([]uint64, len(a.as))
for i := range result {
result[i] = a.as[i].configHash
}
slices.Sort(result)
return result
}
// Push pushes tss to a.
@ -350,6 +417,7 @@ type aggregator struct {
keepMetricNames bool
ignoreOldSamples bool
configHash uint64
by []string
without []string
@ -443,6 +511,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
dropInputLabels := opts.DropInputLabels
if v := cfg.DropInputLabels; v != nil {
dropInputLabels = *v
} else {
cfg.DropInputLabels = &dropInputLabels
}
// initialize input_relabel_configs and output_relabel_configs
@ -470,6 +540,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
keepMetricNames := opts.KeepMetricNames
if v := cfg.KeepMetricNames; v != nil {
keepMetricNames = *v
} else {
cfg.KeepMetricNames = &keepMetricNames
}
if keepMetricNames {
if len(cfg.Outputs) != 1 {
@ -484,12 +556,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
ignoreOldSamples := opts.IgnoreOldSamples
if v := cfg.IgnoreOldSamples; v != nil {
ignoreOldSamples = *v
} else {
cfg.IgnoreOldSamples = &ignoreOldSamples
}
// check cfg.IgnoreFirstIntervals
ignoreFirstIntervals := opts.IgnoreFirstIntervals
if v := cfg.IgnoreFirstIntervals; v != nil {
ignoreFirstIntervals = *v
} else {
cfg.IgnoreFirstIntervals = &ignoreFirstIntervals
}
// initialize outputs list
@ -497,6 +573,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
}
slices.Sort(cfg.Outputs)
cfg.Outputs = slices.Compact(cfg.Outputs)
aggrStates := make([]aggrState, len(cfg.Outputs))
for i, output := range cfg.Outputs {
if strings.HasPrefix(output, "quantiles(") {
@ -601,23 +679,34 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts *Option
flushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_flush_timeouts_total`),
dedupFlushTimeouts: ms.GetOrCreateCounter(`vm_streamaggr_dedup_flush_timeouts_total`),
}
if dedupInterval > 0 {
a.da = newDedupAggr()
}
alignFlushToInterval := !opts.NoAlignFlushToInterval
noAlignFlushToInterval := opts.NoAlignFlushToInterval
if v := cfg.NoAlignFlushToInterval; v != nil {
alignFlushToInterval = !*v
noAlignFlushToInterval = *v
} else {
cfg.NoAlignFlushToInterval = &noAlignFlushToInterval
}
skipIncompleteFlush := !opts.FlushOnShutdown
flushOnShutdown := opts.FlushOnShutdown
if v := cfg.FlushOnShutdown; v != nil {
skipIncompleteFlush = !*v
flushOnShutdown = !*v
} else {
cfg.FlushOnShutdown = &flushOnShutdown
}
data, err := json.Marshal(&cfg)
if err != nil {
return nil, fmt.Errorf("Failed to marshal config: %w", err)
}
a.configHash = xxhash.Sum64(data)
a.wg.Add(1)
go func() {
a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
a.runFlusher(pushFunc, !noAlignFlushToInterval, !flushOnShutdown, interval, dedupInterval, ignoreFirstIntervals)
a.wg.Done()
}()

View file

@ -17,16 +17,11 @@ import (
func TestAggregatorsFailure(t *testing.T) {
f := func(config string) {
t.Helper()
pushFunc := func(_ []prompbmarshal.TimeSeries) {
panic(fmt.Errorf("pushFunc shouldn't be called"))
}
a, err := newAggregatorsFromData([]byte(config), pushFunc, nil)
a := &Aggregators{}
err := a.loadAggregatorsFromData([]byte(config))
if err == nil {
t.Fatalf("expecting non-nil error")
}
if a != nil {
t.Fatalf("expecting nil a")
}
}
// Invalid config
@ -158,12 +153,16 @@ func TestAggregatorsEqual(t *testing.T) {
t.Helper()
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
aa, err := newAggregatorsFromData([]byte(a), pushFunc, nil)
if err != nil {
aa := &Aggregators{
pushFunc: pushFunc,
}
if err := aa.loadAggregatorsFromData([]byte(a)); err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
ab, err := newAggregatorsFromData([]byte(b), pushFunc, nil)
if err != nil {
ab := &Aggregators{
pushFunc: pushFunc,
}
if err := ab.loadAggregatorsFromData([]byte(b)); err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
result := aa.Equal(ab)
@ -221,19 +220,21 @@ func TestAggregatorsSuccess(t *testing.T) {
tssOutput = appendClonedTimeseries(tssOutput, tss)
tssOutputLock.Unlock()
}
opts := &Options{
FlushOnShutdown: true,
NoAlignFlushToInterval: true,
a := &Aggregators{
opts: &Options{
FlushOnShutdown: true,
NoAlignFlushToInterval: true,
},
pushFunc: pushFunc,
}
a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
if err != nil {
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
matchIdxs := a.Push(tssInput, nil)
a.MustStop()
a.MustStop(nil)
// Verify matchIdxs equals to matchIdxsExpected
matchIdxsStr := ""
@ -905,19 +906,21 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
}
tssOutputLock.Unlock()
}
opts := &Options{
DedupInterval: 30 * time.Second,
FlushOnShutdown: true,
a := &Aggregators{
pushFunc: pushFunc,
opts: &Options{
DedupInterval: 30 * time.Second,
FlushOnShutdown: true,
},
}
a, err := newAggregatorsFromData([]byte(config), pushFunc, opts)
if err != nil {
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
t.Fatalf("cannot initialize aggregators: %s", err)
}
// Push the inputMetrics to Aggregators
tssInput := mustParsePromMetrics(inputMetrics)
matchIdxs := a.Push(tssInput, nil)
a.MustStop()
a.MustStop(nil)
// Verify matchIdxs equals to matchIdxsExpected
matchIdxsStr := ""

View file

@ -47,7 +47,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
}
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
a := newBenchAggregators(outputs, pushFunc)
defer a.MustStop()
defer a.MustStop(nil)
_ = a.Push(benchSeries, nil)
b.ResetTimer()
@ -63,7 +63,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
func benchmarkAggregatorsPush(b *testing.B, output string) {
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
a := newBenchAggregators([]string{output}, pushFunc)
defer a.MustStop()
defer a.MustStop(nil)
const loops = 100
@ -92,8 +92,10 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputs: [%s]
`, strings.Join(outputsQuoted, ","))
a, err := newAggregatorsFromData([]byte(config), pushFunc, nil)
if err != nil {
a := &Aggregators{
pushFunc: pushFunc,
}
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}
return a