lib/promscrape: reduce CPU usage for common case when calculating scrape_series_added metric

Also reduce CPU usage when applying `series_limit` to scrape targets with constant set of metrics.

The main idea is to perform the calculations on scrape_series_added and series_limit
only if the set of metrics exposed by the target has been changed.
Scrape targets rarely change the set of exposed metrics,
so this optimization should reduce CPU usage in general case.
This commit is contained in:
Aliaksandr Valialkin 2021-09-12 12:49:19 +03:00
parent 674a6eee6c
commit f3e89754a9
5 changed files with 56 additions and 52 deletions

View file

@ -11,7 +11,9 @@ sort: 15
* FEATURE: vmagent: add ability to set `series_limit` option for a particular scrape target via `__series_limit__` label. This allows setting the limit on the number of time series on a per-target basis. See [these docs](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) for details. * FEATURE: vmagent: add ability to set `series_limit` option for a particular scrape target via `__series_limit__` label. This allows setting the limit on the number of time series on a per-target basis. See [these docs](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) for details.
* FEATURE: vmagent: add ability to set `stream_parse` option for a particular scrape target via `__stream_parse__` label. This allows managing the stream parsing mode on a per-target basis. See [these docs](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) for details. * FEATURE: vmagent: add ability to set `stream_parse` option for a particular scrape target via `__stream_parse__` label. This allows managing the stream parsing mode on a per-target basis. See [these docs](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode) for details.
* FEATURE: add new relabeling actions: `keep_metrics` and `drop_metrics`. This simplifies metrics filtering by metric names. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details. * FEATURE: add new relabeling actions: `keep_metrics` and `drop_metrics`. This simplifies metrics filtering by metric names. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details.
* FAETURE: allow splitting long `regex` in relabeling filters into an array of shorter regexps, which can be put into multiple lines for better readability and maintainability. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details. * FEATURE: allow splitting long `regex` in relabeling filters into an array of shorter regexps, which can be put into multiple lines for better readability and maintainability. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabeling) for more details.
* FEATURE: vmagent: reduce CPU usage when calculating the number of newly added series per scrape (this number is sent to remote storage in `scrape_series_added` metric).
* FEATURE: vmagent: reduce CPU usage when applying `series_limit` to scrape targets with constant set of metrics. See more information about `series_limit` [here](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter).
* BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601). * BUGFIX: properly handle queries with multiple filters matching empty labels such as `metric{label1=~"foo|",label2="bar|"}`. This filter must match the following series: `metric`, `metric{label1="foo"}`, `metric{label2="bar"}` and `metric{label1="foo",label2="bar"}`. Previously it was matching only `metric{label1="foo",label2="bar"}`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1601).
* BUGFIX: vmselect: reset connection timeouts after each request to `vmstorage`. This should prevent from `cannot read data in 0.000 seconds: unexpected EOF` warning in logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1562). Thanks to @mxlxm . * BUGFIX: vmselect: reset connection timeouts after each request to `vmstorage`. This should prevent from `cannot read data in 0.000 seconds: unexpected EOF` warning in logs. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1562). Thanks to @mxlxm .

View file

