From 32793adbd9969b643ebf1d681f4aeff172675140 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sat, 16 Oct 2021 12:58:34 +0300
Subject: [PATCH] lib/promscrape: store the last scraped response in compressed
 form if its size exceeds -promscrape.minResponseSizeForStreamParse

This should reduce memory usage when scraping targets with big response
bodies.
---
 lib/promscrape/scrapework.go | 67 +++++++++++++++++++++++++++++-------
 1 file changed, 54 insertions(+), 13 deletions(-)

diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index 0624abbab7..517fd0b4e1 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"math"
 	"math/bits"
-	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -14,6 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -209,6 +209,46 @@ type scrapeWork struct {
 
 	// lastScrape holds the last response from scrape target.
 	lastScrape []byte
+
+	// lastScrapeCompressed is used for storing the compressed lastScrape between scrapes
+	// in stream parsing mode in order to reduce memory usage when the lastScrape size
+	// equals to or exceeds -promscrape.minResponseSizeForStreamParse
+	lastScrapeCompressed []byte
+}
+
+func (sw *scrapeWork) loadLastScrape() {
+	if len(sw.lastScrapeCompressed) == 0 {
+		// The lastScrape is already stored in sw.lastScrape
+		return
+	}
+	b, err := encoding.DecompressZSTD(sw.lastScrape[:0], sw.lastScrapeCompressed)
+	if err != nil {
+		logger.Panicf("BUG: cannot unpack compressed previous response: %s", err)
+	}
+	sw.lastScrape = b
+}
+
+func (sw *scrapeWork) storeLastScrape(lastScrape []byte) {
+	mustCompress := minResponseSizeForStreamParse.N > 0 && len(lastScrape) >= minResponseSizeForStreamParse.N
+	if mustCompress {
+		sw.lastScrapeCompressed = encoding.CompressZSTDLevel(sw.lastScrapeCompressed[:0], lastScrape, 1)
+		sw.lastScrape = nil
+	} else {
+		sw.lastScrape = append(sw.lastScrape[:0], lastScrape...)
+		sw.lastScrapeCompressed = nil
+	}
+}
+
+func (sw *scrapeWork) finalizeLastScrape() {
+	if len(sw.lastScrapeCompressed) > 0 {
+		// The compressed lastScrape is available in sw.lastScrapeCompressed.
+		// Release the memory occupied by sw.lastScrape, so it won't be occupied between scrapes.
+		sw.lastScrape = nil
+	}
+	if len(sw.lastScrape) > 0 {
+		// Release the memory occupied by sw.lastScrapeCompressed, so it won't be occupied between scrapes.
+		sw.lastScrapeCompressed = nil
+	}
 }
 
 func (sw *scrapeWork) run(stopCh <-chan struct{}) {
@@ -307,7 +347,7 @@ func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool {
 }
 
 func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
-	if sw.mustSwitchToStreamParseMode(sw.prevBodyLen) || *streamParse || sw.Config.StreamParse {
+	if *streamParse || sw.Config.StreamParse || sw.mustSwitchToStreamParseMode(sw.prevBodyLen) {
 		// Read data from scrape targets in streaming manner.
 		// This case is optimized for targets exposing more than ten thousand of metrics per target.
 		return sw.scrapeStream(scrapeTimestamp, realTimestamp)
@@ -325,6 +365,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	scrapeResponseSize.Update(float64(len(body.B)))
 	up := 1
 	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
+	sw.loadLastScrape()
 	bodyString := bytesutil.ToUnsafeString(body.B)
 	lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
 	areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString)
@@ -371,10 +412,10 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
 	sw.pushData(&wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
-	sw.prevBodyLen = len(body.B)
+	sw.prevBodyLen = len(bodyString)
 	wc.reset()
-	canReturnToPool := !sw.mustSwitchToStreamParseMode(len(body.B))
-	if canReturnToPool {
+	mustSwitchToStreamParse := sw.mustSwitchToStreamParseMode(len(bodyString))
+	if !mustSwitchToStreamParse {
 		// Return wc to the pool if the parsed response size was smaller than -promscrape.minResponseSizeForStreamParse
 		// This should reduce memory usage when scraping targets with big responses.
 		writeRequestCtxPool.Put(wc)
@@ -382,9 +423,10 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	// body must be released only after wc is released, since wc refers to body.
 	if !areIdenticalSeries {
 		sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
-		sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
+		sw.storeLastScrape(body.B)
 	}
-	if canReturnToPool {
+	sw.finalizeLastScrape()
+	if !mustSwitchToStreamParse {
 		// Return wc to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
 		// This should reduce memory usage when scraping targets which return big responses.
 		leveledbytebufferpool.Put(body)
@@ -414,10 +456,8 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	samplesScraped := 0
 	samplesPostRelabeling := 0
 	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
-	// Do not pool sbr in order to reduce memory usage when scraping big responses.
-	sbr := &streamBodyReader{
-		body: make([]byte, 0, len(sw.lastScrape)),
-	}
+	// Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses.
+	sbr := &streamBodyReader{}
 
 	sr, err := sw.GetStreamReader()
 	if err != nil {
@@ -448,6 +488,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 		}, sw.logError)
 		sr.MustClose()
 	}
+	sw.loadLastScrape()
 	bodyString := bytesutil.ToUnsafeString(sbr.body)
 	lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
 	areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString)
@@ -484,9 +525,9 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	writeRequestCtxPool.Put(wc)
 	if !areIdenticalSeries {
 		sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
-		sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
+		sw.storeLastScrape(sbr.body)
 	}
-	runtime.KeepAlive(sbr) // this is needed in order to prevent from GC'ing data pointed by bodyString
+	sw.finalizeLastScrape()
 	tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
 	// Do not track active series in streaming mode, since this may need too big amounts of memory
 	// when the target exports too big number of metrics.
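
Reviewer note: below is a minimal, self-contained sketch of the store/load pattern this patch introduces, using the same lib/encoding helpers (CompressZSTDLevel, DecompressZSTD) that the diff calls. The threshold constant and the sample payload are made up for illustration only; in the real code the threshold comes from -promscrape.minResponseSizeForStreamParse and the pattern lives in storeLastScrape()/loadLastScrape() above.

package main

import (
	"bytes"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
)

// minResponseSize stands in for -promscrape.minResponseSizeForStreamParse (hypothetical value).
const minResponseSize = 1024

func main() {
	// A pretend scrape response that is bigger than the threshold.
	body := bytes.Repeat([]byte("http_requests_total 123\n"), 100)

	var lastScrape, lastScrapeCompressed []byte

	// Store: keep only the compressed copy when the response is big enough,
	// mirroring storeLastScrape() in the patch.
	if len(body) >= minResponseSize {
		lastScrapeCompressed = encoding.CompressZSTDLevel(lastScrapeCompressed[:0], body, 1)
		lastScrape = nil
	} else {
		lastScrape = append(lastScrape[:0], body...)
		lastScrapeCompressed = nil
	}

	// Load: decompress before the next scrape needs the previous response,
	// mirroring loadLastScrape() in the patch.
	if len(lastScrapeCompressed) > 0 {
		b, err := encoding.DecompressZSTD(lastScrape[:0], lastScrapeCompressed)
		if err != nil {
			panic(err)
		}
		lastScrape = b
	}

	fmt.Printf("original=%dB compressed=%dB roundtrip ok=%v\n",
		len(body), len(lastScrapeCompressed), bytes.Equal(lastScrape, body))
}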