lib/promscrape: limit number of sent stale series at once (#3686)

Stale series are sent when there is a difference between current
and previous scrapes. Those series which disappeared in the current scrape
are marked as stale and sent to the remote storage.

Sending stale series requires memory allocation and in case when too many
series disappear in the same it could result in noticeable memory spike.
For example, re-deploy of a big fleet of service can result into
excessive memory usage for vmagent, because all the series with old
pod name will be marked as stale and sent to the remote write storage.

This change limits the number of stale series which can be sent at once,
so memory usage remains steady.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3675
Signed-off-by: hagen1778 <roman@victoriametrics.com>

Signed-off-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Roman Khavronenko 2023-01-24 06:15:59 +01:00 committed by GitHub
parent 2c4e384f07
commit 393876e52a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 78 additions and 24 deletions

View file

@ -767,6 +767,11 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
// maxStaleSeriesAtOnce defines the max number of stale series
// to process and send at once. It prevents from excessive memory usage
// when big number of metrics become stale at the same time.
const maxStaleSeriesAtOnce = 1e3
func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) {
// This function is CPU-bound, while it may allocate big amounts of memory.
// That's why it is a good idea to limit the number of concurrent calls to this function
@ -790,35 +795,46 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
}()
if bodyString != "" {
wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
srcRows := wc.rows.Rows
for i := range srcRows {
}
srcRows := wc.rows.Rows
for from := 0; from < len(srcRows); from += maxStaleSeriesAtOnce {
to := from + maxStaleSeriesAtOnce
if to > len(srcRows) {
to = len(srcRows)
}
for i := range srcRows[from:to] {
sw.addRowToTimeseries(wc, &srcRows[i], timestamp, true)
}
}
if addAutoSeries {
am := &autoMetrics{}
sw.addAutoMetrics(am, wc, timestamp)
}
// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
if sw.seriesLimitExceeded {
sw.applySeriesLimit(wc)
}
series := wc.writeRequest.Timeseries
if len(series) == 0 {
return
}
// Substitute all the values with Prometheus stale markers.
for _, tss := range series {
samples := tss.Samples
for i := range samples {
samples[i].Value = decimal.StaleNaN
// add auto series at the last iteration
if addAutoSeries && to == len(srcRows) {
am := &autoMetrics{}
sw.addAutoMetrics(am, wc, timestamp)
}
staleSamplesCreated.Add(len(samples))
// Apply series limit to stale markers in order to prevent sending stale markers for newly created series.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660
if sw.seriesLimitExceeded {
sw.applySeriesLimit(wc)
}
series := wc.writeRequest.Timeseries
if len(series) == 0 {
continue
}
// Substitute all the values with Prometheus stale markers.
for _, tss := range series {
samples := tss.Samples
for i := range samples {
samples[i].Value = decimal.StaleNaN
}
staleSamplesCreated.Add(len(samples))
}
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
wc.reset()
}
sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
}
var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)

View file

@ -686,6 +686,44 @@ func TestAddRowToTimeseriesNoRelabeling(t *testing.T) {
`metric{a="e",foo="bar"} 0 123`)
}
func TestSendStaleSeries(t *testing.T) {
var sw scrapeWork
sw.Config = &ScrapeWork{
NoStaleMarkers: false,
}
var timeseriesExpectedN int
sw.PushData = func(at *auth.Token, wr *prompbmarshal.WriteRequest) {
t.Helper()
if len(wr.Timeseries) != timeseriesExpectedN {
t.Fatalf("expected to get %d stale series; got %d", timeseriesExpectedN, len(wr.Timeseries))
}
}
generateScrape := func(n int) string {
w := strings.Builder{}
for i := 0; i < n; i++ {
w.WriteString(fmt.Sprintf("foo_%d 1\n", i))
}
return w.String()
}
timeseriesExpectedN = 0
sw.sendStaleSeries("", "", 0, false)
timeseriesExpectedN = 0
sw.sendStaleSeries(generateScrape(10), generateScrape(10), 0, false)
timeseriesExpectedN = 10
sw.sendStaleSeries(generateScrape(10), "", 0, false)
timeseriesExpectedN = 5
sw.sendStaleSeries(generateScrape(10), generateScrape(5), 0, false)
timeseriesExpectedN = maxStaleSeriesAtOnce
sw.sendStaleSeries(generateScrape(maxStaleSeriesAtOnce*2), "", 0, false)
}
func parsePromRow(data string) *parser.Row {
var rows parser.Rows
errLogger := func(s string) {