lib/promscrape: read response body into memory in stream parsing mode before parsing it

This reduces scrape duration for targets returning big responses.

The response body was already read into memory in stream parsing mode before this change,
so this commit shouldn't increase memory usage.
This commit is contained in:
Aliaksandr Valialkin 2022-09-14 13:14:04 +03:00
parent ccad651a61
commit 74c00a8762
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 50 additions and 33 deletions

View file

@ -23,6 +23,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `vm-native-step-interval` command line flag for `vm-native` mode. New option allows splitting the import process into chunks by time interval. This helps migrating data sets with high churn rate and provides better control over the process. See [feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2733).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add `top queries` tab, which shows various stats for recently executed queries. See [these docs](https://docs.victoriametrics.com/#top-queries) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2707).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `debug` mode to the alerting rule settings for printing additional information into logs during evaluation. See `debug` param in [alerting rule config](https://docs.victoriametrics.com/vmalert.html#alerting-rules).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): minimize the time needed for reading large responses from scrape targets in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). This should reduce scrape durations for such targets as [kube-state-metrics](https://github.com/kubernetes/kube-state-metrics) running in a big Kubernetes cluster.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate `rate_over_sum(m[d])` as `sum_over_time(m[d])/d`. Previously the `sum_over_time(m[d])` could be improperly divided by smaller than `d` time range. See [rate_over_sum() docs](https://docs.victoriametrics.com/MetricsQL.html#rate_over_sum) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3045).
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): properly calculate query results at `vmselect`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3067). The issue has been introduced in [v1.81.0](https://docs.victoriametrics.com/CHANGELOG.html#v1810).

View file

@ -516,19 +516,35 @@ func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {
}
type streamBodyReader struct {
sr *streamReader
body []byte
bodyLen int
captureBody bool
body []byte
bodyLen int
readOffset int
}
func (sbr *streamBodyReader) Init(sr *streamReader) error {
sbr.body = nil
sbr.bodyLen = 0
sbr.readOffset = 0
// Read the whole response body in memory before parsing it in stream mode.
// This minimizes the time needed for reading response body from scrape target.
startTime := fasttime.UnixTimestamp()
body, err := io.ReadAll(sr)
if err != nil {
d := fasttime.UnixTimestamp() - startTime
return fmt.Errorf("cannot read stream body in %d seconds: %w", d, err)
}
sbr.body = body
sbr.bodyLen = len(body)
return nil
}
func (sbr *streamBodyReader) Read(b []byte) (int, error) {
n, err := sbr.sr.Read(b)
sbr.bodyLen += n
if sbr.captureBody {
sbr.body = append(sbr.body, b[:n]...)
if sbr.readOffset >= len(sbr.body) {
return 0, io.EOF
}
return n, err
n := copy(b, sbr.body[sbr.readOffset:])
sbr.readOffset += n
return n, nil
}
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
@ -536,37 +552,37 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
samplesPostRelabeling := 0
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
// Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses.
sbr := &streamBodyReader{
captureBody: !*noStaleMarkers,
}
var sbr streamBodyReader
sr, err := sw.GetStreamReader()
if err != nil {
err = fmt.Errorf("cannot read data: %s", err)
} else {
var mu sync.Mutex
sbr.sr = sr
err = parser.ParseStream(sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
mu.Lock()
defer mu.Unlock()
samplesScraped += len(rows)
for i := range rows {
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit {
err = sbr.Init(sr)
if err != nil {
err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
mu.Lock()
defer mu.Unlock()
samplesScraped += len(rows)
for i := range rows {
sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
}
// Push the collected rows to sw before returning from the callback, since they cannot be held
// after returning from the callback - this will result in data race.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit {
wc.resetNoRows()
scrapesSkippedBySampleLimit.Inc()
return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
}
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
wc.resetNoRows()
scrapesSkippedBySampleLimit.Inc()
return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
}
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
wc.resetNoRows()
return nil
}, sw.logError)
return nil
}, sw.logError)
}
sr.MustClose()
}
lastScrape := sw.loadLastScrape()