lib/promscrape: store the last scraped response in compressed form if its size exceeds -promscrape.minResponseSizeForStreamParse

This should reduce memory usage when scraping targets with big response bodies.
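
The idea, as a minimal standalone sketch (it uses the lib/encoding zstd helpers that the diff below relies on; the repeated sample line and the printed byte counts are purely illustrative, not taken from the commit): keep the previous response compressed between scrapes and decompress it only when the next scrape needs it for series comparison.

package main

import (
	"fmt"
	"strings"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)

func main() {
	// Scrape responses compress very well, since metric names and label sets repeat a lot.
	lastScrape := []byte(strings.Repeat(`http_requests_total{path="/api/v1/write"} 12345`+"\n", 10000))

	// Between scrapes keep only the compressed form; the commit uses the cheapest compression level 1.
	compressed := encoding.CompressZSTDLevel(nil, lastScrape, 1)
	fmt.Printf("uncompressed: %d bytes, compressed: %d bytes\n", len(lastScrape), len(compressed))
	lastScrape = nil // release the uncompressed copy until the next scrape

	// At the start of the next scrape, restore the previous response on demand.
	b, err := encoding.DecompressZSTD(nil, compressed)
	if err != nil {
		panic(fmt.Sprintf("BUG: cannot unpack compressed previous response: %s", err))
	}
	lastScrape = b
	fmt.Printf("restored %d bytes\n", len(lastScrape))
}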
Aliaksandr Valialkin 2021-10-16 12:58:34 +03:00
parent 9866dd95c1
commit 32793adbd9

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"math"
 	"math/bits"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -14,6 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -209,6 +209,46 @@ type scrapeWork struct {
 	// lastScrape holds the last response from scrape target.
 	lastScrape []byte
+
+	// lastScrapeCompressed is used for storing the compressed lastScrape between scrapes
+	// in stream parsing mode in order to reduce memory usage when the lastScrape size
+	// equals to or exceeds -promscrape.minResponseSizeForStreamParse
+	lastScrapeCompressed []byte
+}
+
+func (sw *scrapeWork) loadLastScrape() {
+	if len(sw.lastScrapeCompressed) == 0 {
+		// The lastScrape is already stored in sw.lastScrape
+		return
+	}
+	b, err := encoding.DecompressZSTD(sw.lastScrape[:0], sw.lastScrapeCompressed)
+	if err != nil {
+		logger.Panicf("BUG: cannot unpack compressed previous response: %s", err)
+	}
+	sw.lastScrape = b
+}
+
+func (sw *scrapeWork) storeLastScrape(lastScrape []byte) {
+	mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.N
+	if mustCompress {
+		sw.lastScrapeCompressed = encoding.CompressZSTDLevel(sw.lastScrapeCompressed[:0], lastScrape, 1)
+		sw.lastScrape = nil
+	} else {
+		sw.lastScrape = append(sw.lastScrape[:0], lastScrape...)
+		sw.lastScrapeCompressed = nil
+	}
+}
+
+func (sw *scrapeWork) finalizeLastScrape() {
+	if len(sw.lastScrapeCompressed) > 0 {
+		// The compressed lastScrape is available in sw.lastScrapeCompressed.
+		// Release the memory occupied by sw.lastScrape, so it won't be occupied between scrapes.
+		sw.lastScrape = nil
+	}
+	if len(sw.lastScrape) > 0 {
+		// Release the memory occupied by sw.lastScrapeCompressed, so it won't be occupied between scrapes.
+		sw.lastScrapeCompressed = nil
+	}
 }
 
 func (sw *scrapeWork) run(stopCh <-chan struct{}) {
@@ -307,7 +347,7 @@ func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool {
 }
 
 func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
-	if sw.mustSwitchToStreamParseMode(sw.prevBodyLen) || *streamParse || sw.Config.StreamParse {
+	if *streamParse || sw.Config.StreamParse || sw.mustSwitchToStreamParseMode(sw.prevBodyLen) {
 		// Read data from scrape targets in streaming manner.
 		// This case is optimized for targets exposing more than ten thousand of metrics per target.
 		return sw.scrapeStream(scrapeTimestamp, realTimestamp)
@@ -325,6 +365,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	scrapeResponseSize.Update(float64(len(body.B)))
 	up := 1
 	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
+	sw.loadLastScrape()
 	bodyString := bytesutil.ToUnsafeString(body.B)
 	lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
 	areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString)
@@ -371,10 +412,10 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
 	sw.pushData(&wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
-	sw.prevBodyLen = len(body.B)
+	sw.prevBodyLen = len(bodyString)
 	wc.reset()
-	canReturnToPool := !sw.mustSwitchToStreamParseMode(len(body.B))
-	if canReturnToPool {
+	mustSwitchToStreamParse := sw.mustSwitchToStreamParseMode(len(bodyString))
+	if !mustSwitchToStreamParse {
 		// Return wc to the pool if the parsed response size was smaller than -promscrape.minResponseSizeForStreamParse
 		// This should reduce memory usage when scraping targets with big responses.
 		writeRequestCtxPool.Put(wc)
@@ -382,9 +423,10 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	// body must be released only after wc is released, since wc refers to body.
 	if !areIdenticalSeries {
 		sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
-		sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
+		sw.storeLastScrape(body.B)
 	}
-	if canReturnToPool {
+	sw.finalizeLastScrape()
+	if !mustSwitchToStreamParse {
 		// Return wc to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
 		// This should reduce memory usage when scraping targets which return big responses.
 		leveledbytebufferpool.Put(body)
@@ -414,10 +456,8 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	samplesScraped := 0
 	samplesPostRelabeling := 0
 	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
-	// Do not pool sbr in order to reduce memory usage when scraping big responses.
-	sbr := &streamBodyReader{
-		body: make([]byte, 0, len(sw.lastScrape)),
-	}
+	// Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses.
+	sbr := &streamBodyReader{}
 
 	sr, err := sw.GetStreamReader()
 	if err != nil {
@@ -448,6 +488,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 		}, sw.logError)
 		sr.MustClose()
 	}
+	sw.loadLastScrape()
 	bodyString := bytesutil.ToUnsafeString(sbr.body)
 	lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
 	areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString)
@@ -484,9 +525,9 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	writeRequestCtxPool.Put(wc)
 	if !areIdenticalSeries {
 		sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
-		sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
+		sw.storeLastScrape(sbr.body)
 	}
-	runtime.KeepAlive(sbr) // this is needed in order to prevent from GC'ing data pointed by bodyString
+	sw.finalizeLastScrape()
 	tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
 	// Do not track active series in streaming mode, since this may need too big amounts of memory
 	// when the target exports too big number of metrics.