2020-02-23 11:35:47 +00:00
package remotewrite
import (
"flag"
"sync"
2020-09-03 09:10:47 +00:00
"sync/atomic"
2020-02-23 11:35:47 +00:00
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2021-02-01 12:27:05 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
2023-02-21 02:38:49 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding/zstd"
2020-05-14 19:01:51 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-08-16 14:05:52 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2022-03-18 17:06:18 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
2020-11-07 14:16:56 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
2024-01-22 16:12:37 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/metrics"
"github.com/golang/snappy"
)
2020-02-25 17:57:47 +00:00
var (
flushInterval = flag . Duration ( "remoteWrite.flushInterval" , time . Second , "Interval for flushing the data to remote storage. " +
2021-03-01 09:50:39 +00:00
"This option takes effect only when less than 10K data points per second are pushed to -remoteWrite.url" )
2021-11-04 13:39:14 +00:00
maxUnpackedBlockSize = flagutil . NewBytes ( "remoteWrite.maxBlockSize" , 8 * 1024 * 1024 , "The maximum block size to send to remote storage. Bigger blocks may improve performance at the cost of the increased memory usage. See also -remoteWrite.maxRowsPerBlock" )
maxRowsPerBlock = flag . Int ( "remoteWrite.maxRowsPerBlock" , 10000 , "The maximum number of samples to send in each block to remote storage. Higher number may improve performance at the cost of the increased memory usage. See also -remoteWrite.maxBlockSize" )
2023-02-27 19:03:49 +00:00
vmProtoCompressLevel = flag . Int ( "remoteWrite.vmProtoCompressLevel" , 0 , "The compression level for VictoriaMetrics remote write protocol. " +
"Higher values reduce network traffic at the cost of higher CPU usage. Negative values reduce CPU usage at the cost of increased network traffic. " +
"See https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol" )
2020-02-25 17:57:47 +00:00
)
2020-02-23 11:35:47 +00:00
// pendingSeries buffers incoming time series in wr and flushes them to the
// queue either when a block fills up (see writeRequest.tryPush) or periodically
// from the background periodicFlusher goroutine.
type pendingSeries struct {
	// mu serializes access to wr. Only wr.lastFlushTime is also read
	// without it, via atomic loads in periodicFlusher.
	mu sync.Mutex
	wr writeRequest

	// stopCh is closed by MustStop to ask periodicFlusher to exit.
	stopCh chan struct{}

	// periodicFlusherWG lets MustStop wait for periodicFlusher to return.
	periodicFlusherWG sync.WaitGroup
}
2023-11-24 12:42:11 +00:00
func newPendingSeries ( fq * persistentqueue . FastQueue , isVMRemoteWrite bool , significantFigures , roundDigits int ) * pendingSeries {
2020-02-23 11:35:47 +00:00
var ps pendingSeries
2023-11-24 12:42:11 +00:00
ps . wr . fq = fq
2023-02-21 02:38:49 +00:00
ps . wr . isVMRemoteWrite = isVMRemoteWrite
2021-02-01 12:27:05 +00:00
ps . wr . significantFigures = significantFigures
ps . wr . roundDigits = roundDigits
2020-02-23 11:35:47 +00:00
ps . stopCh = make ( chan struct { } )
ps . periodicFlusherWG . Add ( 1 )
go func ( ) {
defer ps . periodicFlusherWG . Done ( )
ps . periodicFlusher ( )
} ( )
return & ps
}
// MustStop signals the periodicFlusher goroutine to exit and blocks until it
// has returned. On exit the flusher force-writes any buffered data to the
// queue (see writeRequest.mustFlushOnStop).
//
// MustStop closes ps.stopCh, so it must be called at most once.
func (ps *pendingSeries) MustStop() {
	close(ps.stopCh)
	ps.periodicFlusherWG.Wait()
}
2023-11-25 09:31:30 +00:00
func ( ps * pendingSeries ) TryPush ( tss [ ] prompbmarshal . TimeSeries ) bool {
2020-02-23 11:35:47 +00:00
ps . mu . Lock ( )
2023-11-25 09:31:30 +00:00
ok := ps . wr . tryPush ( tss )
2020-02-23 11:35:47 +00:00
ps . mu . Unlock ( )
2023-11-25 09:31:30 +00:00
return ok
2020-02-23 11:35:47 +00:00
}
func ( ps * pendingSeries ) periodicFlusher ( ) {
2020-05-14 19:01:51 +00:00
flushSeconds := int64 ( flushInterval . Seconds ( ) )
if flushSeconds <= 0 {
flushSeconds = 1
}
2024-01-22 16:12:37 +00:00
d := timeutil . AddJitterToDuration ( * flushInterval )
ticker := time . NewTicker ( d )
2020-02-23 11:35:47 +00:00
defer ticker . Stop ( )
2023-11-24 12:42:11 +00:00
for {
2020-02-23 11:35:47 +00:00
select {
case <- ps . stopCh :
2023-11-24 12:42:11 +00:00
ps . mu . Lock ( )
ps . wr . mustFlushOnStop ( )
ps . mu . Unlock ( )
return
2020-02-23 11:35:47 +00:00
case <- ticker . C :
2020-09-03 09:10:47 +00:00
if fasttime . UnixTimestamp ( ) - atomic . LoadUint64 ( & ps . wr . lastFlushTime ) < uint64 ( flushSeconds ) {
2020-02-23 11:35:47 +00:00
continue
}
}
ps . mu . Lock ( )
2023-11-25 09:31:30 +00:00
_ = ps . wr . tryFlush ( )
2020-02-23 11:35:47 +00:00
ps . mu . Unlock ( )
}
}
type writeRequest struct {
2020-09-03 09:10:47 +00:00
// Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures.
2020-05-14 19:01:51 +00:00
lastFlushTime uint64
2020-02-23 11:35:47 +00:00
2023-11-25 09:31:30 +00:00
// The queue to send blocks to.
2023-11-24 12:42:11 +00:00
fq * persistentqueue . FastQueue
2020-09-03 09:10:47 +00:00
2023-02-21 02:38:49 +00:00
// Whether to encode the write request with VictoriaMetrics remote write protocol.
isVMRemoteWrite bool
2023-11-25 09:31:30 +00:00
// How many significant figures must be left before sending the writeRequest to fq.
2021-02-01 12:27:05 +00:00
significantFigures int
2023-11-25 09:31:30 +00:00
// How many decimal digits after point must be left before sending the writeRequest to fq.
2021-02-01 12:27:05 +00:00
roundDigits int
wr prompbmarshal . WriteRequest
2020-02-23 11:35:47 +00:00
tss [ ] prompbmarshal . TimeSeries
labels [ ] prompbmarshal . Label
samples [ ] prompbmarshal . Sample
buf [ ] byte
}
func ( wr * writeRequest ) reset ( ) {
2023-11-25 09:31:30 +00:00
// Do not reset lastFlushTime, fq, isVMRemoteWrite, significantFigures and roundDigits, since they are re-used.
2021-02-01 12:27:05 +00:00
2020-02-23 11:35:47 +00:00
wr . wr . Timeseries = nil
for i := range wr . tss {
ts := & wr . tss [ i ]
ts . Labels = nil
ts . Samples = nil
}
wr . tss = wr . tss [ : 0 ]
2020-11-07 14:16:56 +00:00
promrelabel . CleanLabels ( wr . labels )
2020-02-23 11:35:47 +00:00
wr . labels = wr . labels [ : 0 ]
wr . samples = wr . samples [ : 0 ]
wr . buf = wr . buf [ : 0 ]
}
2023-11-25 09:31:30 +00:00
// mustFlushOnStop force pushes wr data into wr.fq
//
// This is needed in order to properly save in-memory data to persistent queue on graceful shutdown.
2023-11-24 12:42:11 +00:00
func ( wr * writeRequest ) mustFlushOnStop ( ) {
2020-02-23 11:35:47 +00:00
wr . wr . Timeseries = wr . tss
2023-11-25 09:31:30 +00:00
if ! tryPushWriteRequest ( & wr . wr , wr . mustWriteBlock , wr . isVMRemoteWrite ) {
logger . Panicf ( "BUG: final flush must always return true" )
2023-11-24 12:42:11 +00:00
}
2020-02-23 11:35:47 +00:00
wr . reset ( )
}
2023-11-25 09:31:30 +00:00
// mustWriteBlock is a tryPushBlock callback for tryPushWriteRequest that writes
// block to wr.fq via MustWriteBlockIgnoreDisabledPQ and always reports success.
//
// NOTE(review): judging by the name, the write presumably proceeds even when
// the on-disk queue is disabled; confirm in lib/persistentqueue.
func (wr *writeRequest) mustWriteBlock(block []byte) bool {
	wr.fq.MustWriteBlockIgnoreDisabledPQ(block)
	return true
}
func ( wr * writeRequest ) tryFlush ( ) bool {
2023-11-24 12:42:11 +00:00
wr . wr . Timeseries = wr . tss
atomic . StoreUint64 ( & wr . lastFlushTime , fasttime . UnixTimestamp ( ) )
2023-11-25 09:31:30 +00:00
if ! tryPushWriteRequest ( & wr . wr , wr . fq . TryWriteBlock , wr . isVMRemoteWrite ) {
2023-11-24 12:42:11 +00:00
return false
}
wr . reset ( )
return true
}
2023-11-25 09:31:30 +00:00
func adjustSampleValues ( samples [ ] prompbmarshal . Sample , significantFigures , roundDigits int ) {
if n := significantFigures ; n > 0 {
2021-02-01 12:27:05 +00:00
for i := range samples {
s := & samples [ i ]
s . Value = decimal . RoundToSignificantFigures ( s . Value , n )
}
}
2023-11-25 09:31:30 +00:00
if n := roundDigits ; n < 100 {
2021-02-01 12:27:05 +00:00
for i := range samples {
s := & samples [ i ]
s . Value = decimal . RoundToDecimalDigits ( s . Value , n )
}
}
}
2023-11-25 09:31:30 +00:00
func ( wr * writeRequest ) tryPush ( src [ ] prompbmarshal . TimeSeries ) bool {
2020-02-23 11:35:47 +00:00
tssDst := wr . tss
2021-11-04 13:39:14 +00:00
maxSamplesPerBlock := * maxRowsPerBlock
// Allow up to 10x of labels per each block on average.
maxLabelsPerBlock := 10 * maxSamplesPerBlock
2020-02-23 11:35:47 +00:00
for i := range src {
2021-11-04 13:39:14 +00:00
if len ( wr . samples ) >= maxSamplesPerBlock || len ( wr . labels ) >= maxLabelsPerBlock {
2020-09-03 09:08:14 +00:00
wr . tss = tssDst
2023-11-25 09:31:30 +00:00
if ! wr . tryFlush ( ) {
2023-11-24 12:42:11 +00:00
return false
}
2020-02-23 11:35:47 +00:00
tssDst = wr . tss
}
2023-11-25 09:31:30 +00:00
tsSrc := & src [ i ]
adjustSampleValues ( tsSrc . Samples , wr . significantFigures , wr . roundDigits )
2023-11-24 12:42:11 +00:00
tssDst = append ( tssDst , prompbmarshal . TimeSeries { } )
2023-11-25 09:31:30 +00:00
wr . copyTimeSeries ( & tssDst [ len ( tssDst ) - 1 ] , tsSrc )
2020-02-23 11:35:47 +00:00
}
2023-11-24 12:42:11 +00:00
2020-02-23 11:35:47 +00:00
wr . tss = tssDst
2023-11-24 12:42:11 +00:00
return true
2020-02-23 11:35:47 +00:00
}
func ( wr * writeRequest ) copyTimeSeries ( dst , src * prompbmarshal . TimeSeries ) {
labelsDst := wr . labels
labelsLen := len ( wr . labels )
samplesDst := wr . samples
buf := wr . buf
for i := range src . Labels {
labelsDst = append ( labelsDst , prompbmarshal . Label { } )
dstLabel := & labelsDst [ len ( labelsDst ) - 1 ]
srcLabel := & src . Labels [ i ]
buf = append ( buf , srcLabel . Name ... )
dstLabel . Name = bytesutil . ToUnsafeString ( buf [ len ( buf ) - len ( srcLabel . Name ) : ] )
buf = append ( buf , srcLabel . Value ... )
dstLabel . Value = bytesutil . ToUnsafeString ( buf [ len ( buf ) - len ( srcLabel . Value ) : ] )
}
dst . Labels = labelsDst [ labelsLen : ]
2020-05-15 14:35:59 +00:00
samplesDst = append ( samplesDst , src . Samples ... )
dst . Samples = samplesDst [ len ( samplesDst ) - len ( src . Samples ) : ]
2020-02-23 11:35:47 +00:00
wr . samples = samplesDst
wr . labels = labelsDst
wr . buf = buf
}
2023-11-25 09:31:30 +00:00
func tryPushWriteRequest ( wr * prompbmarshal . WriteRequest , tryPushBlock func ( block [ ] byte ) bool , isVMRemoteWrite bool ) bool {
2020-02-23 11:35:47 +00:00
if len ( wr . Timeseries ) == 0 {
// Nothing to push
2023-11-24 12:42:11 +00:00
return true
2020-02-23 11:35:47 +00:00
}
bb := writeRequestBufPool . Get ( )
2024-01-14 21:04:45 +00:00
bb . B = wr . MarshalProtobuf ( bb . B [ : 0 ] )
2022-12-15 03:26:24 +00:00
if len ( bb . B ) <= maxUnpackedBlockSize . IntN ( ) {
2020-02-25 17:34:35 +00:00
zb := snappyBufPool . Get ( )
2023-02-21 02:38:49 +00:00
if isVMRemoteWrite {
2023-02-27 19:03:49 +00:00
zb . B = zstd . CompressLevel ( zb . B [ : 0 ] , bb . B , * vmProtoCompressLevel )
2023-02-21 02:38:49 +00:00
} else {
zb . B = snappy . Encode ( zb . B [ : cap ( zb . B ) ] , bb . B )
}
2020-02-25 17:34:35 +00:00
writeRequestBufPool . Put ( bb )
2020-02-25 17:57:47 +00:00
if len ( zb . B ) <= persistentqueue . MaxBlockSize {
2023-11-25 09:31:30 +00:00
if ! tryPushBlock ( zb . B ) {
2023-11-24 12:42:11 +00:00
return false
}
2020-02-25 17:57:47 +00:00
blockSizeRows . Update ( float64 ( len ( wr . Timeseries ) ) )
blockSizeBytes . Update ( float64 ( len ( zb . B ) ) )
snappyBufPool . Put ( zb )
2023-11-24 12:42:11 +00:00
return true
2020-02-25 17:57:47 +00:00
}
2020-02-23 11:35:47 +00:00
snappyBufPool . Put ( zb )
2020-02-25 17:57:47 +00:00
} else {
writeRequestBufPool . Put ( bb )
2020-02-23 11:35:47 +00:00
}
2022-03-18 17:06:18 +00:00
// Too big block. Recursively split it into smaller parts if possible.
if len ( wr . Timeseries ) == 1 {
// A single time series left. Recursively split its samples into smaller parts if possible.
samples := wr . Timeseries [ 0 ] . Samples
if len ( samples ) == 1 {
logger . Warnf ( "dropping a sample for metric with too long labels exceeding -remoteWrite.maxBlockSize=%d bytes" , maxUnpackedBlockSize . N )
2023-11-24 12:42:11 +00:00
return true
2022-03-18 17:06:18 +00:00
}
n := len ( samples ) / 2
wr . Timeseries [ 0 ] . Samples = samples [ : n ]
2023-11-25 09:31:30 +00:00
if ! tryPushWriteRequest ( wr , tryPushBlock , isVMRemoteWrite ) {
wr . Timeseries [ 0 ] . Samples = samples
2023-11-24 12:42:11 +00:00
return false
}
2022-03-18 17:06:18 +00:00
wr . Timeseries [ 0 ] . Samples = samples [ n : ]
2023-11-25 09:31:30 +00:00
if ! tryPushWriteRequest ( wr , tryPushBlock , isVMRemoteWrite ) {
wr . Timeseries [ 0 ] . Samples = samples
2023-11-24 12:42:11 +00:00
return false
}
2022-03-18 17:06:18 +00:00
wr . Timeseries [ 0 ] . Samples = samples
2023-11-24 12:42:11 +00:00
return true
2022-03-18 17:06:18 +00:00
}
2020-02-23 11:35:47 +00:00
timeseries := wr . Timeseries
n := len ( timeseries ) / 2
wr . Timeseries = timeseries [ : n ]
2023-11-25 09:31:30 +00:00
if ! tryPushWriteRequest ( wr , tryPushBlock , isVMRemoteWrite ) {
wr . Timeseries = timeseries
2023-11-24 12:42:11 +00:00
return false
}
2020-02-23 11:35:47 +00:00
wr . Timeseries = timeseries [ n : ]
2023-11-25 09:31:30 +00:00
if ! tryPushWriteRequest ( wr , tryPushBlock , isVMRemoteWrite ) {
wr . Timeseries = timeseries
2023-11-24 12:42:11 +00:00
return false
}
2020-02-23 11:35:47 +00:00
wr . Timeseries = timeseries
2023-11-24 12:42:11 +00:00
return true
2020-02-23 11:35:47 +00:00
}
// Histograms describing the blocks successfully handed to the queue.
var (
	// blockSizeBytes tracks the compressed size of each pushed block.
	blockSizeBytes = metrics.NewHistogram(`vmagent_remotewrite_block_size_bytes`)
	// blockSizeRows tracks the number of time series in each pushed block.
	blockSizeRows = metrics.NewHistogram(`vmagent_remotewrite_block_size_rows`)
)

// Reusable buffers for tryPushWriteRequest.
var (
	// writeRequestBufPool holds buffers for marshaled, uncompressed write requests.
	writeRequestBufPool bytesutil.ByteBufferPool
	// snappyBufPool holds buffers for compressed blocks (snappy or zstd, despite the name).
	snappyBufPool bytesutil.ByteBufferPool
)