lib/promscrape: store the full response in stream parsing mode in scrapeWork.lastScrape byte slice

This allows sending staleness markers and properly calculating the scrape_series_added metric in stream parsing mode,
at the cost of increased memory usage, since the potentially big response is now kept
in the lastScrape byte slice for each scrapeWork.

In practice the memory usage increase shouldn't be significant, since the response size
is usually much smaller than the metrics parsed from this response after relabeling,
which typically adds a large set of target-specific labels to each metric.
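
Below is a minimal, self-contained sketch (not the actual vmagent code) of the idea behind keeping the full response: store the previous scrape body, skip the expensive work when the body is unchanged, and emit stale markers for series that disappeared. `parseSeriesNames` and the printed "stale marker" line are illustrative placeholders for the real parser and remote-write path.

```go
// Sketch of keeping the previous scrape body to detect vanished series.
package main

import (
	"bytes"
	"fmt"
	"strings"
)

type scrapeState struct {
	lastScrape []byte // full response body of the previous scrape
}

// parseSeriesNames extracts metric names from a Prometheus text exposition body.
func parseSeriesNames(body []byte) map[string]struct{} {
	names := make(map[string]struct{})
	for _, line := range strings.Split(string(body), "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		if i := strings.IndexAny(line, " {"); i > 0 {
			names[line[:i]] = struct{}{}
		}
	}
	return names
}

func (s *scrapeState) process(body []byte) {
	if bytes.Equal(body, s.lastScrape) {
		return // identical response: nothing became stale, nothing was added
	}
	prev := parseSeriesNames(s.lastScrape)
	cur := parseSeriesNames(body)
	for name := range prev {
		if _, ok := cur[name]; !ok {
			fmt.Printf("stale marker for %s\n", name) // stand-in for sending a staleness mark
		}
	}
	// Keep the full body for the next comparison; this is the extra memory cost.
	s.lastScrape = append(s.lastScrape[:0], body...)
}

func main() {
	var s scrapeState
	s.process([]byte("foo 1\nbar 2\n"))
	s.process([]byte("foo 1\n")) // prints: stale marker for bar
}
```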
Aliaksandr Valialkin 2021-10-15 15:26:22 +03:00
parent 3e9beb0f8d
commit 0452a8d4e8
4 changed files with 58 additions and 19 deletions


@@ -302,7 +302,7 @@ You can read more about relabeling in the following articles:
 * If the scrape target is removed from the list of targets, then stale markers are sent for all the metrics scraped from this target.
 * Stale markers are sent for all the scraped metrics on graceful shutdown of `vmagent`.

-Prometheus staleness markers aren't sent to `-remoteWrite.url` in [stream parsing mode](#stream-parsing-mode) or if `-promscrape.noStaleMarkers` command-line is set.
+Prometheus staleness markers aren't sent to `-remoteWrite.url` if `-promscrape.noStaleMarkers` command-line flag is set.

 ## Stream parsing mode


@@ -9,7 +9,9 @@ sort: 15
 * FEATURE: vmagent: expose `-promscrape.config` contents at `/config` page as Prometheus does. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1695).
 * FEATURE: vmagent: add `show original labels` button per each scrape target displayed at `http://vmagent:8429/targets` page. This should improve debuggability for service discovery and relabeling issues similar to [this one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1664). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1698).
 * FEATURE: vmagent: shard targets among cluster nodes after the relabeling is applied. This should guarantee that targets with the same set of labels go to the same `vmagent` node in the cluster. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687).
-* FEATURE: vmagent: add `-promscrape.minResponseSizeForStreamParse` command-line flag, which can be used for instructing `vmagent` to automatically switch to [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) for scrape targets with big responses. This should reduce memory usage when `vmagent` scrapes targets with non-uniform response sizes (this is the case in Kubernetes monitoring).
+* FEATURE: vmagent: automatically switch to [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) if the response from the given target exceeds the command-line flag value `-promscrape.minResponseSizeForStreamParse`. This should reduce memory usage when `vmagent` scrapes targets with non-uniform response sizes (this is the case in Kubernetes monitoring).
+* FEATURE: vmagent: send Prometheus-like staleness marks in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously staleness marks weren't sent in stream parsing mode. See [these docs](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) for details.
+* FEATURE: vmagent: properly calculate `scrape_series_added` metric for targets in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously it was set to 0 in stream parsing mode. See [more details about this metric](https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series).
 * FEATURE: vmagent: return error if `sample_limit` or `series_limit` options are set when [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) is enabled, since these limits cannot be applied in stream parsing mode.
 * FEATURE: add trigonometric functions, which are going to be added in [Prometheus 2.31](https://github.com/prometheus/prometheus/pull/9239): [acosh](https://docs.victoriametrics.com/MetricsQL.html#acosh), [asinh](https://docs.victoriametrics.com/MetricsQL.html#asinh), [atan](https://docs.victoriametrics.com/MetricsQL.html#atan), [atanh](https://docs.victoriametrics.com/MetricsQL.html#atanh), [cosh](https://docs.victoriametrics.com/MetricsQL.html#cosh), [deg](https://docs.victoriametrics.com/MetricsQL.html#deg), [rad](https://docs.victoriametrics.com/MetricsQL.html#rad), [sinh](https://docs.victoriametrics.com/MetricsQL.html#sinh), [tan](https://docs.victoriametrics.com/MetricsQL.html#tan), [tanh](https://docs.victoriametrics.com/MetricsQL.html#tanh). Also add `atan2` binary operator. See [this pull request](https://github.com/prometheus/prometheus/pull/9248).
 * FEATURE: consistently return the same set of time series from [limitk](https://docs.victoriametrics.com/MetricsQL.html#limitk) function. This improves the usability of periodically refreshed graphs.


@@ -306,7 +306,7 @@ You can read more about relabeling in the following articles:
 * If the scrape target is removed from the list of targets, then stale markers are sent for all the metrics scraped from this target.
 * Stale markers are sent for all the scraped metrics on graceful shutdown of `vmagent`.

-Prometheus staleness markers aren't sent to `-remoteWrite.url` in [stream parsing mode](#stream-parsing-mode) or if `-promscrape.noStaleMarkers` command-line is set.
+Prometheus staleness markers aren't sent to `-remoteWrite.url` if `-promscrape.noStaleMarkers` command-line flag is set.

 ## Stream parsing mode


@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math"
 	"math/bits"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -117,8 +118,9 @@ type ScrapeWork struct {
 	jobNameOriginal string
 }

-func (sw *ScrapeWork) canUseStreamParse() bool {
-	// Stream parsing mode cannot be used if SampleLimit or SeriesLimit is set.
+func (sw *ScrapeWork) canSwitchToStreamParseMode() bool {
+	// Deny switching to stream parse mode if `sample_limit` or `series_limit` options are set,
+	// since these limits cannot be applied in stream parsing mode.
 	return sw.SampleLimit <= 0 && sw.SeriesLimit <= 0
 }
@@ -297,8 +299,15 @@ var (
 	pushDataDuration = metrics.NewHistogram("vm_promscrape_push_data_duration_seconds")
 )

+func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool {
+	if minResponseSizeForStreamParse.N <= 0 {
+		return false
+	}
+	return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.N
+}
+
 func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
-	if (sw.Config.canUseStreamParse() && sw.prevBodyLen >= minResponseSizeForStreamParse.N) || *streamParse || sw.Config.StreamParse {
+	if sw.mustSwitchToStreamParseMode(sw.prevBodyLen) || *streamParse || sw.Config.StreamParse {
 		// Read data from scrape targets in streaming manner.
 		// This case is optimized for targets exposing more than ten thousand of metrics per target.
 		return sw.scrapeStream(scrapeTimestamp, realTimestamp)
@@ -364,7 +373,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	sw.prevLabelsLen = len(wc.labels)
 	sw.prevBodyLen = len(body.B)
 	wc.reset()
-	if len(body.B) < minResponseSizeForStreamParse.N {
+	canReturnToPool := !sw.mustSwitchToStreamParseMode(len(body.B))
+	if canReturnToPool {
 		// Return wc to the pool if the parsed response size was smaller than -promscrape.minResponseSizeForStreamParse
 		// This should reduce memory usage when scraping targets with big responses.
 		writeRequestCtxPool.Put(wc)
@@ -372,11 +382,11 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	// body must be released only after wc is released, since wc refers to body.
 	if !areIdenticalSeries {
 		sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
-	}
-	if len(body.B) < minResponseSizeForStreamParse.N {
-		// Save body to sw.lastScrape and return it to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
-		// This should reduce memory usage when scraping targets which return big responses.
 		sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
+	}
+	if canReturnToPool {
+		// Return wc to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
+		// This should reduce memory usage when scraping targets which return big responses.
 		leveledbytebufferpool.Put(body)
 	}
 	tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
@@ -389,18 +399,33 @@ func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
 	pushDataDuration.UpdateDuration(startTime)
 }

+type streamBodyReader struct {
+	sr   *streamReader
+	body []byte
+}
+
+func (sbr *streamBodyReader) Read(b []byte) (int, error) {
+	n, err := sbr.sr.Read(b)
+	sbr.body = append(sbr.body, b[:n]...)
+	return n, err
+}
+
 func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	samplesScraped := 0
 	samplesPostRelabeling := 0
-	responseSize := int64(0)
 	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
+	// Do not pool sbr in order to reduce memory usage when scraping big responses.
+	sbr := &streamBodyReader{
+		body: make([]byte, 0, len(sw.lastScrape)),
+	}
 	sr, err := sw.GetStreamReader()
 	if err != nil {
 		err = fmt.Errorf("cannot read data: %s", err)
 	} else {
 		var mu sync.Mutex
-		err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
+		sbr.sr = sr
+		err = parser.ParseStream(sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
 			mu.Lock()
 			defer mu.Unlock()
 			samplesScraped += len(rows)
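
The streamBodyReader added above is essentially a tee reader: it forwards reads to the underlying streamReader while accumulating every byte, so the full response body is still available for comparison after streaming finishes. Here is a standalone sketch of the same pattern against a plain io.Reader; the names are illustrative and not part of the package:

```go
// Standalone sketch of the tee-reader pattern used by streamBodyReader.
package main

import (
	"fmt"
	"io"
	"strings"
)

// capturingReader passes reads through to r while accumulating every byte
// that was read, so the full body is available after streaming finishes.
type capturingReader struct {
	r    io.Reader
	body []byte
}

func (cr *capturingReader) Read(b []byte) (int, error) {
	n, err := cr.r.Read(b)
	cr.body = append(cr.body, b[:n]...)
	return n, err
}

func main() {
	cr := &capturingReader{r: strings.NewReader("metric_a 1\nmetric_b 2\n")}
	// Stream-parse from cr as usual...
	if _, err := io.Copy(io.Discard, cr); err != nil {
		panic(err)
	}
	// ...and the complete response is still available afterwards.
	fmt.Printf("captured %d bytes\n", len(cr.body))
}
```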
@@ -421,15 +446,17 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 			wc.resetNoRows()
 			return nil
 		}, sw.logError)
-		responseSize = sr.bytesRead
 		sr.MustClose()
 	}
+	bodyString := bytesutil.ToUnsafeString(sbr.body)
+	lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
+	areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString)
 	scrapedSamples.Update(float64(samplesScraped))
 	endTimestamp := time.Now().UnixNano() / 1e6
 	duration := float64(endTimestamp-realTimestamp) / 1e3
 	scrapeDuration.Update(duration)
-	scrapeResponseSize.Update(float64(responseSize))
+	scrapeResponseSize.Update(float64(len(bodyString)))
 	up := 1
 	if err != nil {
 		if samplesScraped == 0 {
@@ -437,19 +464,29 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 		}
 		scrapesFailed.Inc()
 	}
+	seriesAdded := 0
+	if !areIdenticalSeries {
+		// The returned value for seriesAdded may be bigger than the real number of added series
+		// if some series were removed during relabeling.
+		// This is a trade-off between performance and accuracy.
+		seriesAdded = sw.getSeriesAdded(bodyString)
+	}
 	sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
-	// scrape_series_added isn't calculated in streaming mode,
-	// since it may need unlimited amounts of memory when scraping targets with millions of exposed metrics.
-	sw.addAutoTimeseries(wc, "scrape_series_added", 0, scrapeTimestamp)
+	sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
 	sw.pushData(&wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
-	sw.prevBodyLen = int(responseSize)
+	sw.prevBodyLen = len(bodyString)
 	wc.reset()
 	writeRequestCtxPool.Put(wc)
+	if !areIdenticalSeries {
+		sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
+		sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
+	}
+	runtime.KeepAlive(sbr) // this is needed in order to prevent from GC'ing data pointed by bodyString
 	tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
 	// Do not track active series in streaming mode, since this may need too big amounts of memory
 	// when the target exports too big number of metrics.
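
The runtime.KeepAlive(sbr) call above pairs with the zero-copy conversion bytesutil.ToUnsafeString(sbr.body): bodyString aliases the memory of sbr.body, and KeepAlive makes it explicit that the backing data must stay reachable until that point. A self-contained sketch of the same pattern, with a hand-rolled toUnsafeString standing in for bytesutil.ToUnsafeString:

```go
// Sketch of pairing runtime.KeepAlive with an unsafe, zero-copy string view
// over a byte slice. toUnsafeString is a stand-in for bytesutil.ToUnsafeString.
package main

import (
	"fmt"
	"runtime"
	"unsafe"
)

// toUnsafeString returns a string sharing the memory of b without copying.
// The caller must keep b reachable for as long as the string is in use.
func toUnsafeString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func main() {
	body := []byte("metric_a 1\nmetric_b 2\n")
	bodyString := toUnsafeString(body)

	fmt.Println(len(bodyString)) // bodyString is used here...

	// ...and KeepAlive makes it explicit that body stays reachable at least
	// until this point, instead of relying on liveness being inferred through
	// the unsafe conversion; the commit uses runtime.KeepAlive(sbr) the same way.
	runtime.KeepAlive(body)
}
```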