2020-02-23 11:35:47 +00:00
package remotewrite
import (
"flag"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
2020-05-14 19:01:51 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2020-08-16 14:05:52 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
2020-02-23 11:35:47 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
"github.com/VictoriaMetrics/metrics"
"github.com/golang/snappy"
)
2020-02-25 17:57:47 +00:00
var (
flushInterval = flag . Duration ( "remoteWrite.flushInterval" , time . Second , "Interval for flushing the data to remote storage. " +
2020-05-14 19:01:51 +00:00
"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. " +
"Minimum supported interval is 1 second" )
2020-08-16 14:05:52 +00:00
maxUnpackedBlockSize = flagutil . NewBytes ( "remoteWrite.maxBlockSize" , 32 * 1024 * 1024 , "The maximum size in bytes of unpacked request to send to remote storage. " +
2020-02-25 17:57:47 +00:00
"It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics" )
)
2020-02-23 11:35:47 +00:00
// maxRowsPerBlock is the maximum number of rows to send per each block.
//
// writeRequest.push compares the number of buffered time series against
// this limit in order to decide when to flush.
const maxRowsPerBlock = 10 * 1000
// pendingSeries buffers time series until they are flushed to the pushBlock
// callback, either because enough series have been collected or because
// the background flusher decided it is time to flush.
type pendingSeries struct {
	// mu serializes access to wr between Push callers and the background
	// flusher goroutine.
	mu sync.Mutex

	// wr accumulates the series passed to Push.
	wr writeRequest

	// stopCh is closed by MustStop in order to stop the background flusher.
	stopCh chan struct{}

	// periodicFlusherWG is used by MustStop for waiting until the background
	// flusher goroutine exits.
	periodicFlusherWG sync.WaitGroup
}
// newPendingSeries returns pendingSeries, which passes the accumulated
// blocks to pushBlock.
//
// A background goroutine running periodicFlusher is started for the returned
// object. MustStop must be called in order to stop this goroutine.
func newPendingSeries(pushBlock func(block []byte)) *pendingSeries {
	ps := &pendingSeries{
		stopCh: make(chan struct{}),
	}
	ps.wr.pushBlock = pushBlock

	// Register the flusher before starting it, so MustStop always waits for it.
	ps.periodicFlusherWG.Add(1)
	go func() {
		defer ps.periodicFlusherWG.Done()
		ps.periodicFlusher()
	}()
	return ps
}
// MustStop signals the background flusher to stop and blocks until its
// goroutine has exited.
//
// The flusher performs a last flush after it observes the stop signal.
// MustStop must be called at most once, since it closes ps.stopCh.
func (ps *pendingSeries) MustStop() {
	stopCh := ps.stopCh
	close(stopCh)

	wg := &ps.periodicFlusherWG
	wg.Wait()
}
// Push adds tss to the buffered series.
//
// The buffer is protected by ps.mu, so Push may run concurrently with the
// background flusher.
func (ps *pendingSeries) Push(tss []prompbmarshal.TimeSeries) {
	ps.mu.Lock()
	defer ps.mu.Unlock()

	ps.wr.push(tss)
}
func ( ps * pendingSeries ) periodicFlusher ( ) {
2020-05-14 19:01:51 +00:00
flushSeconds := int64 ( flushInterval . Seconds ( ) )
if flushSeconds <= 0 {
flushSeconds = 1
}
2020-02-23 11:35:47 +00:00
ticker := time . NewTicker ( * flushInterval )
defer ticker . Stop ( )
mustStop := false
for ! mustStop {
select {
case <- ps . stopCh :
mustStop = true
case <- ticker . C :
2020-05-14 19:01:51 +00:00
if fasttime . UnixTimestamp ( ) - ps . wr . lastFlushTime < uint64 ( flushSeconds ) {
2020-02-23 11:35:47 +00:00
continue
}
}
ps . mu . Lock ( )
ps . wr . flush ( )
ps . mu . Unlock ( )
}
}
type writeRequest struct {
wr prompbmarshal . WriteRequest
pushBlock func ( block [ ] byte )
2020-05-14 19:01:51 +00:00
lastFlushTime uint64
2020-02-23 11:35:47 +00:00
tss [ ] prompbmarshal . TimeSeries
labels [ ] prompbmarshal . Label
samples [ ] prompbmarshal . Sample
buf [ ] byte
}
// reset empties wr while keeping the allocated capacity of its buffers
// for subsequent reuse.
//
// References held by the buffered series and labels are cleared, so the
// memory they point to can be released.
func (wr *writeRequest) reset() {
	wr.wr.Timeseries = nil

	tss := wr.tss
	for i := range tss {
		tss[i].Labels = nil
		tss[i].Samples = nil
	}
	wr.tss = tss[:0]

	labels := wr.labels
	for i := range labels {
		labels[i].Name = ""
		labels[i].Value = ""
	}
	wr.labels = labels[:0]

	wr.samples = wr.samples[:0]
	wr.buf = wr.buf[:0]
}
func ( wr * writeRequest ) flush ( ) {
wr . wr . Timeseries = wr . tss
2020-05-14 19:01:51 +00:00
wr . lastFlushTime = fasttime . UnixTimestamp ( )
2020-02-23 11:35:47 +00:00
pushWriteRequest ( & wr . wr , wr . pushBlock )
wr . reset ( )
}
// push appends deep copies of src to the buffered series.
//
// The buffered series are flushed every time their number reaches
// maxRowsPerBlock, so a single flush never carries more than
// maxRowsPerBlock series regardless of len(src).
func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) {
	tssDst := wr.tss
	for i := range src {
		tssDst = append(tssDst, prompbmarshal.TimeSeries{})
		wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
		// Check the up-to-date number of buffered series: wr.tss isn't
		// updated inside the loop, so its length would be stale here.
		if len(tssDst) >= maxRowsPerBlock {
			// Publish the series appended so far before flushing.
			// Otherwise they would be missing from the flushed request and
			// would be lost after tssDst is re-pointed to the reset wr.tss.
			wr.tss = tssDst
			wr.flush()
			tssDst = wr.tss
		}
	}
	wr.tss = tssDst
}
func ( wr * writeRequest ) copyTimeSeries ( dst , src * prompbmarshal . TimeSeries ) {
labelsDst := wr . labels
labelsLen := len ( wr . labels )
samplesDst := wr . samples
buf := wr . buf
for i := range src . Labels {
labelsDst = append ( labelsDst , prompbmarshal . Label { } )
dstLabel := & labelsDst [ len ( labelsDst ) - 1 ]
srcLabel := & src . Labels [ i ]
buf = append ( buf , srcLabel . Name ... )
dstLabel . Name = bytesutil . ToUnsafeString ( buf [ len ( buf ) - len ( srcLabel . Name ) : ] )
buf = append ( buf , srcLabel . Value ... )
dstLabel . Value = bytesutil . ToUnsafeString ( buf [ len ( buf ) - len ( srcLabel . Value ) : ] )
}
dst . Labels = labelsDst [ labelsLen : ]
2020-05-15 14:35:59 +00:00
samplesDst = append ( samplesDst , src . Samples ... )
dst . Samples = samplesDst [ len ( samplesDst ) - len ( src . Samples ) : ]
2020-02-23 11:35:47 +00:00
wr . samples = samplesDst
wr . labels = labelsDst
wr . buf = buf
}
func pushWriteRequest ( wr * prompbmarshal . WriteRequest , pushBlock func ( block [ ] byte ) ) {
if len ( wr . Timeseries ) == 0 {
// Nothing to push
return
}
bb := writeRequestBufPool . Get ( )
bb . B = prompbmarshal . MarshalWriteRequest ( bb . B [ : 0 ] , wr )
2020-08-16 14:05:52 +00:00
if len ( bb . B ) <= maxUnpackedBlockSize . N {
2020-02-25 17:34:35 +00:00
zb := snappyBufPool . Get ( )
zb . B = snappy . Encode ( zb . B [ : cap ( zb . B ) ] , bb . B )
writeRequestBufPool . Put ( bb )
2020-02-25 17:57:47 +00:00
if len ( zb . B ) <= persistentqueue . MaxBlockSize {
pushBlock ( zb . B )
blockSizeRows . Update ( float64 ( len ( wr . Timeseries ) ) )
blockSizeBytes . Update ( float64 ( len ( zb . B ) ) )
snappyBufPool . Put ( zb )
return
}
2020-02-23 11:35:47 +00:00
snappyBufPool . Put ( zb )
2020-02-25 17:57:47 +00:00
} else {
writeRequestBufPool . Put ( bb )
2020-02-23 11:35:47 +00:00
}
// Too big block. Recursively split it into smaller parts.
timeseries := wr . Timeseries
n := len ( timeseries ) / 2
wr . Timeseries = timeseries [ : n ]
pushWriteRequest ( wr , pushBlock )
wr . Timeseries = timeseries [ n : ]
pushWriteRequest ( wr , pushBlock )
wr . Timeseries = timeseries
}
// Histograms updated by pushWriteRequest for every block passed to pushBlock.
//
// The metric names must not contain leading or trailing whitespace,
// since such names aren't valid metric identifiers.
var (
	// blockSizeBytes tracks the size of snappy-compressed blocks in bytes.
	blockSizeBytes = metrics.NewHistogram(`vmagent_remotewrite_block_size_bytes`)

	// blockSizeRows tracks the number of time series per block.
	blockSizeRows = metrics.NewHistogram(`vmagent_remotewrite_block_size_rows`)
)
// Buffer pools used by pushWriteRequest.
var (
	// writeRequestBufPool provides buffers for marshaled, not yet compressed
	// write requests.
	writeRequestBufPool bytesutil.ByteBufferPool

	// snappyBufPool provides buffers for snappy-compressed blocks.
	snappyBufPool bytesutil.ByteBufferPool
)