2020-02-23 11:35:47 +00:00
package remotewrite
import (
"flag"
"fmt"
2020-05-30 11:36:40 +00:00
"sync"
2020-02-23 11:35:47 +00:00
"sync/atomic"
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-07-21 18:55:24 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
2020-05-30 11:36:40 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
xxhash "github.com/cespare/xxhash/v2"
)
var (
remoteWriteURLs = flagutil . NewArray ( "remoteWrite.url" , "Remote storage URL to write data to. It must support Prometheus remote_write API. " +
"It is recommended using VictoriaMetrics as remote storage. Example url: http://<victoriametrics-host>:8428/api/v1/write . " +
"Pass multiple -remoteWrite.url flags in order to write data concurrently to multiple remote storage systems" )
2020-05-30 11:36:40 +00:00
tmpDataPath = flag . String ( "remoteWrite.tmpDataPath" , "vmagent-remotewrite-data" , "Path to directory where temporary data for remote write component is stored" )
2020-09-18 11:21:45 +00:00
queues = flag . Int ( "remoteWrite.queues" , 4 , "The number of concurrent queues to each -remoteWrite.url. Set more queues if default number of queues " +
2020-02-23 11:35:47 +00:00
"isn't enough for sending high volume of collected data to remote storage" )
showRemoteWriteURL = flag . Bool ( "remoteWrite.showURL" , false , "Whether to show -remoteWrite.url in the exported metrics. " +
2020-07-10 11:07:02 +00:00
"It is hidden by default, since it can contain sensitive info such as auth key" )
2020-08-16 14:05:52 +00:00
maxPendingBytesPerURL = flagutil . NewBytes ( "remoteWrite.maxDiskUsagePerURL" , 0 , "The maximum file-based buffer size in bytes at -remoteWrite.tmpDataPath " +
2020-03-03 17:48:46 +00:00
"for each -remoteWrite.url. When buffer size reaches the configured maximum, then old data is dropped when adding new data to the buffer. " +
"Buffered data is stored in ~500MB chunks, so the minimum practical value for this flag is 500000000. " +
"Disk usage is unlimited if the value is set to 0" )
2020-08-16 14:21:35 +00:00
significantFigures = flag . Int ( "remoteWrite.significantFigures" , 0 , "The number of significant figures to leave in metric values before writing them to remote storage. " +
"See https://en.wikipedia.org/wiki/Significant_figures . Zero value saves all the significant figures. " +
2020-07-21 18:55:24 +00:00
"This option may be used for increasing on-disk compression level for the stored metrics" )
2020-02-23 11:35:47 +00:00
)
2020-03-03 11:08:17 +00:00
var rwctxs [ ] * remoteWriteCtx
2020-05-30 11:36:40 +00:00
// Contains the current relabelConfigs.
var allRelabelConfigs atomic . Value
2020-08-30 18:23:38 +00:00
// maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
// since it may lead to high memory usage due to big number of buffers.
2020-12-08 18:49:32 +00:00
var maxQueues = cgroup . AvailableCPUs ( ) * 4
2020-08-30 18:23:38 +00:00
2020-09-29 16:48:53 +00:00
// InitSecretFlags must be called after flag.Parse and before any logging.
func InitSecretFlags ( ) {
if ! * showRemoteWriteURL {
// remoteWrite.url can contain authentication codes, so hide it at `/metrics` output.
flagutil . RegisterSecretFlag ( "remoteWrite.url" )
}
}
2020-02-23 11:35:47 +00:00
// Init initializes remotewrite.
//
// It must be called after flag.Parse().
//
// Stop must be called for graceful shutdown.
func Init ( ) {
if len ( * remoteWriteURLs ) == 0 {
2020-08-30 18:23:38 +00:00
logger . Fatalf ( "at least one `-remoteWrite.url` command-line flag must be set" )
}
if * queues > maxQueues {
* queues = maxQueues
}
if * queues <= 0 {
* queues = 1
2020-02-23 11:35:47 +00:00
}
2020-05-30 11:36:40 +00:00
initLabelsGlobal ( )
rcs , err := loadRelabelConfigs ( )
if err != nil {
logger . Fatalf ( "cannot load relabel configs: %s" , err )
}
allRelabelConfigs . Store ( rcs )
2020-02-23 11:35:47 +00:00
maxInmemoryBlocks := memory . Allowed ( ) / len ( * remoteWriteURLs ) / maxRowsPerBlock / 100
if maxInmemoryBlocks > 200 {
// There is no much sense in keeping higher number of blocks in memory,
// since this means that the producer outperforms consumer and the queue
// will continue growing. It is better storing the queue to file.
maxInmemoryBlocks = 200
}
if maxInmemoryBlocks < 2 {
maxInmemoryBlocks = 2
}
for i , remoteWriteURL := range * remoteWriteURLs {
2020-09-16 19:34:01 +00:00
sanitizedURL := fmt . Sprintf ( "%d:secret-url" , i + 1 )
2020-02-23 11:35:47 +00:00
if * showRemoteWriteURL {
2020-09-16 19:34:01 +00:00
sanitizedURL = fmt . Sprintf ( "%d:%s" , i + 1 , remoteWriteURL )
2020-02-23 11:35:47 +00:00
}
2020-09-16 19:34:01 +00:00
rwctx := newRemoteWriteCtx ( i , remoteWriteURL , maxInmemoryBlocks , sanitizedURL )
2020-03-03 11:08:17 +00:00
rwctxs = append ( rwctxs , rwctx )
2020-02-23 11:35:47 +00:00
}
2020-05-30 11:36:40 +00:00
// Start config reloader.
sighupCh := procutil . NewSighupChan ( )
configReloaderWG . Add ( 1 )
go func ( ) {
defer configReloaderWG . Done ( )
for {
select {
case <- sighupCh :
case <- stopCh :
return
}
logger . Infof ( "SIGHUP received; reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig" )
rcs , err := loadRelabelConfigs ( )
if err != nil {
logger . Errorf ( "cannot reload relabel configs; preserving the previous configs; error: %s" , err )
continue
}
allRelabelConfigs . Store ( rcs )
logger . Infof ( "Successfully reloaded relabel configs" )
}
} ( )
2020-02-23 11:35:47 +00:00
}
2020-05-30 11:36:40 +00:00
var stopCh = make ( chan struct { } )
var configReloaderWG sync . WaitGroup
2020-02-23 11:35:47 +00:00
// Stop stops remotewrite.
//
// It is expected that nobody calls Push during and after the call to this func.
func Stop ( ) {
2020-05-30 11:36:40 +00:00
close ( stopCh )
configReloaderWG . Wait ( )
2020-03-03 11:08:17 +00:00
for _ , rwctx := range rwctxs {
rwctx . MustStop ( )
2020-02-23 11:35:47 +00:00
}
2020-03-03 11:08:17 +00:00
rwctxs = nil
2020-02-23 11:35:47 +00:00
}
// Push sends wr to remote storage systems set via `-remoteWrite.url`.
//
2020-07-21 18:55:24 +00:00
// Note that wr may be modified by Push due to relabeling and rounding.
2020-02-23 11:35:47 +00:00
func Push ( wr * prompbmarshal . WriteRequest ) {
2020-08-16 14:21:35 +00:00
if * significantFigures > 0 {
// Round values according to significantFigures
2020-07-21 18:55:24 +00:00
for i := range wr . Timeseries {
samples := wr . Timeseries [ i ] . Samples
for j := range samples {
s := & samples [ j ]
2020-08-16 14:21:35 +00:00
s . Value = decimal . Round ( s . Value , * significantFigures )
2020-07-21 18:55:24 +00:00
}
}
}
2020-03-03 11:08:17 +00:00
var rctx * relabelCtx
2020-05-30 11:36:40 +00:00
rcs := allRelabelConfigs . Load ( ) . ( * relabelConfigs )
prcsGlobal := rcs . global
2020-03-06 17:26:17 +00:00
if len ( prcsGlobal ) > 0 || len ( labelsGlobal ) > 0 {
2020-03-03 11:08:17 +00:00
rctx = getRelabelCtx ( )
}
2020-02-28 16:57:45 +00:00
tss := wr . Timeseries
for len ( tss ) > 0 {
2020-07-10 12:13:26 +00:00
// Process big tss in smaller blocks in order to reduce the maximum memory usage
2020-09-26 01:07:45 +00:00
samplesCount := 0
i := 0
for i < len ( tss ) {
samplesCount += len ( tss [ i ] . Samples )
i ++
if samplesCount > maxRowsPerBlock {
break
}
}
2020-02-28 16:57:45 +00:00
tssBlock := tss
2020-09-26 01:07:45 +00:00
if i < len ( tss ) {
tssBlock = tss [ : i ]
tss = tss [ i : ]
2020-02-28 18:03:38 +00:00
} else {
tss = nil
2020-02-28 16:57:45 +00:00
}
2020-03-03 11:08:17 +00:00
if rctx != nil {
tssBlockLen := len ( tssBlock )
tssBlock = rctx . applyRelabeling ( tssBlock , labelsGlobal , prcsGlobal )
globalRelabelMetricsDropped . Add ( tssBlockLen - len ( tssBlock ) )
}
for _ , rwctx := range rwctxs {
rwctx . Push ( tssBlock )
}
2020-03-03 13:00:52 +00:00
if rctx != nil {
rctx . reset ( )
}
2020-02-28 16:57:45 +00:00
}
2020-03-03 11:08:17 +00:00
if rctx != nil {
putRelabelCtx ( rctx )
}
2020-02-23 11:35:47 +00:00
}
2020-03-03 11:08:17 +00:00
var globalRelabelMetricsDropped = metrics . NewCounter ( "vmagent_remotewrite_global_relabel_metrics_dropped_total" )
type remoteWriteCtx struct {
2020-05-30 11:36:40 +00:00
idx int
2020-03-03 11:08:17 +00:00
fq * persistentqueue . FastQueue
c * client
pss [ ] * pendingSeries
pssNextIdx uint64
relabelMetricsDropped * metrics . Counter
}
2020-09-16 19:34:01 +00:00
func newRemoteWriteCtx ( argIdx int , remoteWriteURL string , maxInmemoryBlocks int , sanitizedURL string ) * remoteWriteCtx {
2020-03-03 11:08:17 +00:00
h := xxhash . Sum64 ( [ ] byte ( remoteWriteURL ) )
2020-09-11 12:16:02 +00:00
path := fmt . Sprintf ( "%s/persistent-queue/%d_%016X" , * tmpDataPath , argIdx + 1 , h )
2020-09-16 19:34:01 +00:00
fq := persistentqueue . MustOpenFastQueue ( path , sanitizedURL , maxInmemoryBlocks , maxPendingBytesPerURL . N )
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_data_bytes { path=%q, url=%q} ` , path , sanitizedURL ) , func ( ) float64 {
2020-03-03 11:08:17 +00:00
return float64 ( fq . GetPendingBytes ( ) )
} )
2020-09-16 19:34:01 +00:00
_ = metrics . GetOrCreateGauge ( fmt . Sprintf ( ` vmagent_remotewrite_pending_inmemory_blocks { path=%q, url=%q} ` , path , sanitizedURL ) , func ( ) float64 {
2020-03-03 11:08:17 +00:00
return float64 ( fq . GetInmemoryQueueLen ( ) )
} )
2020-09-16 19:34:01 +00:00
c := newClient ( argIdx , remoteWriteURL , sanitizedURL , fq , * queues )
2020-03-03 11:08:17 +00:00
pss := make ( [ ] * pendingSeries , * queues )
for i := range pss {
pss [ i ] = newPendingSeries ( fq . MustWriteBlock )
}
return & remoteWriteCtx {
2020-05-30 11:36:40 +00:00
idx : argIdx ,
fq : fq ,
c : c ,
pss : pss ,
2020-03-03 11:08:17 +00:00
2020-09-16 19:34:01 +00:00
relabelMetricsDropped : metrics . GetOrCreateCounter ( fmt . Sprintf ( ` vmagent_remotewrite_relabel_metrics_dropped_total { path=%q, url=%q} ` , path , sanitizedURL ) ) ,
2020-02-23 11:35:47 +00:00
}
}
2020-03-03 11:08:17 +00:00
func ( rwctx * remoteWriteCtx ) MustStop ( ) {
for _ , ps := range rwctx . pss {
ps . MustStop ( )
}
2020-05-30 11:36:40 +00:00
rwctx . idx = 0
2020-03-03 11:08:17 +00:00
rwctx . pss = nil
rwctx . fq . MustClose ( )
rwctx . fq = nil
rwctx . c . MustStop ( )
rwctx . c = nil
rwctx . relabelMetricsDropped = nil
}
2020-02-23 11:35:47 +00:00
2020-03-03 11:08:17 +00:00
func ( rwctx * remoteWriteCtx ) Push ( tss [ ] prompbmarshal . TimeSeries ) {
var rctx * relabelCtx
2020-07-10 12:13:26 +00:00
var v * [ ] prompbmarshal . TimeSeries
2020-05-30 11:36:40 +00:00
rcs := allRelabelConfigs . Load ( ) . ( * relabelConfigs )
prcs := rcs . perURL [ rwctx . idx ]
if len ( prcs ) > 0 {
2020-07-10 12:13:26 +00:00
rctx = getRelabelCtx ( )
2020-05-12 19:01:47 +00:00
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
2020-07-10 12:13:26 +00:00
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
v = tssRelabelPool . Get ( ) . ( * [ ] prompbmarshal . TimeSeries )
tss = append ( * v , tss ... )
2020-03-03 11:08:17 +00:00
tssLen := len ( tss )
2020-05-30 11:36:40 +00:00
tss = rctx . applyRelabeling ( tss , nil , prcs )
2020-03-03 11:08:17 +00:00
rwctx . relabelMetricsDropped . Add ( tssLen - len ( tss ) )
}
pss := rwctx . pss
idx := atomic . AddUint64 ( & rwctx . pssNextIdx , 1 ) % uint64 ( len ( pss ) )
pss [ idx ] . Push ( tss )
if rctx != nil {
2020-07-10 12:13:26 +00:00
* v = prompbmarshal . ResetTimeSeries ( tss )
tssRelabelPool . Put ( v )
2020-03-03 11:08:17 +00:00
putRelabelCtx ( rctx )
}
}
2020-07-10 12:13:26 +00:00
var tssRelabelPool = & sync . Pool {
New : func ( ) interface { } {
2020-07-14 11:27:50 +00:00
a := [ ] prompbmarshal . TimeSeries { }
return & a
2020-07-10 12:13:26 +00:00
} ,
}