@ -6,6 +6,7 @@ import (
"math" "math"
"math/bits" "math/bits"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -178,9 +179,10 @@ type scrapeWork struct {
tmpRow parser.Row tmpRow parser.Row
// the seriesMap, seriesAdded and labelsHashBuf are used for fast calculation of `scrape_series_added` metric. // This flag is set to true if series_limit is exceeded.
seriesMap map[uint64]struct{} seriesLimitExceeded bool
seriesAdded int
// labelsHashBuf is used for calculating the hash on series labels
labelsHashBuf []byte labelsHashBuf []byte
// Optional limiter on the number of unique series per scrape target. // Optional limiter on the number of unique series per scrape target.
@ -195,7 +197,6 @@ type scrapeWork struct {
prevLabelsLen int prevLabelsLen int
// lastScrape holds the last response from scrape target. // lastScrape holds the last response from scrape target.
// It is used for generating Prometheus stale markers.
lastScrape []byte lastScrape []byte
} }
@ -307,6 +308,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
up := 1 up := 1
wc := writeRequestCtxPool.Get(sw.prevLabelsLen) wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
bodyString := bytesutil.ToUnsafeString(body.B) bodyString := bytesutil.ToUnsafeString(body.B)
lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
areIdenticalSeries := parser.AreIdenticalSeriesFast(lastScrape, bodyString)
if err != nil { if err != nil {
up = 0 up = 0
scrapesFailed.Inc() scrapesFailed.Inc()
@ -327,8 +330,21 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
err = fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ err = fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
} }
sw.updateSeriesAdded(wc) if up == 0 {
seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling) bodyString = ""
}
seriesAdded := 0
if !areIdenticalSeries {
// The returned value for seriesAdded may be bigger than the real number of added series
// if some series were removed during relabeling.
// This is a trade-off between performance and accuracy.
seriesAdded = sw.getSeriesAdded(bodyString)
}
if sw.seriesLimitExceeded || !areIdenticalSeries {
if sw.applySeriesLimit(wc) {
sw.seriesLimitExceeded = true
}
}
sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
@ -340,12 +356,12 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
writeRequestCtxPool.Put(wc) writeRequestCtxPool.Put(wc)
// body must be released only after wc is released, since wc refers to body. // body must be released only after wc is released, since wc refers to body.
sw.prevBodyLen = len(body.B) sw.prevBodyLen = len(body.B)
if !areIdenticalSeries {
sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
}
sw.lastScrape = append(sw.lastScrape[:0], bodyString...)
leveledbytebufferpool.Put(body) leveledbytebufferpool.Put(body)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
if up == 0 {
bodyString = ""
}
sw.sendStaleSeries(bodyString, scrapeTimestamp, false)
return err return err
} }
@ -383,7 +399,6 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+ return fmt.Errorf("the response from %q exceeds sample_limit=%d; "+
"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
} }
sw.updateSeriesAdded(wc)
sw.pushData(&wc.writeRequest) sw.pushData(&wc.writeRequest)
wc.resetNoRows() wc.resetNoRows()
return nil return nil
@ -404,12 +419,13 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
} }
scrapesFailed.Inc() scrapesFailed.Inc()
} }
seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling)
sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) // scrape_series_added isn't calculated in streaming mode,
// since it may need unlimited amounts of memory when scraping targets with millions of exposed metrics.
sw.addAutoTimeseries(wc, "scrape_series_added", 0, scrapeTimestamp)
sw.pushData(&wc.writeRequest) sw.pushData(&wc.writeRequest)
sw.prevLabelsLen = len(wc.labels) sw.prevLabelsLen = len(wc.labels)
wc.reset() wc.reset()
@ -486,11 +502,16 @@ func (wc *writeRequestCtx) resetNoRows() {
var writeRequestCtxPool leveledWriteRequestCtxPool var writeRequestCtxPool leveledWriteRequestCtxPool
func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) { func (sw *scrapeWork) getSeriesAdded(currScrape string) int {
if sw.seriesMap == nil { if currScrape == "" {
sw.seriesMap = make(map[uint64]struct{}, len(wc.writeRequest.Timeseries)) return 0
} }
m := sw.seriesMap lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
bodyString := parser.GetRowsDiff(currScrape, lastScrape)
return strings.Count(bodyString, "\n")
}
func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) bool {
seriesLimit := *seriesLimitPerTarget seriesLimit := *seriesLimitPerTarget
if sw.Config.SeriesLimit > 0 { if sw.Config.SeriesLimit > 0 {
seriesLimit = sw.Config.SeriesLimit seriesLimit = sw.Config.SeriesLimit
@ -499,24 +520,26 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) {
sw.seriesLimiter = bloomfilter.NewLimiter(seriesLimit, 24*time.Hour) sw.seriesLimiter = bloomfilter.NewLimiter(seriesLimit, 24*time.Hour)
} }
hsl := sw.seriesLimiter hsl := sw.seriesLimiter
if hsl == nil {
return false
}
dstSeries := wc.writeRequest.Timeseries[:0] dstSeries := wc.writeRequest.Timeseries[:0]
job := sw.Config.Job() job := sw.Config.Job()
limitExceeded := false
for _, ts := range wc.writeRequest.Timeseries { for _, ts := range wc.writeRequest.Timeseries {
h := sw.getLabelsHash(ts.Labels) h := sw.getLabelsHash(ts.Labels)
if hsl != nil && !hsl.Add(h) { if !hsl.Add(h) {
// The limit on the number of hourly unique series per scrape target has been exceeded. // The limit on the number of hourly unique series per scrape target has been exceeded.
// Drop the metric. // Drop the metric.
metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_series_limit_rows_dropped_total{scrape_job_original=%q,scrape_job=%q,scrape_target=%q}`, metrics.GetOrCreateCounter(fmt.Sprintf(`promscrape_series_limit_rows_dropped_total{scrape_job_original=%q,scrape_job=%q,scrape_target=%q}`,
sw.Config.jobNameOriginal, job, sw.Config.ScrapeURL)).Inc() sw.Config.jobNameOriginal, job, sw.Config.ScrapeURL)).Inc()
limitExceeded = true
continue continue
} }
dstSeries = append(dstSeries, ts) dstSeries = append(dstSeries, ts)
if _, ok := m[h]; !ok {
m[h] = struct{}{}
sw.seriesAdded++
}
} }
wc.writeRequest.Timeseries = dstSeries wc.writeRequest.Timeseries = dstSeries
return limitExceeded
} }
func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAutoSeries bool) { func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAutoSeries bool) {
@ -524,18 +547,11 @@ func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAut
return return
} }
lastScrape := bytesutil.ToUnsafeString(sw.lastScrape) lastScrape := bytesutil.ToUnsafeString(sw.lastScrape)
if parser.AreIdenticalSeriesFast(lastScrape, currScrape) {
// Fast path: the current scrape contains the same set of series as the previous scrape.
return
}
// Slow path: the current scrape contains different set of series than the previous scrape.
// Detect missing series in the current scrape and send stale markers for them.
bodyString := lastScrape bodyString := lastScrape
if currScrape != "" { if currScrape != "" {
bodyString = parser.GetDiffWithStaleRows(lastScrape, currScrape) bodyString = parser.GetRowsDiff(lastScrape, currScrape)
} }
wc := writeRequestCtxPool.Get(sw.prevLabelsLen) wc := &writeRequestCtx{}
defer writeRequestCtxPool.Put(wc)
if bodyString != "" { if bodyString != "" {
wc.rows.Unmarshal(bodyString) wc.rows.Unmarshal(bodyString)
srcRows := wc.rows.Rows srcRows := wc.rows.Rows
@ -562,17 +578,6 @@ func (sw *scrapeWork) sendStaleSeries(currScrape string, timestamp int64, addAut
} }
} }
sw.pushData(&wc.writeRequest) sw.pushData(&wc.writeRequest)
sw.lastScrape = append(sw.lastScrape[:0], currScrape...)
}
func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int {
seriesAdded := sw.seriesAdded
sw.seriesAdded = 0
if len(sw.seriesMap) > 4*lastScrapeSize {
// Reset seriesMap, since it occupies more than 4x metrics collected during the last scrape.
sw.seriesMap = make(map[uint64]struct{}, lastScrapeSize)
}
return seriesAdded
} }
func (sw *scrapeWork) getLabelsHash(labels []prompbmarshal.Label) uint64 { func (sw *scrapeWork) getLabelsHash(labels []prompbmarshal.Label) uint64 {

View file

@ -325,7 +325,7 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
scrape_samples_scraped{job="xx",instance="foo.com"} 4 123 scrape_samples_scraped{job="xx",instance="foo.com"} 4 123
scrape_duration_seconds{job="xx",instance="foo.com"} 0 123 scrape_duration_seconds{job="xx",instance="foo.com"} 0 123
scrape_samples_post_metric_relabeling{job="xx",instance="foo.com"} 1 123 scrape_samples_post_metric_relabeling{job="xx",instance="foo.com"} 1 123
scrape_series_added{job="xx",instance="foo.com"} 1 123 scrape_series_added{job="xx",instance="foo.com"} 4 123
`) `)
f(` f(`
foo{bar="baz"} 34.44 foo{bar="baz"} 34.44

View file

@ -366,10 +366,10 @@ func prevBackslashesCount(s string) int {
return n return n
} }
// GetDiffWithStaleRows returns rows from s1, which are missing in s2. // GetRowsDiff returns rows from s1, which are missing in s2.
// //
// The returned rows have default value 0 and have no timestamps. // The returned rows have default value 0 and have no timestamps.
func GetDiffWithStaleRows(s1, s2 string) string { func GetRowsDiff(s1, s2 string) string {
var r1, r2 Rows var r1, r2 Rows
r1.Unmarshal(s1) r1.Unmarshal(s1)
r2.Unmarshal(s2) r2.Unmarshal(s2)
@ -386,11 +386,8 @@ func GetDiffWithStaleRows(s1, s2 string) string {
r := &rows1[i] r := &rows1[i]
key := marshalMetricNameWithTags(r) key := marshalMetricNameWithTags(r)
if !m[key] { if !m[key] {
logger.Infof("missing %s", key)
diff = append(diff, key...) diff = append(diff, key...)
diff = append(diff, " 0\n"...) diff = append(diff, " 0\n"...)
} else {
logger.Infof("found %s", key)
} }
} }
return string(diff) return string(diff)

View file

@ -6,12 +6,12 @@ import (
"testing" "testing"
) )
func TestGetDiffWithStaleRows(t *testing.T) { func TestGetRowsDiff(t *testing.T) {
f := func(s1, s2, resultExpected string) { f := func(s1, s2, resultExpected string) {
t.Helper() t.Helper()
result := GetDiffWithStaleRows(s1, s2) result := GetRowsDiff(s1, s2)
if result != resultExpected { if result != resultExpected {
t.Fatalf("unexpected result for GetDiffWithStaleRows(%q, %q); got %q; want %q", s1, s2, result, resultExpected) t.Fatalf("unexpected result for GetRowsDiff(%q, %q); got %q; want %q", s1, s2, result, resultExpected)
} }
} }
f("", "", "") f("", "", "")