From f3e89754a9db96242786ef10602b0d819ff3c82e Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 12 Sep 2021 12:49:19 +0300 Subject: [PATCH] lib/promscrape: reduce CPU usage for common case when calculating `scrape_series_added` metric Also reduce CPU usage when applying `series_limit` to scrape targets with constant set of metrics. The main idea is to perform the calculations on scrape_series_added and series_limit only if the set of metrics exposed by the target has been changed. Scrape targets rarely change the set of exposed metrics, so this optimization should reduce CPU usage in general case. --- docs/CHANGELOG.md | 4 +- lib/promscrape/scrapework.go | 89 ++++++++++++----------- lib/promscrape/scrapework_test.go | 2 +- lib/protoparser/prometheus/parser.go | 7 +- lib/protoparser/prometheus/parser_test.go | 6 +- 5 files changed, 56 insertions(+), 52 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3ce5714a0..428b3f76e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -11,7 +11,9 @@ sort: 15 * FEATURE: vmagent: add ability to set `series_limit` option for a particular scrape target via `__series_limit__` label. This allows setting the limit on the number of time series on a per-target basis. See [these docs](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) for details. * FEATURE: vmagent: add ability to set `stream_parse` option for a particular scrape target via `__stream_parse__` label. This allows managing the stream parsing mode on a per-target basis. See [these docs](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) for details. * FEATURE: add new relabeling actions: `keep_metrics` and `drop_metrics`. This simplifies metrics filtering by metric names. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details. -* FAETURE: allow splitting long `regex` in relabeling filters into an array of shorter regexps, which can be put into multiple lines for better readability and maintainability. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details. +* FEATURE: allow splitting long `regex` in relabeling filters into an array of shorter regexps, which can be put into multiple lines for better readability and maintainability. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details. +* FEATURE: vmagent: reduce CPU usage when calculating the number of newly added series per scrape (this number is sent to remote storage in `scrape_series_added` metric). +* FEATURE: vmagent: reduce CPU usage when applying `series_limit` to scrape targets with constant set of metrics. See more information about `series_limit` [here](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter). * BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601). * BUGFIX: vmselect: reset connection timeouts after each request to `vmstorage`. This should prevent from `cannot read data in 0.000 seconds: unexpected EOF` warning in logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1562). Thanks to @mxlxm . diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 93581b773..364bf487e 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -6,6 +6,7 @@ import ( "math" "math/bits" "strconv" + "strings" "sync" "time" @@ -178,9 +179,10 @@ type scrapeWork struct { tmpRow parser.Row - // the seriesMap, seriesAdded and labelsHashBuf are used for fast calculation of `scrape_series_added` metric. - seriesMap map[uint64]struct{} - seriesAdded int + // This flag is set to true if series_limit is exceeded. + seriesLimitExceeded bool + + // labelsHashBuf is used for calculating the hash on series labels labelsHashBuf []byte // Optional limiter on the number of unique series per scrape target. @@ -195,7 +197,6 @@ type scrapeWork struct { prevLabelsLen int // lastScrape holds the last response from scrape target. - // It is used for generating Prometheus stale markers. lastScrape []byte } @@ -307,6 +308,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error up := 1 wc := writeRequestCtxPool.Get(sw.prevLabelsLen) bodyString := bytesutil.ToUnsafeString(body.B) + lastScrape := bytesutil.ToUnsafeString(sw.lastScrape) + areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString) if err != nil { up = 0 scrapesFailed.Inc() @@ -327,8 +330,21 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error err = fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) } - sw.updateSeriesAdded(wc) - seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling) + if up == 0 { + bodyString = "" + } + seriesAdded := 0 + if !areIdenticalSeries { + // The returned value for seriesAdded may be bigger than the real number of added series + // if some series were removed during relabeling. + // This is a trade-off between performance and accuracy. + seriesAdded = sw.getSeriesAdded(bodyString) + } + if sw.seriesLimitExceeded || !areIdenticalSeries { + if sw.applySeriesLimit(wc) { + sw.seriesLimitExceeded = true + } + } sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) @@ -340,12 +356,12 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error writeRequestCtxPool.Put(wc) // body must be released only after wc is released, since wc refers to body. sw.prevBodyLen = len(body.B) + if !areIdenticalSeries { + sw.sendStaleSeries(bodyString, scrapeTimestamp, false) + } + sw.lastScrape = append(sw.lastScrape[:0], bodyString...) leveledbytebufferpool.Put(body) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) - if up == 0 { - bodyString = "" - } - sw.sendStaleSeries(bodyString, scrapeTimestamp, false) return err } @@ -383,7 +399,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) } - sw.updateSeriesAdded(wc) sw.pushData(&wc.writeRequest) wc.resetNoRows() return nil @@ -404,12 +419,13 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { } scrapesFailed.Inc() } - seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling) sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) - sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) + // scrape_series_added isn't calculated in streaming mode, + // since it may need unlimited amounts of memory when scraping targets with millions of exposed metrics. + sw.addAutoTimeseries(wc, "scrape_series_added", 0, scrapeTimestamp) sw.pushData(&wc.writeRequest) sw.prevLabelsLen = len(wc.labels) wc.reset() @@ -486,11 +502,16 @@ func (wc *writeRequestCtx) resetNoRows() { var writeRequestCtxPool leveledWriteRequestCtxPool -func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) { - if sw.seriesMap == nil { - sw.seriesMap = make(map[uint64]struct{}, len(wc.writeRequest.Timeseries)) +func (sw *scrapeWork) getSeriesAdded(currScrape string) int { + if currScrape == "" { + return 0 } - m := sw.seriesMap + lastScrape := bytesutil.ToUnsafeString(sw.lastScrape) + bodyString := parser.GetRowsDiff(currScrape, lastScrape) + return strings.Count(bodyString, "\n") +} + +func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) bool { seriesLimit := *seriesLimitPerTarget if sw.Config.SeriesLimit > 0 { seriesLimit = sw.Config.SeriesLimit @@ -499,24 +520,26 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) { sw.seriesLimiter = bloomfilter.NewLimiter(seriesLimit, 24*time.Hour) } hsl := sw.seriesLimiter + if hsl == nil { + return false + } dstSeries := wc.writeRequest.Timeseries[:0] job := sw.Config.Job() + limitExceeded := false for _, ts := range wc.writeRequest.Timeseries { h := sw.getLabelsHash(ts.Labels) - if hsl != nil && !hsl.Add(h) { + if !hsl.Add(h) { // The limit on the number of hourly unique series per scrape target has been exceeded. // Drop the metric. metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_series_limit_rows_dropped_total{scrape_job_original=%q,scrape_job=%q,scrape_target=%q}`, sw.Config.jobNameOriginal, job, sw.Config.ScrapeURL)).Inc() + limitExceeded = true continue } dstSeries = append(dstSeries, ts) - if _, ok := m[h]; !ok { - m[h] = struct{}{} - sw.seriesAdded++ - } } wc.writeRequest.Timeseries = dstSeries + return limitExceeded } func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAutoSeries bool) { @@ -524,18 +547,11 @@ func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAut return } lastScrape := bytesutil.ToUnsafeString(sw.lastScrape) - if parser.AreIdenticalSeriesFast(lastScrape, currScrape) { - // Fast path: the current scrape contains the same set of series as the previous scrape. - return - } - // Slow path: the current scrape contains different set of series than the previous scrape. - // Detect missing series in the current scrape and send stale markers for them. bodyString := lastScrape if currScrape != "" { - bodyString = parser.GetDiffWithStaleRows(lastScrape, currScrape) + bodyString = parser.GetRowsDiff(lastScrape, currScrape) } - wc := writeRequestCtxPool.Get(sw.prevLabelsLen) - defer writeRequestCtxPool.Put(wc) + wc := &writeRequestCtx{} if bodyString != "" { wc.rows.Unmarshal(bodyString) srcRows := wc.rows.Rows @@ -562,17 +578,6 @@ func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAut } } sw.pushData(&wc.writeRequest) - sw.lastScrape = append(sw.lastScrape[:0], currScrape...) -} - -func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int { - seriesAdded := sw.seriesAdded - sw.seriesAdded = 0 - if len(sw.seriesMap) > 4*lastScrapeSize { - // Reset seriesMap, since it occupies more than 4x metrics collected during the last scrape. - sw.seriesMap = make(map[uint64]struct{}, lastScrapeSize) - } - return seriesAdded } func (sw *scrapeWork) getLabelsHash(labels []prompbmarshal.Label) uint64 { diff --git a/lib/promscrape/scrapework_test.go b/lib/promscrape/scrapework_test.go index b19093c7d..8b9dbac00 100644 --- a/lib/promscrape/scrapework_test.go +++ b/lib/promscrape/scrapework_test.go @@ -325,7 +325,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) { scrape_samples_scraped{job="xx",instance="foo.com"} 4 123 scrape_duration_seconds{job="xx",instance="foo.com"} 0 123 scrape_samples_post_metric_relabeling{job="xx",instance="foo.com"} 1 123 - scrape_series_added{job="xx",instance="foo.com"} 1 123 + scrape_series_added{job="xx",instance="foo.com"} 4 123 `) f(` foo{bar="baz"} 34.44 diff --git a/lib/protoparser/prometheus/parser.go b/lib/protoparser/prometheus/parser.go index dd250e1aa..fec8a972a 100644 --- a/lib/protoparser/prometheus/parser.go +++ b/lib/protoparser/prometheus/parser.go @@ -366,10 +366,10 @@ func prevBackslashesCount(s string) int { return n } -// GetDiffWithStaleRows returns rows from s1, which are missing in s2. +// GetRowsDiff returns rows from s1, which are missing in s2. // // The returned rows have default value 0 and have no timestamps. -func GetDiffWithStaleRows(s1, s2 string) string { +func GetRowsDiff(s1, s2 string) string { var r1, r2 Rows r1.Unmarshal(s1) r2.Unmarshal(s2) @@ -386,11 +386,8 @@ func GetDiffWithStaleRows(s1, s2 string) string { r := &rows1[i] key := marshalMetricNameWithTags(r) if !m[key] { - logger.Infof("missing %s", key) diff = append(diff, key...) diff = append(diff, " 0\n"...) - } else { - logger.Infof("found %s", key) } } return string(diff) diff --git a/lib/protoparser/prometheus/parser_test.go b/lib/protoparser/prometheus/parser_test.go index 27812c9c4..d2a2baaed 100644 --- a/lib/protoparser/prometheus/parser_test.go +++ b/lib/protoparser/prometheus/parser_test.go @@ -6,12 +6,12 @@ import ( "testing" ) -func TestGetDiffWithStaleRows(t *testing.T) { +func TestGetRowsDiff(t *testing.T) { f := func(s1, s2, resultExpected string) { t.Helper() - result := GetDiffWithStaleRows(s1, s2) + result := GetRowsDiff(s1, s2) if result != resultExpected { - t.Fatalf("unexpected result for GetDiffWithStaleRows(%q, %q); got %q; want %q", s1, s2, result, resultExpected) + t.Fatalf("unexpected result for GetRowsDiff(%q, %q); got %q; want %q", s1, s2, result, resultExpected) } } f("", "", "")