lib/promscrape: use the number of parsed rows as a basis for writeRequestCtxPool leveling

The previous basis on `cap(sw.labels)` doesn't work anymore after 7785869ccc,
because `sw.labels` may be reset multiple times when processing a big number of rows.
This commit is contained in:
Aliaksandr Valialkin 2020-09-02 18:44:43 +03:00
parent ed899ca9e8
commit 038358b777

View file

@ -138,9 +138,9 @@ type scrapeWork struct {
// It is used as a hint in order to reduce memory usage for body buffers.
prevBodyLen int
// prevLabelsLen contains the number of all the labels generated during the previous scrape.
// prevRowsLen contains the number of rows scraped during the previous scrape.
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
prevLabelsLen int
prevRowsLen int
}
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
@ -217,7 +217,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
scrapeDuration.Update(duration)
scrapeResponseSize.Update(float64(len(body.B)))
up := 1
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
wc := writeRequestCtxPool.Get(sw.prevRowsLen)
if err != nil {
up = 0
scrapesFailed.Inc()
@ -236,7 +236,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
samplesPostRelabeling := 0
for i := range srcRows {
sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true)
if len(wc.labels) > 10000 {
if len(wc.labels) > 40000 {
// Limit the maximum size of wc.writeRequest.
// This should reduce memory usage when scraping targets with millions of metrics and/or labels.
// For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
@ -257,7 +257,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
sw.prevLabelsLen = len(wc.labels)
sw.prevRowsLen = samplesScraped
wc.reset()
writeRequestCtxPool.Put(wc)
// body must be released only after wc is released, since wc refers to body.
@ -275,8 +275,8 @@ type leveledWriteRequestCtxPool struct {
pools [30]sync.Pool
}
func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx {
id, capacityNeeded := lwp.getPoolIDAndCapacity(labelsCapacity)
func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx {
id, capacityNeeded := lwp.getPoolIDAndCapacity(rowsCapacity)
for i := 0; i < 2; i++ {
if id < 0 || id >= len(lwp.pools) {
break
@ -292,7 +292,7 @@ func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx
}
func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) {
capacity := cap(wc.labels)
capacity := cap(wc.rows.Rows)
id, _ := lwp.getPoolIDAndCapacity(capacity)
wc.reset()
lwp.pools[id].Put(wc)