mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
hot-reload enabled
This commit is contained in:
parent
aca33307a9
commit
ad29bd69ee
7 changed files with 213 additions and 157 deletions
|
@ -231,15 +231,23 @@ func Init() {
|
|||
// Start config reloader.
|
||||
configReloaderWG.Add(1)
|
||||
go func() {
|
||||
var streamAggrConfigReloaderCh <-chan time.Time
|
||||
if *streamAggrConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
|
||||
streamAggrConfigReloaderCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer configReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
reloadRelabelConfigs()
|
||||
reloadStreamAggrConfigs()
|
||||
case <-streamAggrConfigReloaderCh:
|
||||
reloadStreamAggrConfigs()
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
}
|
||||
reloadRelabelConfigs()
|
||||
reloadStreamAggrConfigs()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -376,11 +384,9 @@ func Stop() {
|
|||
close(configReloaderStopCh)
|
||||
configReloaderWG.Wait()
|
||||
|
||||
sasGlobal.Load().MustStop()
|
||||
if deduplicatorGlobal != nil {
|
||||
sasGlobal.Load().MustStop(nil)
|
||||
deduplicatorGlobal.MustStop()
|
||||
deduplicatorGlobal = nil
|
||||
}
|
||||
|
||||
for _, rwctx := range rwctxs {
|
||||
rwctx.MustStop()
|
||||
|
@ -850,12 +856,10 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
|||
// sas and deduplicator must be stopped before rwctx is closed
|
||||
// because they can write pending series to rwctx.pss if there are any
|
||||
sas := rwctx.sas.Swap(nil)
|
||||
sas.MustStop()
|
||||
sas.MustStop(nil)
|
||||
|
||||
if rwctx.deduplicator != nil {
|
||||
rwctx.deduplicator.MustStop()
|
||||
rwctx.deduplicator = nil
|
||||
}
|
||||
|
||||
for _, ps := range rwctx.pss {
|
||||
ps.MustStop()
|
||||
|
|
|
@ -4,12 +4,10 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -17,6 +15,8 @@ var (
|
|||
streamAggrGlobalConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
|
||||
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
|
||||
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
|
||||
"and -remoteWrite.streamAggr.config")
|
||||
streamAggrGlobalKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation "+
|
||||
"with -streamAggr.config. By default, only aggregates samples are dropped, while the remaining samples "+
|
||||
"are written to remote storages write. See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
|
||||
|
@ -79,47 +79,32 @@ func CheckStreamAggrConfigs() error {
|
|||
}
|
||||
|
||||
func reloadStreamAggrConfigs() {
|
||||
reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
|
||||
for idx, rwctx := range rwctxs {
|
||||
reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
|
||||
reloadStreamAggrConfig(-1)
|
||||
for idx := range rwctxs {
|
||||
reloadStreamAggrConfig(idx)
|
||||
}
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
|
||||
path, opts := getStreamAggrOpts(idx)
|
||||
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
|
||||
|
||||
sasNew, err := newStreamAggrConfigWithOpts(pushFunc, path, opts)
|
||||
if err != nil {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_errors_total{path=%q}`, path)).Inc()
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(0)
|
||||
logger.Errorf("cannot reload stream aggregation config at %q; continue using the previously loaded config; error: %s", path, err)
|
||||
return
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfig(idx int) {
|
||||
path, _ := getStreamAggrOpts(idx)
|
||||
var sas *streamaggr.Aggregators
|
||||
var flag string
|
||||
if idx < 0 {
|
||||
flag = "-streamAggr.config"
|
||||
sas = sasGlobal.Load()
|
||||
} else {
|
||||
flag = "-remoteWrite.streamAggr.config"
|
||||
sas = rwctxs[idx].sas.Load()
|
||||
}
|
||||
|
||||
if !sasNew.Equal(sas) {
|
||||
var sasOld *streamaggr.Aggregators
|
||||
if idx < 0 {
|
||||
sasOld = sasGlobal.Swap(sasNew)
|
||||
} else {
|
||||
sasOld = rwctxs[idx].sas.Swap(sasNew)
|
||||
if sas == nil {
|
||||
return
|
||||
}
|
||||
sasOld.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
|
||||
} else {
|
||||
sasNew.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation configs at %q", path)
|
||||
logger.Infof("reloading stream aggregation configs pointed by %s=%q", flag, path)
|
||||
if err := sas.Reload(); err != nil {
|
||||
logger.Errorf("cannot reload %s=%q; continue using the previously loaded config; error: %s", flag, path, err)
|
||||
return
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
|
||||
logger.Infof("successfully reloaded stream aggregation config %s=%q", flag, path)
|
||||
}
|
||||
|
||||
func getStreamAggrOpts(idx int) (string, streamaggr.Options) {
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -15,13 +16,14 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/stream-aggregation/ . "+
|
||||
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
|
||||
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking changes in -streamAggr.config "+
|
||||
"and -remoteWrite.streamAggr.config")
|
||||
streamAggrKeepInput = flag.Bool("streamAggr.keepInput", false, "Whether to keep all the input samples after the aggregation with -streamAggr.config. "+
|
||||
"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. "+
|
||||
"See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation/")
|
||||
|
@ -42,11 +44,6 @@ var (
|
|||
saCfgReloaderStopCh chan struct{}
|
||||
saCfgReloaderWG sync.WaitGroup
|
||||
|
||||
saCfgReloads = metrics.NewCounter(`vminsert_streamagg_config_reloads_total`)
|
||||
saCfgReloadErr = metrics.NewCounter(`vminsert_streamagg_config_reloads_errors_total`)
|
||||
saCfgSuccess = metrics.NewGauge(`vminsert_streamagg_config_last_reload_successful`, nil)
|
||||
saCfgTimestamp = metrics.NewCounter(`vminsert_streamagg_config_last_reload_success_timestamp_seconds`)
|
||||
|
||||
sasGlobal atomic.Pointer[streamaggr.Aggregators]
|
||||
deduplicator *streamaggr.Deduplicator
|
||||
)
|
||||
|
@ -68,7 +65,7 @@ func CheckStreamAggrConfig() error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("error when loading -streamAggr.config=%q: %w", *streamAggrConfig, err)
|
||||
}
|
||||
sas.MustStop()
|
||||
sas.MustStop(nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -101,16 +98,21 @@ func InitStreamAggr() {
|
|||
}
|
||||
|
||||
sasGlobal.Store(sas)
|
||||
saCfgSuccess.Set(1)
|
||||
saCfgTimestamp.Set(fasttime.UnixTimestamp())
|
||||
|
||||
// Start config reloader.
|
||||
saCfgReloaderWG.Add(1)
|
||||
go func() {
|
||||
var streamAggrConfigReloaderCh <-chan time.Time
|
||||
if *streamAggrConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
|
||||
streamAggrConfigReloaderCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer saCfgReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
case <-streamAggrConfigReloaderCh:
|
||||
case <-saCfgReloaderStopCh:
|
||||
return
|
||||
}
|
||||
|
@ -121,33 +123,12 @@ func InitStreamAggr() {
|
|||
|
||||
func reloadStreamAggrConfig() {
|
||||
logger.Infof("reloading -streamAggr.config=%q", *streamAggrConfig)
|
||||
saCfgReloads.Inc()
|
||||
|
||||
opts := streamaggr.Options{
|
||||
DedupInterval: *streamAggrDedupInterval,
|
||||
DropInputLabels: *streamAggrDropInputLabels,
|
||||
IgnoreOldSamples: *streamAggrIgnoreOldSamples,
|
||||
IgnoreFirstIntervals: *streamAggrIgnoreFirstIntervals,
|
||||
Alias: "global",
|
||||
}
|
||||
sasNew, err := streamaggr.LoadFromFile(*streamAggrConfig, pushAggregateSeries, opts)
|
||||
if err != nil {
|
||||
saCfgSuccess.Set(0)
|
||||
saCfgReloadErr.Inc()
|
||||
sas := sasGlobal.Load()
|
||||
if err := sas.Reload(); err != nil {
|
||||
logger.Errorf("cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s", *streamAggrConfig, err)
|
||||
return
|
||||
}
|
||||
sas := sasGlobal.Load()
|
||||
if !sasNew.Equal(sas) {
|
||||
sasOld := sasGlobal.Swap(sasNew)
|
||||
sasOld.MustStop()
|
||||
logger.Infof("successfully reloaded stream aggregation config at -streamAggr.config=%q", *streamAggrConfig)
|
||||
} else {
|
||||
logger.Infof("nothing changed in -streamAggr.config=%q", *streamAggrConfig)
|
||||
sasNew.MustStop()
|
||||
}
|
||||
saCfgSuccess.Set(1)
|
||||
saCfgTimestamp.Set(fasttime.UnixTimestamp())
|
||||
}
|
||||
|
||||
// MustStopStreamAggr stops stream aggregators.
|
||||
|
@ -156,13 +137,11 @@ func MustStopStreamAggr() {
|
|||
saCfgReloaderWG.Wait()
|
||||
|
||||
sas := sasGlobal.Swap(nil)
|
||||
sas.MustStop()
|
||||
sas.MustStop(nil)
|
||||
|
||||
if deduplicator != nil {
|
||||
deduplicator.MustStop()
|
||||
deduplicator = nil
|
||||
}
|
||||
}
|
||||
|
||||
type streamAggrCtx struct {
|
||||
mn storage.MetricName
|
||||
|
|
|
@ -73,6 +73,9 @@ func NewDeduplicator(pushFunc PushFunc, dedupInterval time.Duration, dropLabels
|
|||
|
||||
// MustStop stops d.
|
||||
func (d *Deduplicator) MustStop() {
|
||||
if d == nil {
|
||||
return
|
||||
}
|
||||
metrics.UnregisterSet(d.ms)
|
||||
d.ms = nil
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs/fscore"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||
|
@ -73,21 +74,28 @@ var (
|
|||
//
|
||||
// The returned Aggregators must be stopped with MustStop() when no longer needed.
|
||||
func LoadFromFile(path string, pushFunc PushFunc, opts Options) (*Aggregators, error) {
|
||||
data, err := fscore.ReadFileOrHTTP(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load aggregators: %w", err)
|
||||
a := &Aggregators{
|
||||
path: path,
|
||||
pushFunc: pushFunc,
|
||||
opts: opts,
|
||||
}
|
||||
data, err = envtemplate.ReplaceBytes(data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot expand environment variables in %q: %w", path, err)
|
||||
if err := a.load(); err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
|
||||
}
|
||||
return a, nil
|
||||
}
|
||||
|
||||
as, err := LoadFromData(data, pushFunc, opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", path, err)
|
||||
// Reload reads config file and updates aggregators if there're any changes
|
||||
func (a *Aggregators) Reload() error {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_total{path=%q}`, a.path)).Inc()
|
||||
if err := a.load(); err != nil {
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reloads_errors_total{path=%q}`, a.path)).Inc()
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(0)
|
||||
return fmt.Errorf("cannot load stream aggregation config %q: %w", a.path, err)
|
||||
}
|
||||
|
||||
return as, nil
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_successful{path=%q}`, a.path)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, a.path)).Set(fasttime.UnixTimestamp())
|
||||
return nil
|
||||
}
|
||||
|
||||
// Options contains optional settings for the Aggregators.
|
||||
|
@ -248,44 +256,82 @@ type Config struct {
|
|||
// Aggregators aggregates metrics passed to Push and calls pushFunc for aggregated data.
|
||||
type Aggregators struct {
|
||||
as []*aggregator
|
||||
|
||||
// configData contains marshaled configs.
|
||||
// It is used in Equal() for comparing Aggregators.
|
||||
configData []byte
|
||||
|
||||
ms *metrics.Set
|
||||
path string
|
||||
pushFunc PushFunc
|
||||
opts Options
|
||||
}
|
||||
|
||||
// LoadFromData loads aggregators from data.
|
||||
func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, error) {
|
||||
func (a *Aggregators) load() error {
|
||||
data, err := fscore.ReadFileOrHTTP(a.path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot load aggregators: %w", err)
|
||||
}
|
||||
data, err = envtemplate.ReplaceBytes(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot expand environment variables in %q: %w", a.path, err)
|
||||
}
|
||||
if err = a.loadAggregatorsFromData(data); err != nil {
|
||||
return fmt.Errorf("cannot initialize aggregators from %q: %w; see https://docs.victoriametrics.com/stream-aggregation/#stream-aggregation-config", a.path, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Aggregators) getAggregator(aggr *aggregator) *aggregator {
|
||||
idx := slices.IndexFunc(a.as, func(ac *aggregator) bool {
|
||||
return ac.configData == aggr.configData
|
||||
})
|
||||
if idx >= 0 {
|
||||
return a.as[idx]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *Aggregators) loadAggregatorsFromData(data []byte) error {
|
||||
var cfgs []*Config
|
||||
if err := yaml.UnmarshalStrict(data, &cfgs); err != nil {
|
||||
return nil, fmt.Errorf("cannot parse stream aggregation config: %w", err)
|
||||
return fmt.Errorf("cannot parse stream aggregation config: %w", err)
|
||||
}
|
||||
|
||||
ms := metrics.NewSet()
|
||||
as := make([]*aggregator, len(cfgs))
|
||||
var unchanged, ac []*aggregator
|
||||
var ignoreAggrConfigs []string
|
||||
for i, cfg := range cfgs {
|
||||
opts.aggrID = i + 1
|
||||
a, err := newAggregator(cfg, pushFunc, ms, opts)
|
||||
a.opts.aggrID = i + 1
|
||||
aggr, err := newAggregator(cfg, a.pushFunc, ms, a.opts)
|
||||
if err != nil {
|
||||
// Stop already initialized aggregators before returning the error.
|
||||
for _, a := range as[:i] {
|
||||
a.MustStop()
|
||||
for _, c := range ac[:i] {
|
||||
c.MustStop()
|
||||
}
|
||||
return nil, fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
|
||||
return fmt.Errorf("cannot initialize aggregator #%d: %w", i, err)
|
||||
}
|
||||
as[i] = a
|
||||
if oldAggr := a.getAggregator(aggr); oldAggr != nil {
|
||||
aggr.MustStop()
|
||||
if !slices.Contains(ignoreAggrConfigs, oldAggr.configData) {
|
||||
unchanged = append(unchanged, oldAggr)
|
||||
ignoreAggrConfigs = append(ignoreAggrConfigs, oldAggr.configData)
|
||||
}
|
||||
configData, err := json.Marshal(cfgs)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot marshal the provided configs: %s", err)
|
||||
continue
|
||||
}
|
||||
if slices.ContainsFunc(ac, func(x *aggregator) bool {
|
||||
return x.configData == aggr.configData
|
||||
}) {
|
||||
aggr.MustStop()
|
||||
continue
|
||||
}
|
||||
ac = append(ac, aggr)
|
||||
}
|
||||
|
||||
metricLabels := fmt.Sprintf("url=%q", opts.Alias)
|
||||
metricLabels := fmt.Sprintf("url=%q", a.opts.Alias)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_stopped_total{path=%q}`, a.path)).Add(len(a.as) - len(ignoreAggrConfigs))
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_streamaggr_aggregators_created_total{path=%q}`, a.path)).Add(len(ac))
|
||||
a.MustStop(ignoreAggrConfigs)
|
||||
a.as = slices.Concat(unchanged, ac)
|
||||
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_size_bytes{%s}`, metricLabels), func() float64 {
|
||||
n := uint64(0)
|
||||
for _, aggr := range as {
|
||||
for _, aggr := range a.as {
|
||||
if aggr.da != nil {
|
||||
n += aggr.da.sizeBytes()
|
||||
}
|
||||
|
@ -294,7 +340,7 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e
|
|||
})
|
||||
_ = ms.NewGauge(fmt.Sprintf(`vm_streamaggr_dedup_state_items_count{%s}`, metricLabels), func() float64 {
|
||||
n := uint64(0)
|
||||
for _, aggr := range as {
|
||||
for _, aggr := range a.as {
|
||||
if aggr.da != nil {
|
||||
n += aggr.da.itemsCount()
|
||||
}
|
||||
|
@ -303,11 +349,8 @@ func LoadFromData(data []byte, pushFunc PushFunc, opts Options) (*Aggregators, e
|
|||
})
|
||||
|
||||
metrics.RegisterSet(ms)
|
||||
return &Aggregators{
|
||||
as: as,
|
||||
configData: configData,
|
||||
ms: ms,
|
||||
}, nil
|
||||
a.ms = ms
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsEnabled returns true if Aggregators has at least one configured aggregator
|
||||
|
@ -322,7 +365,7 @@ func (a *Aggregators) IsEnabled() bool {
|
|||
}
|
||||
|
||||
// MustStop stops a.
|
||||
func (a *Aggregators) MustStop() {
|
||||
func (a *Aggregators) MustStop(ignoreAggrConfigs []string) {
|
||||
if a == nil {
|
||||
return
|
||||
}
|
||||
|
@ -331,8 +374,10 @@ func (a *Aggregators) MustStop() {
|
|||
a.ms = nil
|
||||
|
||||
for _, aggr := range a.as {
|
||||
if ignoreAggrConfigs == nil || !slices.Contains(ignoreAggrConfigs, aggr.configData) {
|
||||
aggr.MustStop()
|
||||
}
|
||||
}
|
||||
a.as = nil
|
||||
}
|
||||
|
||||
|
@ -341,7 +386,16 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
|
|||
if a == nil || b == nil {
|
||||
return a == nil && b == nil
|
||||
}
|
||||
return string(a.configData) == string(b.configData)
|
||||
return slices.Compare(a.configData(), b.configData()) == 0
|
||||
}
|
||||
|
||||
func (a *Aggregators) configData() []string {
|
||||
result := make([]string, len(a.as))
|
||||
for i := range result {
|
||||
result[i] = a.as[i].configData
|
||||
}
|
||||
slices.Sort(result)
|
||||
return result
|
||||
}
|
||||
|
||||
// Push pushes tss to a.
|
||||
|
@ -380,6 +434,7 @@ type aggregator struct {
|
|||
|
||||
keepMetricNames bool
|
||||
ignoreOldSamples bool
|
||||
configData string
|
||||
|
||||
by []string
|
||||
without []string
|
||||
|
@ -483,6 +538,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
|||
dropInputLabels := opts.DropInputLabels
|
||||
if v := cfg.DropInputLabels; v != nil {
|
||||
dropInputLabels = *v
|
||||
} else {
|
||||
cfg.DropInputLabels = &dropInputLabels
|
||||
}
|
||||
|
||||
// initialize input_relabel_configs and output_relabel_configs
|
||||
|
@ -510,6 +567,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
|||
keepMetricNames := opts.KeepMetricNames
|
||||
if v := cfg.KeepMetricNames; v != nil {
|
||||
keepMetricNames = *v
|
||||
} else {
|
||||
cfg.KeepMetricNames = &keepMetricNames
|
||||
}
|
||||
if keepMetricNames {
|
||||
if len(cfg.Outputs) != 1 {
|
||||
|
@ -524,12 +583,16 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
|||
ignoreOldSamples := opts.IgnoreOldSamples
|
||||
if v := cfg.IgnoreOldSamples; v != nil {
|
||||
ignoreOldSamples = *v
|
||||
} else {
|
||||
cfg.IgnoreOldSamples = &ignoreOldSamples
|
||||
}
|
||||
|
||||
// check cfg.IgnoreFirstIntervals
|
||||
ignoreFirstIntervals := opts.IgnoreFirstIntervals
|
||||
if v := cfg.IgnoreFirstIntervals; v != nil {
|
||||
ignoreFirstIntervals = *v
|
||||
} else {
|
||||
cfg.IgnoreFirstIntervals = &ignoreFirstIntervals
|
||||
}
|
||||
|
||||
// initialize outputs list
|
||||
|
@ -537,6 +600,8 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
|||
return nil, fmt.Errorf("`outputs` list must contain at least a single entry from the list %s; "+
|
||||
"see https://docs.victoriametrics.com/stream-aggregation/", supportedOutputs)
|
||||
}
|
||||
slices.Sort(cfg.Outputs)
|
||||
cfg.Outputs = slices.Compact(cfg.Outputs)
|
||||
aggrStates := make([]aggrState, len(cfg.Outputs))
|
||||
for i, output := range cfg.Outputs {
|
||||
if strings.HasPrefix(output, "quantiles(") {
|
||||
|
@ -677,25 +742,35 @@ func newAggregator(cfg *Config, pushFunc PushFunc, ms *metrics.Set, opts Options
|
|||
a.tickInterval = dedupInterval.Milliseconds()
|
||||
}
|
||||
|
||||
alignFlushToInterval := !opts.NoAlignFlushToInterval
|
||||
noAlignFlushToInterval := opts.NoAlignFlushToInterval
|
||||
if v := cfg.NoAlignFlushToInterval; v != nil {
|
||||
alignFlushToInterval = !*v
|
||||
noAlignFlushToInterval = *v
|
||||
} else {
|
||||
cfg.NoAlignFlushToInterval = &noAlignFlushToInterval
|
||||
}
|
||||
|
||||
skipIncompleteFlush := !opts.FlushOnShutdown
|
||||
flushOnShutdown := opts.FlushOnShutdown
|
||||
if v := cfg.FlushOnShutdown; v != nil {
|
||||
skipIncompleteFlush = !*v
|
||||
flushOnShutdown = *v
|
||||
} else {
|
||||
cfg.FlushOnShutdown = &flushOnShutdown
|
||||
}
|
||||
|
||||
configData, err := json.Marshal(&cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to marshal config: %w", err)
|
||||
}
|
||||
a.configData = string(configData)
|
||||
|
||||
minTime := time.Now()
|
||||
if skipIncompleteFlush && alignFlushToInterval {
|
||||
if !flushOnShutdown && !noAlignFlushToInterval {
|
||||
minTime = minTime.Truncate(interval).Add(interval)
|
||||
}
|
||||
a.minTimestamp.Store(minTime.UnixMilli())
|
||||
|
||||
a.wg.Add(1)
|
||||
go func() {
|
||||
a.runFlusher(pushFunc, alignFlushToInterval, skipIncompleteFlush, interval, dedupInterval, ignoreFirstIntervals)
|
||||
a.runFlusher(pushFunc, !noAlignFlushToInterval, !flushOnShutdown, interval, dedupInterval, ignoreFirstIntervals)
|
||||
a.wg.Done()
|
||||
}()
|
||||
|
||||
|
|
|
@ -19,12 +19,11 @@ func TestAggregatorsFailure(t *testing.T) {
|
|||
pushFunc := func(_ []prompbmarshal.TimeSeries) {
|
||||
panic(fmt.Errorf("pushFunc shouldn't be called"))
|
||||
}
|
||||
a, err := LoadFromData([]byte(config), pushFunc, Options{})
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
a := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
if a != nil {
|
||||
t.Fatalf("expecting nil a")
|
||||
if err := a.loadAggregatorsFromData([]byte(config)); err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -157,12 +156,17 @@ func TestAggregatorsEqual(t *testing.T) {
|
|||
t.Helper()
|
||||
|
||||
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
|
||||
aa, err := LoadFromData([]byte(a), pushFunc, Options{})
|
||||
if err != nil {
|
||||
aa := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
if err := aa.loadAggregatorsFromData([]byte(a)); err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
ab, err := LoadFromData([]byte(b), pushFunc, Options{})
|
||||
if err != nil {
|
||||
|
||||
ab := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
if err := ab.loadAggregatorsFromData([]byte(b)); err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
result := aa.Equal(ab)
|
||||
|
@ -220,12 +224,14 @@ func TestAggregatorsSuccess(t *testing.T) {
|
|||
tssOutput = appendClonedTimeseries(tssOutput, tss)
|
||||
tssOutputLock.Unlock()
|
||||
}
|
||||
opts := Options{
|
||||
a := &Aggregators{
|
||||
opts: Options{
|
||||
FlushOnShutdown: true,
|
||||
NoAlignFlushToInterval: true,
|
||||
},
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
a, err := LoadFromData([]byte(config), pushFunc, opts)
|
||||
if err != nil {
|
||||
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
|
||||
|
@ -233,7 +239,7 @@ func TestAggregatorsSuccess(t *testing.T) {
|
|||
offsetMsecs := time.Now().UnixMilli()
|
||||
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
|
||||
matchIdxs := a.Push(tssInput, nil)
|
||||
a.MustStop()
|
||||
a.MustStop(nil)
|
||||
|
||||
// Verify matchIdxs equals to matchIdxsExpected
|
||||
matchIdxsStr := ""
|
||||
|
@ -917,12 +923,14 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
|
|||
}
|
||||
tssOutputLock.Unlock()
|
||||
}
|
||||
opts := Options{
|
||||
a := &Aggregators{
|
||||
opts: Options{
|
||||
DedupInterval: 30 * time.Second,
|
||||
FlushOnShutdown: true,
|
||||
},
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
a, err := LoadFromData([]byte(config), pushFunc, opts)
|
||||
if err != nil {
|
||||
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
|
||||
t.Fatalf("cannot initialize aggregators: %s", err)
|
||||
}
|
||||
|
||||
|
@ -930,7 +938,7 @@ func TestAggregatorsWithDedupInterval(t *testing.T) {
|
|||
offsetMsecs := time.Now().UnixMilli()
|
||||
tssInput := prompbmarshal.MustParsePromMetrics(inputMetrics, offsetMsecs)
|
||||
matchIdxs := a.Push(tssInput, nil)
|
||||
a.MustStop()
|
||||
a.MustStop(nil)
|
||||
|
||||
// Verify matchIdxs equals to matchIdxsExpected
|
||||
matchIdxsStr := ""
|
||||
|
|
|
@ -47,7 +47,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
|
|||
}
|
||||
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
|
||||
a := newBenchAggregators(outputs, pushFunc)
|
||||
defer a.MustStop()
|
||||
defer a.MustStop(nil)
|
||||
_ = a.Push(benchSeries, nil)
|
||||
|
||||
b.ResetTimer()
|
||||
|
@ -63,7 +63,7 @@ func BenchmarkAggregatorsFlushSerial(b *testing.B) {
|
|||
func benchmarkAggregatorsPush(b *testing.B, output string) {
|
||||
pushFunc := func(_ []prompbmarshal.TimeSeries) {}
|
||||
a := newBenchAggregators([]string{output}, pushFunc)
|
||||
defer a.MustStop()
|
||||
defer a.MustStop(nil)
|
||||
|
||||
const loops = 100
|
||||
|
||||
|
@ -92,8 +92,10 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
|
|||
outputs: [%s]
|
||||
`, strings.Join(outputsQuoted, ","))
|
||||
|
||||
a, err := LoadFromData([]byte(config), pushFunc, Options{})
|
||||
if err != nil {
|
||||
a := &Aggregators{
|
||||
pushFunc: pushFunc,
|
||||
}
|
||||
if err := a.loadAggregatorsFromData([]byte(config)); err != nil {
|
||||
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
|
||||
}
|
||||
return a
|
||||
|
|
Loading…
Reference in a new issue