diff --git a/app/vmagent/README.md b/app/vmagent/README.md
index 8c296d31d..1abe1e4a3 100644
--- a/app/vmagent/README.md
+++ b/app/vmagent/README.md
@@ -302,7 +302,7 @@ You can read more about relabeling in the following articles:
 * If the scrape target is removed from the list of targets, then stale markers are sent for all the metrics scraped from this target.
 * Stale markers are sent for all the scraped metrics on graceful shutdown of `vmagent`.
 
-Prometheus staleness markers aren't sent to `-remoteWrite.url` in [stream parsing mode](#stream-parsing-mode) or if `-promscrape.noStaleMarkers` command-line is set.
+Prometheus staleness markers aren't sent to `-remoteWrite.url` if the `-promscrape.noStaleMarkers` command-line flag is set.
 
 ## Stream parsing mode
 
diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index e59773baa..e06d72af9 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -9,7 +9,9 @@ sort: 15
 * FEATURE: vmagent: expose `-promscrape.config` contents at `/config` page as Prometheus does. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1695).
 * FEATURE: vmagent: add `show original labels` button per each scrape target displayed at `http://vmagent:8429/targets` page. This should improve debuggability for service discovery and relabeling issues similar to [this one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1664). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1698).
 * FEATURE: vmagent: shard targets among cluster nodes after the relabeling is applied. This should guarantee that targets with the same set of labels go to the same `vmagent` node in the cluster. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687).
-* FEATURE: vmagent: add `-promscrape.minResponseSizeForStreamParse` command-line flag, which can be used for instructing `vmagent` to automatically switch to [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) for scrape targets with big responses. This should reduce memory usage when `vmagent` scrapes targets with non-uniform response sizes (this is the case in Kubernetes monitoring).
+* FEATURE: vmagent: automatically switch to [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) if the response size from the given target exceeds the value of the `-promscrape.minResponseSizeForStreamParse` command-line flag. This should reduce memory usage when `vmagent` scrapes targets with non-uniform response sizes (this is the case in Kubernetes monitoring).
+* FEATURE: vmagent: send Prometheus-like staleness marks in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously staleness marks weren't sent in stream parsing mode. See [these docs](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) for details.
+* FEATURE: vmagent: properly calculate `scrape_series_added` metric for targets in [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode). Previously it was set to 0 in stream parsing mode. See [more details about this metric](https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series).
 * FEATURE: vmagent: return error if `sample_limit` or `series_limit` options are set when [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) is enabled, since these limits cannot be applied in stream parsing mode.
 * FEATURE: add trigonometric functions, which are going to be added in [Prometheus 2.31](https://github.com/prometheus/prometheus/pull/9239): [acosh](https://docs.victoriametrics.com/MetricsQL.html#acosh), [asinh](https://docs.victoriametrics.com/MetricsQL.html#asinh), [atan](https://docs.victoriametrics.com/MetricsQL.html#atan), [atanh](https://docs.victoriametrics.com/MetricsQL.html#atanh), [cosh](https://docs.victoriametrics.com/MetricsQL.html#cosh), [deg](https://docs.victoriametrics.com/MetricsQL.html#deg), [rad](https://docs.victoriametrics.com/MetricsQL.html#rad), [sinh](https://docs.victoriametrics.com/MetricsQL.html#sinh), [tan](https://docs.victoriametrics.com/MetricsQL.html#tan), [tanh](https://docs.victoriametrics.com/MetricsQL.html#tanh). Also add `atan2` binary operator. See [this pull request](https://github.com/prometheus/prometheus/pull/9248).
 * FEATURE: consistently return the same set of time series from [limitk](https://docs.victoriametrics.com/MetricsQL.html#limitk) function. This improves the usability of periodically refreshed graphs.
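Note (not part of the patch): the "staleness markers" mentioned above are just samples whose value is the special Prometheus stale NaN, a NaN with the bit pattern 0x7ff0000000000002, distinct from the ordinary quiet NaN returned by math.NaN(). The self-contained Go sketch below shows how such markers can be produced and detected; the names staleNaN and isStaleNaN are illustrative and are not vmagent's API.

```go
package main

import (
	"fmt"
	"math"
)

// staleNaNBits is the bit pattern Prometheus uses for staleness markers.
const staleNaNBits uint64 = 0x7ff0000000000002

// staleNaN is the marker value itself.
var staleNaN = math.Float64frombits(staleNaNBits)

// isStaleNaN reports whether v is a staleness marker.
// A plain math.IsNaN check is not enough, since ordinary NaN samples must not be treated as markers.
func isStaleNaN(v float64) bool {
	return math.Float64bits(v) == staleNaNBits
}

func main() {
	fmt.Println(isStaleNaN(staleNaN))   // true
	fmt.Println(isStaleNaN(math.NaN())) // false: a regular NaN is not a marker
	fmt.Println(isStaleNaN(42))         // false
}
```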
diff --git a/docs/vmagent.md b/docs/vmagent.md
index 7e4afd52f..beb6df812 100644
--- a/docs/vmagent.md
+++ b/docs/vmagent.md
@@ -306,7 +306,7 @@ You can read more about relabeling in the following articles:
 * If the scrape target is removed from the list of targets, then stale markers are sent for all the metrics scraped from this target.
 * Stale markers are sent for all the scraped metrics on graceful shutdown of `vmagent`.
 
-Prometheus staleness markers aren't sent to `-remoteWrite.url` in [stream parsing mode](#stream-parsing-mode) or if `-promscrape.noStaleMarkers` command-line is set.
+Prometheus staleness markers aren't sent to `-remoteWrite.url` if the `-promscrape.noStaleMarkers` command-line flag is set.
 
 ## Stream parsing mode
 
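For readers skimming the Go changes below: the automatic switch described in the CHANGELOG boils down to a size check against `-promscrape.minResponseSizeForStreamParse`, skipped when `sample_limit` or `series_limit` is configured for the target. A minimal standalone sketch of that rule follows; the function and parameter names are illustrative and are not vmagent's actual API.

```go
package main

import "fmt"

// shouldUseStreamParse mirrors the decision implemented by mustSwitchToStreamParseMode below:
// stream parsing mode is used when it is forced via configuration, or when the previous response
// size reached the threshold and no sample_limit/series_limit is set for the target
// (those limits cannot be applied in stream parsing mode).
func shouldUseStreamParse(prevBodyLen, threshold, sampleLimit, seriesLimit int, forceStream bool) bool {
	if forceStream {
		return true
	}
	if threshold <= 0 || sampleLimit > 0 || seriesLimit > 0 {
		return false
	}
	return prevBodyLen >= threshold
}

func main() {
	const threshold = 10 << 20 // e.g. a 10MiB threshold

	fmt.Println(shouldUseStreamParse(20<<20, threshold, 0, 0, false))    // true: big previous response
	fmt.Println(shouldUseStreamParse(1<<20, threshold, 0, 0, false))     // false: small previous response
	fmt.Println(shouldUseStreamParse(20<<20, threshold, 1000, 0, false)) // false: sample_limit is set
}
```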
diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index 73d6e0b68..0624abbab 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math"
 	"math/bits"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync"
@@ -117,8 +118,9 @@ type ScrapeWork struct {
 	jobNameOriginal string
 }
 
-func (sw *ScrapeWork) canUseStreamParse() bool {
-	// Stream parsing mode cannot be used if SampleLimit or SeriesLimit is set.
+func (sw *ScrapeWork) canSwitchToStreamParseMode() bool {
+	// Deny switching to stream parse mode if `sample_limit` or `series_limit` options are set,
+	// since these limits cannot be applied in stream parsing mode.
 	return sw.SampleLimit <= 0 && sw.SeriesLimit <= 0
 }
 
@@ -297,8 +299,15 @@ var (
 	pushDataDuration = metrics.NewHistogram("vm_promscrape_push_data_duration_seconds")
 )
 
+func (sw *scrapeWork) mustSwitchToStreamParseMode(responseSize int) bool {
+	if minResponseSizeForStreamParse.N <= 0 {
+		return false
+	}
+	return sw.Config.canSwitchToStreamParseMode() && responseSize >= minResponseSizeForStreamParse.N
+}
+
 func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
-	if (sw.Config.canUseStreamParse() && sw.prevBodyLen >= minResponseSizeForStreamParse.N) || *streamParse || sw.Config.StreamParse {
+	if sw.mustSwitchToStreamParseMode(sw.prevBodyLen) || *streamParse || sw.Config.StreamParse {
 		// Read data from scrape targets in streaming manner.
 		// This case is optimized for targets exposing more than ten thousand of metrics per target.
 		return sw.scrapeStream(scrapeTimestamp, realTimestamp)
@@ -364,7 +373,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	sw.prevLabelsLen = len(wc.labels)
 	sw.prevBodyLen = len(body.B)
 	wc.reset()
-	if len(body.B) < minResponseSizeForStreamParse.N {
+	canReturnToPool := !sw.mustSwitchToStreamParseMode(len(body.B))
+	if canReturnToPool {
 		// Return wc to the pool if the parsed response size was smaller than -promscrape.minResponseSizeForStreamParse
 		// This should reduce memory usage when scraping targets with big responses.
 		writeRequestCtxPool.Put(wc)
@@ -372,11 +382,11 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	// body must be released only after wc is released, since wc refers to body.
 	if !areIdenticalSeries {
 		sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
-	}
-	if len(body.B) < minResponseSizeForStreamParse.N {
-		// Save body to sw.lastScrape and return it to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
-		// This should reduce memory usage when scraping targets which return big responses.
 		sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
+	}
+	if canReturnToPool {
+		// Return wc to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
+		// This should reduce memory usage when scraping targets which return big responses.
 		leveledbytebufferpool.Put(body)
 	}
 	tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
@@ -389,18 +399,33 @@ func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
 	pushDataDuration.UpdateDuration(startTime)
 }
 
+type streamBodyReader struct {
+	sr   *streamReader
+	body []byte
+}
+
+func (sbr *streamBodyReader) Read(b []byte) (int, error) {
+	n, err := sbr.sr.Read(b)
+	sbr.body = append(sbr.body, b[:n]...)
+	return n, err
+}
+
 func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	samplesScraped := 0
 	samplesPostRelabeling := 0
-	responseSize := int64(0)
 	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
+	// Do not pool sbr in order to reduce memory usage when scraping big responses.
+	sbr := &streamBodyReader{
+		body: make([]byte, 0, len(sw.lastScrape)),
+	}
 
 	sr, err := sw.GetStreamReader()
 	if err != nil {
 		err = fmt.Errorf("cannot read data: %s", err)
 	} else {
 		var mu sync.Mutex
-		err = parser.ParseStream(sr, scrapeTimestamp, false, func(rows []parser.Row) error {
+		sbr.sr = sr
+		err = parser.ParseStream(sbr, scrapeTimestamp, false, func(rows []parser.Row) error {
 			mu.Lock()
 			defer mu.Unlock()
 			samplesScraped += len(rows)
@@ -421,15 +446,17 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 			wc.resetNoRows()
 			return nil
 		}, sw.logError)
-		responseSize = sr.bytesRead
 		sr.MustClose()
 	}
 
+	bodyString := bytesutil.ToUnsafeString(sbr.body)
+	lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
+	areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString)
 	scrapedSamples.Update(float64(samplesScraped))
 	endTimestamp := time.Now().UnixNano() / 1e6
 	duration := float64(endTimestamp-realTimestamp) / 1e3
 	scrapeDuration.Update(duration)
-	scrapeResponseSize.Update(float64(responseSize))
+	scrapeResponseSize.Update(float64(len(bodyString)))
 	up := 1
 	if err != nil {
 		if samplesScraped == 0 {
@@ -437,19 +464,29 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 		}
 		scrapesFailed.Inc()
 	}
+	seriesAdded := 0
+	if !areIdenticalSeries {
+		// The returned value for seriesAdded may be bigger than the real number of added series
+		// if some series were removed during relabeling.
+		// This is a trade-off between performance and accuracy.
+		seriesAdded = sw.getSeriesAdded(bodyString)
+	}
 	sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
-	// scrape_series_added isn't calculated in streaming mode,
-	// since it may need unlimited amounts of memory when scraping targets with millions of exposed metrics.
-	sw.addAutoTimeseries(wc, "scrape_series_added", 0, scrapeTimestamp)
+	sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_timeout_seconds", sw.Config.ScrapeTimeout.Seconds(), scrapeTimestamp)
 	sw.pushData(&wc.writeRequest)
 	sw.prevLabelsLen = len(wc.labels)
-	sw.prevBodyLen = int(responseSize)
+	sw.prevBodyLen = len(bodyString)
 	wc.reset()
 	writeRequestCtxPool.Put(wc)
+	if !areIdenticalSeries {
+		sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
+		sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
+	}
+	runtime.KeepAlive(sbr) // this is needed in order to prevent from GC'ing data pointed by bodyString
 	tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
 	// Do not track active series in streaming mode, since this may need too big amounts of memory
 	// when the target exports too big number of metrics.
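The core trick that makes staleness marks and `scrape_series_added` possible in stream parsing mode is the `streamBodyReader` wrapper added above: it copies every chunk read by the streaming parser into an in-memory buffer, so the full response body is still available once parsing finishes. Below is a self-contained sketch of the same pattern built only on the standard library; the type and variable names are illustrative, not vmagent's code.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// bodyCapturingReader wraps an io.Reader and keeps a copy of everything read through it,
// the same idea as the streamBodyReader type introduced in this patch.
type bodyCapturingReader struct {
	r    io.Reader
	body []byte
}

func (bcr *bodyCapturingReader) Read(b []byte) (int, error) {
	n, err := bcr.r.Read(b)
	bcr.body = append(bcr.body, b[:n]...)
	return n, err
}

func main() {
	// Pretend this is a scrape response streamed from a target.
	src := strings.NewReader("metric_a{job=\"demo\"} 1\nmetric_b{job=\"demo\"} 2\n")
	bcr := &bodyCapturingReader{r: src}

	// Consume the reader line by line, as a streaming parser would.
	scanner := bufio.NewScanner(bcr)
	rows := 0
	for scanner.Scan() {
		rows++ // each row would be relabeled and pushed here
	}

	// After streaming, the full body is still available, e.g. for comparing it
	// with the previous scrape body and for generating staleness marks.
	fmt.Printf("parsed %d rows, captured %d bytes\n", rows, len(bcr.body))
}
```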