Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
lib/promscrape: do not keep the response body in memory in stream parsing mode if -promscrape.noStaleMarkers is set
The response body isn't used when -promscrape.noStaleMarkers is set (since commit 2876137c92), so there is no sense in keeping it in memory. This should reduce memory usage when scraping big responses.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1728#issuecomment-949630694
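
For context, here is a minimal, runnable sketch of the pattern this commit introduces, using only the standard library. countingBodyReader and its fields are illustrative stand-ins for the streamBodyReader in the diff below, not the actual lib/promscrape types: the reader always counts the bytes flowing through it (enough for size metrics), but retains the bytes themselves only when capture is enabled.

package main

import (
	"fmt"
	"io"
	"strings"
)

// countingBodyReader is an illustrative stand-in for streamBodyReader:
// it tracks how many bytes were read, but buffers them only on demand.
type countingBodyReader struct {
	r       io.Reader
	body    []byte
	bodyLen int
	capture bool
}

func (cbr *countingBodyReader) Read(b []byte) (int, error) {
	n, err := cbr.r.Read(b)
	cbr.bodyLen += n
	if cbr.capture {
		// Buffering the whole body is what this commit avoids
		// for big responses when stale markers are disabled.
		cbr.body = append(cbr.body, b[:n]...)
	}
	return n, err
}

func main() {
	src := strings.NewReader("metric_a 1\nmetric_b 2\n")
	// capture disabled, as with -promscrape.noStaleMarkers
	cbr := &countingBodyReader{r: src, capture: false}
	if _, err := io.Copy(io.Discard, cbr); err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes, retained %d bytes\n", cbr.bodyLen, len(cbr.body))
	// Prints: read 22 bytes, retained 0 bytes
}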
This commit is contained in:
parent fcdb0c415e
commit b08f51f5d3

1 changed file with 13 additions and 6 deletions
lib/promscrape/scrapework.go

@@ -453,13 +453,18 @@ func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
 	}
 }
 
 type streamBodyReader struct {
-	sr   *streamReader
-	body []byte
+	sr          *streamReader
+	body        []byte
+	bodyLen     int
+	captureBody bool
 }
 
 func (sbr *streamBodyReader) Read(b []byte) (int, error) {
 	n, err := sbr.sr.Read(b)
-	sbr.body = append(sbr.body, b[:n]...)
+	sbr.bodyLen += n
+	if sbr.captureBody {
+		sbr.body = append(sbr.body, b[:n]...)
+	}
 	return n, err
 }
 
@@ -468,7 +473,9 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	samplesPostRelabeling := 0
 	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
 	// Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses.
-	sbr := &streamBodyReader{}
+	sbr := &streamBodyReader{
+		captureBody: !*noStaleMarkers,
+	}
 
 	sr, err := sw.GetStreamReader()
 	if err != nil {
@@ -507,7 +514,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	endTimestamp := time.Now().UnixNano() / 1e6
 	duration := float64(endTimestamp-realTimestamp) / 1e3
 	scrapeDuration.Update(duration)
-	scrapeResponseSize.Update(float64(len(bodyString)))
+	scrapeResponseSize.Update(float64(sbr.bodyLen))
 	up := 1
 	if err != nil {
 		if samplesScraped == 0 {
@@ -530,7 +537,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
 	sw.pushData(&wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
-	sw.prevBodyLen = len(bodyString)
+	sw.prevBodyLen = sbr.bodyLen
 	wc.reset()
 	writeRequestCtxPool.Put(wc)
 	if !areIdenticalSeries {
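
Design note: bodyLen replaces len(bodyString) in both the scrapeResponseSize metric update and sw.prevBodyLen, so response-size accounting keeps working even when the body itself is never buffered. The body is retained only when stale markers are enabled, since, per the commit message and the linked issue, staleness-marker generation is the body's sole remaining consumer after commit 2876137c92.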