From d79f1b106c2fd60d8760419fc4acb57d39cfd1c2 Mon Sep 17 00:00:00 2001
From: lzfhust
Date: Wed, 18 Jan 2023 14:57:56 +0800
Subject: [PATCH 1/4] use writeRequestCtxPool when deleting Kubernetes
 clusters from kubernetes_sd_configs (#3669)

---
 lib/promscrape/scrapework.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index 2370869db6..095138b0b3 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -773,7 +773,7 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
 	if currScrape != "" {
 		bodyString = parser.GetRowsDiff(lastScrape, currScrape)
 	}
-	wc := &writeRequestCtx{}
+	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
 	if bodyString != "" {
 		wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
 		srcRows := wc.rows.Rows
@@ -805,6 +805,8 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
 		staleSamplesCreated.Add(len(samples))
 	}
 	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
+	wc.reset()
+	writeRequestCtxPool.Put(wc)
 }
 
 var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)

From 68463c9e87303a587a9af1d054aa32c9efeae31a Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 17 Jan 2023 23:09:34 -0800
Subject: [PATCH 2/4] lib/promscrape: follow-up for
 d79f1b106c2fd60d8760419fc4acb57d39cfd1c2

- Document the fix at docs/CHANGELOG.md

- Limit the concurrency for the sendStaleSeries() function in order
  to limit its memory usage when a large number of targets disappear
  and staleness markers are sent for all the metrics exposed by these
  targets.

- Make sure that the writeRequestCtx is returned to the pool
  when there is no need to send staleness markers.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
---
 docs/CHANGELOG.md            |  1 +
 lib/promscrape/scrapework.go | 24 ++++++++++++++++++------
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md
index f6919a69da..a71e0fb0c5 100644
--- a/docs/CHANGELOG.md
+++ b/docs/CHANGELOG.md
@@ -28,6 +28,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): consistently put the scrape url with scrape target labels to all error logs for failed scrapes. Previously some failed scrapes were logged without this information.
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not send stale markers to remote storage for series exceeding the configured [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3660).
 * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply [series limit](https://docs.victoriametrics.com/vmagent.html#cardinality-limiter) when [staleness tracking](https://docs.victoriametrics.com/vmagent.html#prometheus-staleness-markers) is disabled.
+* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage spikes when a large number of scrape targets disappear at once. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668). Thanks to @lzfhust for [the initial fix](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3669).
 * BUGFIX: [Pushgateway import](https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format): properly return `200 OK` HTTP response code. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3636).
 * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly parse `M` (`1e6`) and `Mi` (`2^20`) multiplier suffixes in `1M` and `1Mi` numeric constants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3664). The issue has been introduced in [v1.86.0](https://docs.victoriametrics.com/CHANGELOG.html#v1860).
 * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): properly display range query results at `Table` view. For example, `up[5m]` query now shows all the raw samples for the last 5 minutes for the `up` metric at the `Table` view. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3516).

diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index 095138b0b3..18ddb39052 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -431,15 +431,15 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	return err
 }
 
-var concurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
+var processScrapedDataConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
 
 func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, body *bytesutil.ByteBuffer, err error) (bool, error) {
 	// This function is CPU-bound, while it may allocate big amounts of memory.
 	// That's why it is a good idea to limit the number of concurrent calls to this function
 	// in order to limit memory usage under high load without sacrificing the performance.
-	concurrencyLimitCh <- struct{}{}
+	processScrapedDataConcurrencyLimitCh <- struct{}{}
 	defer func() {
-		<-concurrencyLimitCh
+		<-processScrapedDataConcurrencyLimitCh
 	}()
 
 	endTimestamp := time.Now().UnixNano() / 1e6
@@ -765,7 +765,17 @@ func (sw *scrapeWork) applySeriesLimit(wc *writeRequestCtx) int {
 	return samplesDropped
 }
 
+var sendStaleSeriesConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs())
+
 func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp int64, addAutoSeries bool) {
+	// This function is CPU-bound, while it may allocate big amounts of memory.
+	// That's why it is a good idea to limit the number of concurrent calls to this function
+	// in order to limit memory usage under high load without sacrificing the performance.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3668
+	sendStaleSeriesConcurrencyLimitCh <- struct{}{}
+	defer func() {
+		<-sendStaleSeriesConcurrencyLimitCh
+	}()
 	if sw.Config.NoStaleMarkers {
 		return
 	}
@@ -773,7 +783,11 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
 	if currScrape != "" {
 		bodyString = parser.GetRowsDiff(lastScrape, currScrape)
 	}
-	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
+	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
+	defer func() {
+		wc.reset()
+		writeRequestCtxPool.Put(wc)
+	}()
 	if bodyString != "" {
 		wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
 		srcRows := wc.rows.Rows
@@ -805,8 +819,6 @@ func (sw *scrapeWork) sendStaleSeries(lastScrape, currScrape string, timestamp i
 		staleSamplesCreated.Add(len(samples))
 	}
 	sw.pushData(sw.Config.AuthToken, &wc.writeRequest)
-	wc.reset()
-	writeRequestCtxPool.Put(wc)
 }
 
 var staleSamplesCreated = metrics.NewCounter(`vm_promscrape_stale_samples_created_total`)

From 0c625185cb725d9ba141900749810435e9a92816 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 17 Jan 2023 23:25:42 -0800
Subject: [PATCH 3/4] app/vmselect/promql: updates tests for
 https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3664

---
 app/vmselect/promql/exec_test.go | 57 +++++++++++++++++++++++++++++++-
 1 file changed, 56 insertions(+), 1 deletion(-)

diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go
index 56f79078be..fca14d550f 100644
--- a/app/vmselect/promql/exec_test.go
+++ b/app/vmselect/promql/exec_test.go
@@ -86,6 +86,61 @@ func TestExecSuccess(t *testing.T) {
 		resultExpected := []netstorage.Result{r}
 		f(q, resultExpected)
 	})
+	t.Run("duration-constant", func(t *testing.T) {
+		t.Parallel()
+		q := `1h23m5S`
+		r := netstorage.Result{
+			MetricName: metricNameExpected,
+			Values:     []float64{4985, 4985, 4985, 4985, 4985, 4985},
+			Timestamps: timestampsExpected,
+		}
+		resultExpected := []netstorage.Result{r}
+		f(q, resultExpected)
+	})
+	t.Run("num-with-suffix-1", func(t *testing.T) {
+		t.Parallel()
+		q := `123M`
+		r := netstorage.Result{
+			MetricName: metricNameExpected,
+			Values:     []float64{123e6, 123e6, 123e6, 123e6, 123e6, 123e6},
+			Timestamps: timestampsExpected,
+		}
+		resultExpected := []netstorage.Result{r}
+		f(q, resultExpected)
+	})
+	t.Run("num-with-suffix-2", func(t *testing.T) {
+		t.Parallel()
+		q := `1.23TB`
+		r := netstorage.Result{
+			MetricName: metricNameExpected,
+			Values:     []float64{1.23e12, 1.23e12, 1.23e12, 1.23e12, 1.23e12, 1.23e12},
+			Timestamps: timestampsExpected,
+		}
+		resultExpected := []netstorage.Result{r}
+		f(q, resultExpected)
+	})
+	t.Run("num-with-suffix-3", func(t *testing.T) {
+		t.Parallel()
+		q := `1.23Mib`
+		r := netstorage.Result{
+			MetricName: metricNameExpected,
+			Values:     []float64{1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20)},
+			Timestamps: timestampsExpected,
+		}
+		resultExpected := []netstorage.Result{r}
+		f(q, resultExpected)
+	})
+	t.Run("num-with-suffix-4", func(t *testing.T) {
+		t.Parallel()
+		q := `1.23mib`
+		r := netstorage.Result{
+			MetricName: metricNameExpected,
+			Values:     []float64{1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20), 1.23 * (1 << 20)},
+			Timestamps: timestampsExpected,
+		}
+		resultExpected := []netstorage.Result{r}
+		f(q, resultExpected)
+	})
 	t.Run("simple-arithmetic", func(t *testing.T) {
 		t.Parallel()
 		q := `-1+2 *3 ^ 4+5%6`
@@ -7357,7 +7412,7 @@ func TestExecSuccess(t *testing.T) {
 	})
 	t.Run(`rollup_scrape_interval()`, func(t *testing.T) {
 		t.Parallel()
-		q := `sort_by_label(rollup_scrape_interval(1[5M:10s]), "rollup")`
+		q := `sort_by_label(rollup_scrape_interval(1[5m:10S]), "rollup")`
 		r1 := netstorage.Result{
 			MetricName: metricNameExpected,
 			Values:     []float64{10, 10, 10, 10, 10, 10},

From 1ac025bbc936861a07902c256cdc07c522591ce3 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 18 Jan 2023 00:01:03 -0800
Subject: [PATCH 4/4] lib/storage: use better naming for a function returning
 new []rawRows

- newRawRowsBlock() -> newRawRows()
---
 lib/storage/partition.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 8fcc8bc132..65616ee6b6 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -497,7 +497,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
 
 	rrs.mu.Lock()
 	if cap(rrs.rows) == 0 {
-		rrs.rows = newRawRowsBlock()
+		rrs.rows = newRawRows()
 	}
 	n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows)
 	rrs.rows = rrs.rows[:len(rrs.rows)+n]
@@ -524,7 +524,7 @@ type rawRowsBlock struct {
 	rows []rawRow
 }
 
-func newRawRowsBlock() []rawRow {
+func newRawRows() []rawRow {
 	n := getMaxRawRowsPerShard()
 	return make([]rawRow, 0, n)
 }
@@ -533,7 +533,7 @@ func getRawRowsBlock() *rawRowsBlock {
 	v := rawRowsBlockPool.Get()
 	if v == nil {
 		return &rawRowsBlock{
-			rows: newRawRows(),
+			rows: newRawRows(),
 		}
 	}
 	return v.(*rawRowsBlock)
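
Taken together, patches 1 and 2 change sendStaleSeries() to borrow its writeRequestCtx from a pool and to hand it back via defer, so the buffers are reused across calls instead of being reallocated for every disappearing target. Below is a minimal, self-contained sketch of that borrow/reset/return pattern. The writeRequestCtx fields and the use of sync.Pool here are simplified stand-ins for the actual lib/promscrape type and its size-aware writeRequestCtxPool, so treat this as an illustration of the pattern, not the real implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// writeRequestCtx is a simplified stand-in for the lib/promscrape type,
// which buffers parsed rows and labels for a pending remote-write request.
type writeRequestCtx struct {
	labels []string
}

// reset truncates the buffers while keeping their capacity, so the next
// borrower of the pooled object avoids re-allocating.
func (wc *writeRequestCtx) reset() {
	wc.labels = wc.labels[:0]
}

var writeRequestCtxPool = sync.Pool{
	New: func() any { return &writeRequestCtx{} },
}

// sendStaleSeries shows the borrow/reset/return pattern from the patches:
// the defer guarantees the context goes back to the pool on every path,
// including the early returns the real function has.
func sendStaleSeries(series []string) {
	wc := writeRequestCtxPool.Get().(*writeRequestCtx)
	defer func() {
		wc.reset()
		writeRequestCtxPool.Put(wc)
	}()
	wc.labels = append(wc.labels, series...)
	fmt.Printf("sending %d stale markers\n", len(wc.labels))
}

func main() {
	sendStaleSeries([]string{"up", "process_cpu_seconds_total"})
	sendStaleSeries([]string{"go_goroutines"}) // reuses the first call's buffers
}
```

Moving the Put into a defer, as the follow-up patch does, is what makes the pooling safe: the original version of the fix only returned the context on the main code path.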
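Patch 2 also caps how many goroutines may run the CPU- and memory-heavy sendStaleSeries() at once, reusing the buffered-channel counting semaphore already applied to processScrapedData(). A hedged sketch of that pattern follows, using runtime.NumCPU() where the real code uses cgroup.AvailableCPUs() (which additionally honors container CPU limits):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// A buffered channel acts as a counting semaphore: sending acquires one of
// the N slots and blocks once all slots are taken; receiving releases one.
// Sizing it to the CPU count bounds both CPU contention and the peak memory
// held by in-flight calls.
var concurrencyLimitCh = make(chan struct{}, runtime.NumCPU())

func processTarget(id int) {
	concurrencyLimitCh <- struct{}{} // acquire a slot
	defer func() {
		<-concurrencyLimitCh // release the slot
	}()
	fmt.Println("sending stale markers for target", id)
}

func main() {
	var wg sync.WaitGroup
	// Even if hundreds of targets disappear at once, at most NumCPU()
	// of these goroutines do the heavy work at any given moment.
	for i := 0; i < 200; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			processTarget(id)
		}(i)
	}
	wg.Wait()
}
```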
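The new exec_test.go cases in patch 3 pin down how MetricsQL scales numeric constants: decimal suffixes such as K, M, G, T (optionally followed by B) multiply by powers of 10, their binary Ki, Mi, Gi, Ti variants (optionally followed by b/B) multiply by powers of 2, and matching is case-insensitive, so `123M` is 123e6 while `1.23Mib` is 1.23*2^20. The helper below is a hypothetical re-implementation written only to illustrate that mapping; it is not the metricsql parser's actual code:

```go
package main

import (
	"fmt"
	"strings"
)

// suffixMultiplier is a hypothetical helper mirroring the behavior the new
// tests assert: case-insensitive matching, decimal suffixes scale by powers
// of 10, binary ("i") suffixes scale by powers of 2.
func suffixMultiplier(suffix string) (float64, bool) {
	s := strings.ToLower(suffix)
	s = strings.TrimSuffix(s, "b") // "kb" -> "k", "mib" -> "mi"
	switch s {
	case "k":
		return 1e3, true
	case "ki":
		return 1 << 10, true
	case "m":
		return 1e6, true
	case "mi":
		return 1 << 20, true
	case "g":
		return 1e9, true
	case "gi":
		return 1 << 30, true
	case "t":
		return 1e12, true
	case "ti":
		return 1 << 40, true
	}
	return 0, false
}

func main() {
	for _, s := range []string{"M", "Mi", "TB", "mib"} {
		if m, ok := suffixMultiplier(s); ok {
			fmt.Printf("%s -> %g\n", s, m) // M -> 1e+06, Mi -> 1.048576e+06, ...
		}
	}
}
```

This mapping is presumably also why the rollup_scrape_interval test now spells its window `5m` instead of `5M`: after the fix an uppercase `M` reads as the `1e6` multiplier in numeric context, while `10S` still exercises case-insensitive parsing of a seconds duration.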