app/vmagent: reduce memory usage when importing data via /api/v1/import

Previously vmagent could use large amounts of RAM when each ingested JSON line
contained many samples, because outgoing blocks were split by time series count
rather than by the total number of samples they contain (see the sketch below).
Aliaksandr Valialkin 2020-09-26 04:07:45 +03:00
parent 82973f8ae7
commit b6a976b98d
2 changed files with 14 additions and 5 deletions
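
To make the issue concrete: a single /api/v1/import JSON line maps to a single
time series that may carry an arbitrary number of samples, so capping an
outgoing block by series count alone does not bound its size. A minimal sketch,
assuming a simplified stand-in for prompbmarshal.TimeSeries and an illustrative
per-block cap (not necessarily the real value):

package main

import "fmt"

// Simplified stand-in for prompbmarshal.TimeSeries.
type TimeSeries struct {
	Samples []float64
}

const maxRowsPerBlock = 10000 // illustrative cap on rows (samples) per block

func main() {
	// One /api/v1/import JSON line with a million samples becomes a single
	// TimeSeries, so a block holding it trivially passes a series-count
	// check while carrying 100x more rows than the cap intends.
	block := []TimeSeries{{Samples: make([]float64, 1000000)}}
	fmt.Println("series in block:", len(block)) // 1
	rows := 0
	for _, ts := range block {
		rows += len(ts.Samples)
	}
	fmt.Println("samples in block:", rows) // 1000000
}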


@@ -19,7 +19,7 @@ var (
 	flushInterval = flag.Duration("remoteWrite.flushInterval", time.Second, "Interval for flushing the data to remote storage. "+
 		"Higher value reduces network bandwidth usage at the cost of delayed push of scraped data to remote storage. "+
 		"Minimum supported interval is 1 second")
-	maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 32*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
+	maxUnpackedBlockSize = flagutil.NewBytes("remoteWrite.maxBlockSize", 8*1024*1024, "The maximum size in bytes of unpacked request to send to remote storage. "+
 		"It shouldn't exceed -maxInsertRequestSize from VictoriaMetrics")
 )
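
With this change the default unpacked block size drops from 32 MiB to 8 MiB.
Deployments that prefer fewer, larger requests should be able to restore the
previous limit via the same flag; a sketch of the invocation, with the value
given in raw bytes (32*1024*1024) since suffix parsing by flagutil.NewBytes is
not relied on here:

./vmagent -remoteWrite.url=http://victoria-metrics:8428/api/v1/write \
	-remoteWrite.maxBlockSize=33554432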
@@ -127,7 +127,7 @@ func (wr *writeRequest) push(src []prompbmarshal.TimeSeries) {
 	for i := range src {
 		tssDst = append(tssDst, prompbmarshal.TimeSeries{})
 		wr.copyTimeSeries(&tssDst[len(tssDst)-1], &src[i])
-		if len(tssDst) >= maxRowsPerBlock {
+		if len(wr.samples) >= maxRowsPerBlock {
 			wr.tss = tssDst
 			wr.flush()
 			tssDst = wr.tss
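
In other words, a flush is now triggered by the number of accumulated samples
(wr.samples) rather than by the number of copied series, so a handful of series
with many samples each still flushes early. A runnable sketch of this idea with
simplified stand-in types (the real writeRequest also copies labels and reuses
buffers via copyTimeSeries):

package main

import "fmt"

type Sample struct {
	Value     float64
	Timestamp int64
}

type TimeSeries struct {
	Samples []Sample
}

const maxRowsPerBlock = 10000 // illustrative cap, not necessarily the real value

type writeRequest struct {
	tss     []TimeSeries
	samples []Sample
}

func (wr *writeRequest) push(src []TimeSeries) {
	for i := range src {
		// Copying a series appends its samples to wr.samples, so
		// len(wr.samples) tracks the true row count of the pending block.
		wr.samples = append(wr.samples, src[i].Samples...)
		wr.tss = append(wr.tss, src[i])
		// Flush once the accumulated samples (not series) reach the cap.
		if len(wr.samples) >= maxRowsPerBlock {
			wr.flush()
		}
	}
}

func (wr *writeRequest) flush() {
	fmt.Printf("flushing %d series with %d samples\n", len(wr.tss), len(wr.samples))
	wr.tss = wr.tss[:0]
	wr.samples = wr.samples[:0]
}

func main() {
	// Three series with 4000 samples each: the sample-count check fires a
	// flush on the third series, even though only 3 series are pending.
	batch := make([]TimeSeries, 3)
	for i := range batch {
		batch[i] = TimeSeries{Samples: make([]Sample, 4000)}
	}
	(&writeRequest{}).push(batch)
}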


@@ -153,10 +153,19 @@ func Push(wr *prompbmarshal.WriteRequest) {
 	tss := wr.Timeseries
 	for len(tss) > 0 {
 		// Process big tss in smaller blocks in order to reduce the maximum memory usage
+		samplesCount := 0
+		i := 0
+		for i < len(tss) {
+			samplesCount += len(tss[i].Samples)
+			i++
+			if samplesCount > maxRowsPerBlock {
+				break
+			}
+		}
 		tssBlock := tss
-		if len(tssBlock) > maxRowsPerBlock {
-			tssBlock = tss[:maxRowsPerBlock]
-			tss = tss[maxRowsPerBlock:]
+		if i < len(tss) {
+			tssBlock = tss[:i]
+			tss = tss[i:]
 		} else {
 			tss = nil
 		}
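
A standalone sketch of this new splitting logic, extracted into a hypothetical
nextBlock helper so it can be run and inspected in isolation (TimeSeries is
again a simplified stand-in for prompbmarshal.TimeSeries):

package main

import "fmt"

type TimeSeries struct {
	Samples []float64
}

const maxRowsPerBlock = 10000 // illustrative cap, not necessarily the real value

// nextBlock mirrors the loop in the diff: it returns the next block and the
// remaining series. The series that crosses the cap is included (the break
// happens after i++), so a block may overshoot by that series' samples.
func nextBlock(tss []TimeSeries) (block, rest []TimeSeries) {
	samplesCount := 0
	i := 0
	for i < len(tss) {
		samplesCount += len(tss[i].Samples)
		i++
		if samplesCount > maxRowsPerBlock {
			break
		}
	}
	if i < len(tss) {
		return tss[:i], tss[i:]
	}
	return tss, nil
}

func main() {
	// Five series with 4000 samples each: instead of one oversized block
	// holding all 20000 samples, we get a 3-series block (12000 samples)
	// followed by a 2-series block (8000 samples).
	tss := make([]TimeSeries, 5)
	for i := range tss {
		tss[i] = TimeSeries{Samples: make([]float64, 4000)}
	}
	for len(tss) > 0 {
		var block []TimeSeries
		block, tss = nextBlock(tss)
		fmt.Println("series in block:", len(block))
	}
}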