lib/promscrape: limit the concurrency during parsing and relabeling the scraped samples

This should reduce memory usage when scraping big number of targets,
since this limits the summary memory usage during concurrent parsing and relabeling
by the number of available CPU cores.
This commit is contained in:
Aliaksandr Valialkin 2023-01-06 22:59:15 -08:00
parent 293e4dc77b
commit 986a05e18d
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 22 additions and 7 deletions

View file

@ -20,6 +20,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
**Update note 2:** The `vm_concurrent_addrows_current` and `vm_concurrent_addrows_capacity` metrics [exported](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#monitoring) by `vmstorage` are replaced with `vm_concurrent_insert_current` and `vm_concurrent_insert_capacity` metrics in order to be consistent with the corresponding metrics exported by `vminsert`. Please update queries in dahsboards and alerting rules with new metric names if old metric names are used there.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for aggregation of incoming [samples](https://docs.victoriametrics.com/keyConcepts.html#raw-samples) by time and by labels. See [these docs](https://docs.victoriametrics.com/stream-aggregation.html) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3460).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce memory usage when scraping big number of targets without the need to enable [stream parsing mode](https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): add support for Prometheus-compatible target discovery for [HashiCorp Nomad](https://www.nomadproject.io/) services via [nomad_sd_configs](https://docs.victoriametrics.com/sd_configs.html#nomad_sd_configs). See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3367). Thanks to @mr-karan for [the implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3549).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): automatically pre-fetch `metric_relabel_configs` and the target labels when clicking on the `debug metrics relabeling` link at the `http://vmagent:8429/targets` page at the particular target. See [these docs](https://docs.victoriametrics.com/vmagent.html#relabel-debug).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add ability to explore metrics exported by a particular `job` / `instance`. See [these docs](https://docs.victoriametrics.com/#metrics-explorer) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3386).

View file

@ -3,7 +3,6 @@ package promscrape
import (
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"io"
"math"
"math/bits"
@ -11,8 +10,10 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
@ -416,6 +417,24 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
body := leveledbytebufferpool.Get(sw.prevBodyLen)
var err error
body.B, err = sw.ReadData(body.B[:0])
releaseBody, err := sw.processScrapedData(scrapeTimestamp, realTimestamp, body, err)
if releaseBody {
leveledbytebufferpool.Put(body)
}
return err
}
var concurrencyLimitCh = make(chan struct{}, 2*cgroup.AvailableCPUs())
func (sw *scrapeWork) processScrapedData(scrapeTimestamp, realTimestamp int64, body *bytesutil.ByteBuffer, err error) (bool, error) {
// This function is CPU-bound, while it may allocate big amounts of memory.
// That's why it is a good idea to limit the number of concurrent calls to this function
// in order to limit memory usage under high load without sacrificing the performance.
concurrencyLimitCh <- struct{}{}
defer func() {
<-concurrencyLimitCh
}()
endTimestamp := time.Now().UnixNano() / 1e6
duration := float64(endTimestamp-realTimestamp) / 1e3
scrapeDuration.Update(duration)
@ -489,13 +508,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
sw.storeLastScrape(body.B)
}
sw.finalizeLastScrape()
if !mustSwitchToStreamParse {
// Return body to the pool only if its size is smaller than -promscrape.minResponseSizeForStreamParse
// This should reduce memory usage when scraping targets which return big responses.
leveledbytebufferpool.Put(body)
}
tsmGlobal.Update(sw, up == 1, realTimestamp, int64(duration*1000), samplesScraped, err)
return err
return !mustSwitchToStreamParse, err
}
func (sw *scrapeWork) pushData(at *auth.Token, wr *prompbmarshal.WriteRequest) {