From c14dafce43be1f8811323a13186be3d7b5a1ab70 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 14 Mar 2021 22:56:23 +0200 Subject: [PATCH] lib/promscrape: an attempt to reduce memory usage when vmagent scrapes targets with varying number of metrics Do not cache too big byte buffers and too big writeRequestCtx objects, since it is cheaper to re-create them instead of wasting RAM for their caching. This reverts 7f6f350ee19e45002cac83bbfda22d2bf6309ae4 --- lib/leveledbytebufferpool/pool.go | 14 ++++++---- lib/promscrape/scrapework.go | 44 ++++++++++++------------------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/lib/leveledbytebufferpool/pool.go b/lib/leveledbytebufferpool/pool.go index 59f660211..96ba6a2d1 100644 --- a/lib/leveledbytebufferpool/pool.go +++ b/lib/leveledbytebufferpool/pool.go @@ -15,7 +15,9 @@ import ( // ... // pools[n] is for capacities from 2^(n+2)+1 to 2^(n+3) // -var pools [30]sync.Pool +// Limit the maximum capacity to 2^18, since there are no performance benefits +// in caching byte slices with bigger capacities. +var pools [17]sync.Pool // Get returns byte buffer with the given capacity. func Get(capacity int) *bytesutil.ByteBuffer { @@ -37,9 +39,11 @@ func Get(capacity int) *bytesutil.ByteBuffer { // Put returns bb to the pool. func Put(bb *bytesutil.ByteBuffer) { capacity := cap(bb.B) - id, _ := getPoolIDAndCapacity(capacity) - bb.Reset() - pools[id].Put(bb) + id, poolCapacity := getPoolIDAndCapacity(capacity) + if capacity <= poolCapacity { + bb.Reset() + pools[id].Put(bb) + } } func getPoolIDAndCapacity(size int) (int, int) { @@ -49,7 +53,7 @@ func getPoolIDAndCapacity(size int) (int, int) { } size >>= 3 id := bits.Len(uint(size)) - if id > len(pools) { + if id >= len(pools) { id = len(pools) - 1 } return id, (1 << (id + 3)) diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index e6b537b68..b9da864d5 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -177,9 +177,9 @@ type scrapeWork struct { // It is used as a hint in order to reduce memory usage for body buffers. prevBodyLen int - // prevRowsLen contains the number rows scraped during the previous scrape. + // prevLabelsLen contains the number labels scraped during the previous scrape. // It is used as a hint in order to reduce memory usage when parsing scrape responses. - prevRowsLen int + prevLabelsLen int } func (sw *scrapeWork) run(stopCh <-chan struct{}) { @@ -283,7 +283,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error scrapeDuration.Update(duration) scrapeResponseSize.Update(float64(len(body.B))) up := 1 - wc := writeRequestCtxPool.Get(sw.prevRowsLen) + wc := writeRequestCtxPool.Get(sw.prevLabelsLen) if err != nil { up = 0 scrapesFailed.Inc() @@ -294,22 +294,10 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error srcRows := wc.rows.Rows samplesScraped := len(srcRows) scrapedSamples.Update(float64(samplesScraped)) - samplesPostRelabeling := 0 for i := range srcRows { sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true) - if len(wc.labels) > 40000 { - // Limit the maximum size of wc.writeRequest. - // This should reduce memory usage when scraping targets with millions of metrics and/or labels. - // For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/ - samplesPostRelabeling += len(wc.writeRequest.Timeseries) - sw.updateSeriesAdded(wc) - startTime := time.Now() - sw.PushData(&wc.writeRequest) - pushDataDuration.UpdateDuration(startTime) - wc.resetNoRows() - } } - samplesPostRelabeling += len(wc.writeRequest.Timeseries) + samplesPostRelabeling := len(wc.writeRequest.Timeseries) if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit { wc.resetNoRows() up = 0 @@ -325,7 +313,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error startTime := time.Now() sw.PushData(&wc.writeRequest) pushDataDuration.UpdateDuration(startTime) - sw.prevRowsLen = samplesScraped + sw.prevLabelsLen = len(wc.labels) wc.reset() writeRequestCtxPool.Put(wc) // body must be released only after wc is released, since wc refers to body. @@ -339,7 +327,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { samplesScraped := 0 samplesPostRelabeling := 0 responseSize := int64(0) - wc := writeRequestCtxPool.Get(sw.prevRowsLen) + wc := writeRequestCtxPool.Get(sw.prevLabelsLen) sr, err := sw.GetStreamReader() if err != nil { @@ -389,7 +377,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { startTime := time.Now() sw.PushData(&wc.writeRequest) pushDataDuration.UpdateDuration(startTime) - sw.prevRowsLen = len(wc.rows.Rows) + sw.prevLabelsLen = len(wc.labels) wc.reset() writeRequestCtxPool.Put(wc) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err) @@ -401,11 +389,11 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { // // Its logic has been copied from leveledbytebufferpool. type leveledWriteRequestCtxPool struct { - pools [30]sync.Pool + pools [13]sync.Pool } -func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx { - id, capacityNeeded := lwp.getPoolIDAndCapacity(rowsCapacity) +func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx { + id, capacityNeeded := lwp.getPoolIDAndCapacity(labelsCapacity) for i := 0; i < 2; i++ { if id < 0 || id >= len(lwp.pools) { break @@ -421,10 +409,12 @@ func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx { } func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) { - capacity := cap(wc.rows.Rows) - id, _ := lwp.getPoolIDAndCapacity(capacity) - wc.reset() - lwp.pools[id].Put(wc) + capacity := cap(wc.labels) + id, poolCapacity := lwp.getPoolIDAndCapacity(capacity) + if capacity <= poolCapacity { + wc.reset() + lwp.pools[id].Put(wc) + } } func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int) { @@ -434,7 +424,7 @@ func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int) } size >>= 3 id := bits.Len(uint(size)) - if id > len(lwp.pools) { + if id >= len(lwp.pools) { id = len(lwp.pools) - 1 } return id, (1 << (id + 3))