From 95ce89e7d70c1b5910546b5daad9dc2469580059 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Wed, 2 Sep 2020 18:44:43 +0300 Subject: [PATCH] lib/promscrape: use the number of parsed rows as a basis for writeRequestCtxPool leveling The previous basis on `cap(sw.labels)` doesn't work anymore after 7785869ccc4d4ff4dc80ad9f33442f6c5b369c72 , because `sw.labels` may be reset multiple times when processing big number of rows. --- lib/promscrape/scrapework.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 06208c3042..b3d10ed1b6 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -138,9 +138,9 @@ type scrapeWork struct { // It is used as a hint in order to reduce memory usage for body buffers. prevBodyLen int - // prevLabelsLen contains the number of all the labels generated during the previous scrape. + // prevRowsLen contains the number rows scraped during the previous scrape. // It is used as a hint in order to reduce memory usage when parsing scrape responses. - prevLabelsLen int + prevRowsLen int } func (sw *scrapeWork) run(stopCh <-chan struct{}) { @@ -217,7 +217,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error scrapeDuration.Update(duration) scrapeResponseSize.Update(float64(len(body.B))) up := 1 - wc := writeRequestCtxPool.Get(sw.prevLabelsLen) + wc := writeRequestCtxPool.Get(sw.prevRowsLen) if err != nil { up = 0 scrapesFailed.Inc() @@ -236,7 +236,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error samplesPostRelabeling := 0 for i := range srcRows { sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true) - if len(wc.labels) > 10000 { + if len(wc.labels) > 40000 { // Limit the maximum size of wc.writeRequest. // This should reduce memory usage when scraping targets with millions of metrics and/or labels. // For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/ @@ -257,7 +257,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error startTime := time.Now() sw.PushData(&wc.writeRequest) pushDataDuration.UpdateDuration(startTime) - sw.prevLabelsLen = len(wc.labels) + sw.prevRowsLen = samplesScraped wc.reset() writeRequestCtxPool.Put(wc) // body must be released only after wc is released, since wc refers to body. @@ -275,8 +275,8 @@ type leveledWriteRequestCtxPool struct { pools [30]sync.Pool } -func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx { - id, capacityNeeded := lwp.getPoolIDAndCapacity(labelsCapacity) +func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx { + id, capacityNeeded := lwp.getPoolIDAndCapacity(rowsCapacity) for i := 0; i < 2; i++ { if id < 0 || id >= len(lwp.pools) { break @@ -292,7 +292,7 @@ func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx } func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) { - capacity := cap(wc.labels) + capacity := cap(wc.rows.Rows) id, _ := lwp.getPoolIDAndCapacity(capacity) wc.reset() lwp.pools[id].Put(wc)