2023-01-04 06:19:18 +00:00
package common
import (
"flag"
2023-03-29 16:05:58 +00:00
"fmt"
"sync"
2023-04-01 04:27:45 +00:00
"sync/atomic"
2023-01-04 06:19:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2024-03-05 00:13:21 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2023-01-04 06:19:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2023-03-29 16:05:58 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2023-01-04 06:19:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
2023-03-29 16:05:58 +00:00
"github.com/VictoriaMetrics/metrics"
2023-01-04 06:19:18 +00:00
)
var (
streamAggrConfig = flag . String ( "streamAggr.config" , "" , "Optional path to file with stream aggregation config. " +
2023-01-04 06:40:00 +00:00
"See https://docs.victoriametrics.com/stream-aggregation.html . " +
2023-07-24 23:44:09 +00:00
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval" )
streamAggrKeepInput = flag . Bool ( "streamAggr.keepInput" , false , "Whether to keep all the input samples after the aggregation with -streamAggr.config. " +
"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. " +
"See also -streamAggr.dropInput and https://docs.victoriametrics.com/stream-aggregation.html" )
streamAggrDropInput = flag . Bool ( "streamAggr.dropInput" , false , "Whether to drop all the input samples after the aggregation with -streamAggr.config. " +
"By default, only aggregated samples are dropped, while the remaining samples are stored in the database. " +
"See also -streamAggr.keepInput and https://docs.victoriametrics.com/stream-aggregation.html" )
2024-03-04 22:45:22 +00:00
streamAggrDedupInterval = flag . Duration ( "streamAggr.dedupInterval" , 0 , "Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . " +
2024-03-05 00:13:21 +00:00
"See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/stream-aggregation.html#deduplication" )
streamAggrDropInputLabels = flagutil . NewArrayString ( "streamAggr.dropInputLabels" , "An optional list of labels to drop from samples " +
"before stream de-duplication and aggregation . See https://docs.victoriametrics.com/stream-aggregation.html#dropping-unneeded-labels" )
2024-03-17 21:01:44 +00:00
streamAggrIgnoreOldSamples = flag . Bool ( "streamAggr.ignoreOldSamples" , false , "Whether to ignore input samples with old timestamps outside the current aggregation interval. " +
"See https://docs.victoriametrics.com/stream-aggregation.html#ignoring-old-samples" )
2023-01-04 06:19:18 +00:00
)
2023-03-29 16:05:58 +00:00
var (
2023-10-13 11:54:33 +00:00
saCfgReloaderStopCh chan struct { }
2023-04-01 04:27:45 +00:00
saCfgReloaderWG sync . WaitGroup
2023-03-29 16:05:58 +00:00
saCfgReloads = metrics . NewCounter ( ` vminsert_streamagg_config_reloads_total ` )
saCfgReloadErr = metrics . NewCounter ( ` vminsert_streamagg_config_reloads_errors_total ` )
2023-12-20 12:23:38 +00:00
saCfgSuccess = metrics . NewGauge ( ` vminsert_streamagg_config_last_reload_successful ` , nil )
2023-03-29 16:05:58 +00:00
saCfgTimestamp = metrics . NewCounter ( ` vminsert_streamagg_config_last_reload_success_timestamp_seconds ` )
2024-03-04 22:45:22 +00:00
sasGlobal atomic . Pointer [ streamaggr . Aggregators ]
deduplicator * streamaggr . Deduplicator
2023-03-29 16:05:58 +00:00
)
2023-04-01 04:27:45 +00:00
// CheckStreamAggrConfig checks config pointed by -stramaggr.config
func CheckStreamAggrConfig ( ) error {
if * streamAggrConfig == "" {
return nil
}
2024-04-02 20:16:24 +00:00
pushNoop := func ( _ [ ] prompbmarshal . TimeSeries ) { }
2024-03-04 03:42:55 +00:00
opts := & streamaggr . Options {
2024-03-17 21:01:44 +00:00
DedupInterval : * streamAggrDedupInterval ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : * streamAggrIgnoreOldSamples ,
2024-03-04 03:42:55 +00:00
}
sas , err := streamaggr . LoadFromFile ( * streamAggrConfig , pushNoop , opts )
2023-04-01 04:27:45 +00:00
if err != nil {
return fmt . Errorf ( "error when loading -streamAggr.config=%q: %w" , * streamAggrConfig , err )
}
sas . MustStop ( )
return nil
}
2023-01-04 06:19:18 +00:00
// InitStreamAggr must be called after flag.Parse and before using the common package.
//
// MustStopStreamAggr must be called when stream aggr is no longer needed.
func InitStreamAggr ( ) {
2023-10-13 11:54:33 +00:00
saCfgReloaderStopCh = make ( chan struct { } )
2023-01-04 06:19:18 +00:00
if * streamAggrConfig == "" {
2024-03-04 22:45:22 +00:00
if * streamAggrDedupInterval > 0 {
2024-03-05 00:13:21 +00:00
deduplicator = streamaggr . NewDeduplicator ( pushAggregateSeries , * streamAggrDedupInterval , * streamAggrDropInputLabels )
2024-03-04 22:45:22 +00:00
}
2023-01-04 06:19:18 +00:00
return
}
2023-03-29 16:05:58 +00:00
sighupCh := procutil . NewSighupChan ( )
2024-03-04 03:42:55 +00:00
opts := & streamaggr . Options {
2024-03-17 21:01:44 +00:00
DedupInterval : * streamAggrDedupInterval ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : * streamAggrIgnoreOldSamples ,
2024-03-04 03:42:55 +00:00
}
sas , err := streamaggr . LoadFromFile ( * streamAggrConfig , pushAggregateSeries , opts )
2023-01-04 06:19:18 +00:00
if err != nil {
logger . Fatalf ( "cannot load -streamAggr.config=%q: %s" , * streamAggrConfig , err )
}
2024-03-04 22:45:22 +00:00
2023-04-01 04:27:45 +00:00
sasGlobal . Store ( sas )
2023-03-29 16:05:58 +00:00
saCfgSuccess . Set ( 1 )
saCfgTimestamp . Set ( fasttime . UnixTimestamp ( ) )
// Start config reloader.
2023-04-01 04:27:45 +00:00
saCfgReloaderWG . Add ( 1 )
2023-03-29 16:05:58 +00:00
go func ( ) {
2023-04-01 04:27:45 +00:00
defer saCfgReloaderWG . Done ( )
2023-03-29 16:05:58 +00:00
for {
select {
case <- sighupCh :
2023-04-01 04:27:45 +00:00
case <- saCfgReloaderStopCh :
2023-03-29 16:05:58 +00:00
return
}
2023-04-01 04:27:45 +00:00
reloadStreamAggrConfig ( )
2023-03-29 16:05:58 +00:00
}
} ( )
2023-01-04 06:19:18 +00:00
}
2023-04-01 04:27:45 +00:00
func reloadStreamAggrConfig ( ) {
logger . Infof ( "reloading -streamAggr.config=%q" , * streamAggrConfig )
saCfgReloads . Inc ( )
2024-03-04 03:42:55 +00:00
opts := & streamaggr . Options {
2024-03-17 21:01:44 +00:00
DedupInterval : * streamAggrDedupInterval ,
DropInputLabels : * streamAggrDropInputLabels ,
IgnoreOldSamples : * streamAggrIgnoreOldSamples ,
2024-03-04 03:42:55 +00:00
}
sasNew , err := streamaggr . LoadFromFile ( * streamAggrConfig , pushAggregateSeries , opts )
2023-04-01 04:27:45 +00:00
if err != nil {
saCfgSuccess . Set ( 0 )
saCfgReloadErr . Inc ( )
logger . Errorf ( "cannot reload -streamAggr.config=%q: use the previously loaded config; error: %s" , * streamAggrConfig , err )
return
}
sas := sasGlobal . Load ( )
if ! sasNew . Equal ( sas ) {
sasOld := sasGlobal . Swap ( sasNew )
sasOld . MustStop ( )
logger . Infof ( "successfully reloaded stream aggregation config at -streamAggr.config=%q" , * streamAggrConfig )
} else {
logger . Infof ( "nothing changed in -streamAggr.config=%q" , * streamAggrConfig )
sasNew . MustStop ( )
}
saCfgSuccess . Set ( 1 )
saCfgTimestamp . Set ( fasttime . UnixTimestamp ( ) )
}
2023-01-04 06:19:18 +00:00
// MustStopStreamAggr stops stream aggregators.
func MustStopStreamAggr ( ) {
2023-04-01 04:27:45 +00:00
close ( saCfgReloaderStopCh )
saCfgReloaderWG . Wait ( )
2023-03-29 16:05:58 +00:00
2023-04-01 04:27:45 +00:00
sas := sasGlobal . Swap ( nil )
sas . MustStop ( )
2024-03-04 22:45:22 +00:00
if deduplicator != nil {
deduplicator . MustStop ( )
deduplicator = nil
}
2023-03-29 16:05:58 +00:00
}
2023-01-04 06:19:18 +00:00
type streamAggrCtx struct {
2024-03-01 19:29:09 +00:00
mn storage . MetricName
tss [ ] prompbmarshal . TimeSeries
labels [ ] prompbmarshal . Label
samples [ ] prompbmarshal . Sample
buf [ ] byte
2023-01-04 06:19:18 +00:00
}
func ( ctx * streamAggrCtx ) Reset ( ) {
ctx . mn . Reset ( )
2024-03-01 19:29:09 +00:00
clear ( ctx . tss )
ctx . tss = ctx . tss [ : 0 ]
clear ( ctx . labels )
ctx . labels = ctx . labels [ : 0 ]
ctx . samples = ctx . samples [ : 0 ]
ctx . buf = ctx . buf [ : 0 ]
2023-01-04 06:19:18 +00:00
}
2023-07-24 23:44:09 +00:00
func ( ctx * streamAggrCtx ) push ( mrs [ ] storage . MetricRow , matchIdxs [ ] byte ) [ ] byte {
2023-01-04 06:19:18 +00:00
mn := & ctx . mn
2024-03-01 19:29:09 +00:00
tss := ctx . tss
labels := ctx . labels
samples := ctx . samples
buf := ctx . buf
tssLen := len ( tss )
for _ , mr := range mrs {
2023-01-04 06:19:18 +00:00
if err := mn . UnmarshalRaw ( mr . MetricNameRaw ) ; err != nil {
logger . Panicf ( "BUG: cannot unmarshal recently marshaled MetricName: %s" , err )
}
2024-03-01 19:29:09 +00:00
labelsLen := len ( labels )
bufLen := len ( buf )
buf = append ( buf , mn . MetricGroup ... )
metricGroup := bytesutil . ToUnsafeString ( buf [ bufLen : ] )
labels = append ( labels , prompbmarshal . Label {
2023-01-04 06:19:18 +00:00
Name : "__name__" ,
2024-03-01 19:29:09 +00:00
Value : metricGroup ,
2023-01-04 06:19:18 +00:00
} )
2024-03-01 19:29:09 +00:00
2023-01-04 06:19:18 +00:00
for _ , tag := range mn . Tags {
2024-03-01 19:29:09 +00:00
bufLen = len ( buf )
buf = append ( buf , tag . Key ... )
name := bytesutil . ToUnsafeString ( buf [ bufLen : ] )
bufLen = len ( buf )
buf = append ( buf , tag . Value ... )
value := bytesutil . ToUnsafeString ( buf [ bufLen : ] )
2023-01-04 06:19:18 +00:00
labels = append ( labels , prompbmarshal . Label {
2024-03-01 19:29:09 +00:00
Name : name ,
Value : value ,
2023-01-04 06:19:18 +00:00
} )
}
2024-03-01 19:29:09 +00:00
samplesLen := len ( samples )
samples = append ( samples , prompbmarshal . Sample {
2023-01-04 06:19:18 +00:00
Timestamp : mr . Timestamp ,
Value : mr . Value ,
} )
2024-03-01 19:29:09 +00:00
tss = append ( tss , prompbmarshal . TimeSeries {
Labels : labels [ labelsLen : ] ,
Samples : samples [ samplesLen : ] ,
} )
}
ctx . tss = tss
ctx . labels = labels
ctx . samples = samples
ctx . buf = buf
2023-01-04 06:19:18 +00:00
2024-03-01 19:29:09 +00:00
tss = tss [ tssLen : ]
2024-03-04 22:45:22 +00:00
2024-03-01 19:29:09 +00:00
sas := sasGlobal . Load ( )
2024-03-04 22:45:22 +00:00
if sas != nil {
matchIdxs = sas . Push ( tss , matchIdxs )
} else if deduplicator != nil {
matchIdxs = bytesutil . ResizeNoCopyMayOverallocate ( matchIdxs , len ( tss ) )
for i := range matchIdxs {
matchIdxs [ i ] = 1
}
deduplicator . Push ( tss )
}
2024-03-01 19:29:09 +00:00
ctx . Reset ( )
2023-07-24 23:44:09 +00:00
return matchIdxs
2023-01-04 06:19:18 +00:00
}
func pushAggregateSeries ( tss [ ] prompbmarshal . TimeSeries ) {
currentTimestamp := int64 ( fasttime . UnixTimestamp ( ) ) * 1000
var ctx InsertCtx
ctx . Reset ( len ( tss ) )
ctx . skipStreamAggr = true
for _ , ts := range tss {
labels := ts . Labels
2023-05-09 15:33:58 +00:00
ctx . Labels = ctx . Labels [ : 0 ]
2023-01-04 06:19:18 +00:00
for _ , label := range labels {
name := label . Name
if name == "__name__" {
name = ""
}
ctx . AddLabel ( name , label . Value )
}
value := ts . Samples [ 0 ] . Value
if err := ctx . WriteDataPoint ( nil , ctx . Labels , currentTimestamp , value ) ; err != nil {
logger . Errorf ( "cannot store aggregate series: %s" , err )
// Do not continue pushing the remaining samples, since it is likely they will return the same error.
return
}
}
2023-01-07 06:40:07 +00:00
// There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here,
// since the number of concurrent pushAggregateSeries() calls should be already limited by lib/streamaggr.
2023-01-04 06:19:18 +00:00
if err := vmstorage . AddRows ( ctx . mrs ) ; err != nil {
logger . Errorf ( "cannot flush aggregate series: %s" , err )
}
}