diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index 7cf32d73d0..6ea6587105 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -23,6 +23,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add `vm-native-step-interval` command line flag for `vm-native` mode. New option allows splitting the import process into chunks by time interval. This helps migrating data sets with high churn rate and provides better control over the process. See [feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2733).
 * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add `top queries` tab, which shows various stats for recently executed queries. See [these docs](https://docs.victoriametrics.com/#top-queries) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2707).
 * FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `debug` mode to the alerting rule settings for printing additional information into logs during evaluation. See `debug` param in [alerting rule config](https://docs.victoriametrics.com/vmalert.html#alerting-rules).
+* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): minimize the time needed for reading large responses from scrape targets in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). This should reduce scrape durations for such targets as [kube-state-metrics](https://github.com/kubernetes/kube-state-metrics) running in a big Kubernetes cluster.
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly calculate `rate_over_sum(m[d])` as `sum_over_time(m[d])/d`. Previously the `sum_over_time(m[d])` could be improperly divided by smaller than `d` time range. See [rate_over_sum() docs](https://docs.victoriametrics.com/MetricsQL.html#rate_over_sum) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3045).
 * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): properly calculate query results at `vmselect`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3067). The issue has been introduced in [v1.81.0](https://docs.victoriametrics.com/CHANGELOG.html#v1810).
diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index ae3dc4d4c7..d08d5d04d6 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -516,19 +516,35 @@ func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {
 }
 
 type streamBodyReader struct {
-	sr          *streamReader
-	body        []byte
-	bodyLen     int
-	captureBody bool
+	body       []byte
+	bodyLen    int
+	readOffset int
+}
+
+func (sbr *streamBodyReader) Init(sr *streamReader) error {
+	sbr.body = nil
+	sbr.bodyLen = 0
+	sbr.readOffset = 0
+	// Read the whole response body in memory before parsing it in stream mode.
+	// This minimizes the time needed for reading response body from scrape target.
+	startTime := fasttime.UnixTimestamp()
+	body, err := io.ReadAll(sr)
+	if err != nil {
+		d := fasttime.UnixTimestamp() - startTime
+		return fmt.Errorf("cannot read stream body in %d seconds: %w", d, err)
+	}
+	sbr.body = body
+	sbr.bodyLen = len(body)
+	return nil
 }
 
 func (sbr *streamBodyReader) Read(b []byte) (int, error) {
-	n, err := sbr.sr.Read(b)
-	sbr.bodyLen += n
-	if sbr.captureBody {
-		sbr.body = append(sbr.body, b[:n]...)
+	if sbr.readOffset >= len(sbr.body) {
+		return 0, io.EOF
 	}
-	return n, err
+	n := copy(b, sbr.body[sbr.readOffset:])
+	sbr.readOffset += n
+	return n, nil
 }
 
 func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
@@ -536,37 +552,37 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	samplesPostRelabeling := 0
 	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
 	// Do not pool sbr and do not pre-allocate sbr.body in order to reduce memory usage when scraping big responses.
-	sbr := &streamBodyReader{
-		captureBody: !*noStaleMarkers,
-	}
+	var sbr streamBodyReader
 
 	sr, err := sw.GetStreamReader()
 	if err != nil {
 		err = fmt.Errorf("cannot read data: %s", err)
 	} else {
 		var mu sync.Mutex
-		sbr.sr = sr
-		err = parser.ParseStream(sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
-			mu.Lock()
-			defer mu.Unlock()
-			samplesScraped += len(rows)
-			for i := range rows {
-				sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
-			}
-			// Push the collected rows to sw before returning from the callback, since they cannot be held
-			// after returning from the callback - this will result in data race.
-			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
-			samplesPostRelabeling += len(wc.writeRequest.Timeseries)
-			if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit {
+		err = sbr.Init(sr)
+		if err == nil {
+			err = parser.ParseStream(&sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
+				mu.Lock()
+				defer mu.Unlock()
+				samplesScraped += len(rows)
+				for i := range rows {
+					sw.addRowToTimeseries(wc, &rows[i], scrapeTimestamp, true)
+				}
+				// Push the collected rows to sw before returning from the callback, since they cannot be held
+				// after returning from the callback - this will result in data race.
+				// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825#issuecomment-723198247
+				samplesPostRelabeling += len(wc.writeRequest.Timeseries)
+				if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit {
+					wc.resetNoRows()
+					scrapesSkippedBySampleLimit.Inc()
+					return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
+						"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
+				}
+				sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
 				wc.resetNoRows()
-				scrapesSkippedBySampleLimit.Inc()
-				return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
-					"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
-			}
-			sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
-			wc.resetNoRows()
-			return nil
-		}, sw.logError)
+				return nil
+			}, sw.logError)
+		}
 		sr.MustClose()
 	}
 	lastScrape := sw.loadLastScrape()
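Reviewer note: the Init/Read pattern introduced above can be exercised in isolation. The following is a minimal sketch, not code from this patch; `bufferedBodyReader` and the sample input are made-up stand-ins for `streamBodyReader` and a scrape response, using only the standard library. It shows that Init drains the source into memory once, after which Read serves purely from the buffer and reports io.EOF when the bytes are exhausted, so the parser never blocks on the network.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// bufferedBodyReader mirrors the patched streamBodyReader:
// Init reads the whole source into memory; Read then serves
// the buffered bytes and returns io.EOF once they run out.
type bufferedBodyReader struct {
	body       []byte
	readOffset int
}

func (br *bufferedBodyReader) Init(src io.Reader) error {
	body, err := io.ReadAll(src)
	if err != nil {
		return fmt.Errorf("cannot read body: %w", err)
	}
	br.body = body
	br.readOffset = 0
	return nil
}

func (br *bufferedBodyReader) Read(b []byte) (int, error) {
	if br.readOffset >= len(br.body) {
		return 0, io.EOF
	}
	n := copy(b, br.body[br.readOffset:])
	br.readOffset += n
	return n, nil
}

func main() {
	// Stand-in for a scrape target's response body.
	src := strings.NewReader("metric_a 1\nmetric_b 2\n")

	var br bufferedBodyReader
	if err := br.Init(src); err != nil {
		panic(err)
	}

	// A parser can now consume the body from memory instead of
	// holding the target's connection open while parsing.
	out, err := io.ReadAll(&br)
	fmt.Printf("read %d bytes, err=%v\n%s", len(out), err, out)
}
```

This matches the rationale in the patch comment: reading the body up front decouples parsing speed from how long the scrape connection stays open, which is why the change should shorten scrape durations for large targets such as kube-state-metrics.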