diff --git a/lib/leveledbytebufferpool/pool.go b/lib/leveledbytebufferpool/pool.go
index 59f660211..96ba6a2d1 100644
--- a/lib/leveledbytebufferpool/pool.go
+++ b/lib/leveledbytebufferpool/pool.go
@@ -15,7 +15,9 @@ import (
 // ...
 // pools[n] is for capacities from 2^(n+2)+1 to 2^(n+3)
 //
-var pools [30]sync.Pool
+// Limit the maximum capacity to 2^18, since there are no performance benefits
+// in caching byte slices with bigger capacities.
+var pools [17]sync.Pool
 
 // Get returns byte buffer with the given capacity.
 func Get(capacity int) *bytesutil.ByteBuffer {
@@ -37,9 +39,11 @@ func Get(capacity int) *bytesutil.ByteBuffer {
 // Put returns bb to the pool.
 func Put(bb *bytesutil.ByteBuffer) {
 	capacity := cap(bb.B)
-	id, _ := getPoolIDAndCapacity(capacity)
-	bb.Reset()
-	pools[id].Put(bb)
+	id, poolCapacity := getPoolIDAndCapacity(capacity)
+	if capacity <= poolCapacity {
+		bb.Reset()
+		pools[id].Put(bb)
+	}
 }
 
 func getPoolIDAndCapacity(size int) (int, int) {
@@ -49,7 +53,7 @@ func getPoolIDAndCapacity(size int) (int, int) {
 	}
 	size >>= 3
 	id := bits.Len(uint(size))
-	if id > len(pools) {
+	if id >= len(pools) {
 		id = len(pools) - 1
 	}
 	return id, (1 << (id + 3))
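The `id >= len(pools)` fix above is what makes the smaller array safe. With 30 pools, the old `id > len(pools)` comparison was unreachable in practice (it needed a buffer of roughly 4 GiB), but with 17 pools any capacity between 2^19+1 and 2^20 makes bits.Len return exactly len(pools), which the `>` check let through to an out-of-range pools[id] access in Put. A minimal sketch of the index math, assuming the usual size-- normalization at the top of getPoolIDAndCapacity (not shown in the hunk); numPools is a hypothetical stand-in for len(pools):

package main

import (
	"fmt"
	"math/bits"
)

const numPools = 17 // mirrors `var pools [17]sync.Pool`

// poolIDAndCapacity reproduces the patched index math: it maps a requested
// capacity to a pool index plus the largest capacity that pool may cache.
func poolIDAndCapacity(size int) (int, int) {
	size--
	if size < 0 {
		size = 0
	}
	size >>= 3
	id := bits.Len(uint(size))
	// The old `id > numPools` comparison let id == numPools through,
	// which would index one past the last pool.
	if id >= numPools {
		id = numPools - 1
	}
	return id, 1 << (id + 3)
}

func main() {
	for _, capacity := range []int{8, 100, 1 << 18, 1 << 20} {
		id, poolCapacity := poolIDAndCapacity(capacity)
		fmt.Printf("capacity=%-8d -> pools[%d] (caches up to %d bytes)\n",
			capacity, id, poolCapacity)
	}
}

With the old `>` check, the 1<<20 case would have produced pools[17] and an index-out-of-range panic in Put; the clamped version routes it to the last pool instead.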
diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go
index e6b537b68..b9da864d5 100644
--- a/lib/promscrape/scrapework.go
+++ b/lib/promscrape/scrapework.go
@@ -177,9 +177,9 @@ type scrapeWork struct {
 	// It is used as a hint in order to reduce memory usage for body buffers.
 	prevBodyLen int
 
-	// prevRowsLen contains the number rows scraped during the previous scrape.
+	// prevLabelsLen contains the number of labels scraped during the previous scrape.
 	// It is used as a hint in order to reduce memory usage when parsing scrape responses.
-	prevRowsLen int
+	prevLabelsLen int
 }
 
 func (sw *scrapeWork) run(stopCh <-chan struct{}) {
@@ -283,7 +283,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	scrapeDuration.Update(duration)
 	scrapeResponseSize.Update(float64(len(body.B)))
 	up := 1
-	wc := writeRequestCtxPool.Get(sw.prevRowsLen)
+	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
 	if err != nil {
 		up = 0
 		scrapesFailed.Inc()
@@ -294,22 +294,10 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	srcRows := wc.rows.Rows
 	samplesScraped := len(srcRows)
 	scrapedSamples.Update(float64(samplesScraped))
-	samplesPostRelabeling := 0
 	for i := range srcRows {
 		sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true)
-		if len(wc.labels) > 40000 {
-			// Limit the maximum size of wc.writeRequest.
-			// This should reduce memory usage when scraping targets with millions of metrics and/or labels.
-			// For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
-			samplesPostRelabeling += len(wc.writeRequest.Timeseries)
-			sw.updateSeriesAdded(wc)
-			startTime := time.Now()
-			sw.PushData(&wc.writeRequest)
-			pushDataDuration.UpdateDuration(startTime)
-			wc.resetNoRows()
-		}
 	}
-	samplesPostRelabeling += len(wc.writeRequest.Timeseries)
+	samplesPostRelabeling := len(wc.writeRequest.Timeseries)
 	if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit {
 		wc.resetNoRows()
 		up = 0
@@ -325,7 +313,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 	startTime := time.Now()
 	sw.PushData(&wc.writeRequest)
 	pushDataDuration.UpdateDuration(startTime)
-	sw.prevRowsLen = samplesScraped
+	sw.prevLabelsLen = len(wc.labels)
 	wc.reset()
 	writeRequestCtxPool.Put(wc)
 	// body must be released only after wc is released, since wc refers to body.
@@ -339,7 +327,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	samplesScraped := 0
 	samplesPostRelabeling := 0
 	responseSize := int64(0)
-	wc := writeRequestCtxPool.Get(sw.prevRowsLen)
+	wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
 
 	sr, err := sw.GetStreamReader()
 	if err != nil {
@@ -389,7 +377,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 	startTime := time.Now()
 	sw.PushData(&wc.writeRequest)
 	pushDataDuration.UpdateDuration(startTime)
-	sw.prevRowsLen = len(wc.rows.Rows)
+	sw.prevLabelsLen = len(wc.labels)
 	wc.reset()
 	writeRequestCtxPool.Put(wc)
 	tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
@@ -401,11 +389,11 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
 //
 // Its logic has been copied from leveledbytebufferpool.
 type leveledWriteRequestCtxPool struct {
-	pools [30]sync.Pool
+	pools [13]sync.Pool
 }
 
-func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx {
-	id, capacityNeeded := lwp.getPoolIDAndCapacity(rowsCapacity)
+func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx {
+	id, capacityNeeded := lwp.getPoolIDAndCapacity(labelsCapacity)
 	for i := 0; i < 2; i++ {
 		if id < 0 || id >= len(lwp.pools) {
 			break
@@ -421,10 +409,12 @@ func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx {
 }
 
 func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) {
-	capacity := cap(wc.rows.Rows)
-	id, _ := lwp.getPoolIDAndCapacity(capacity)
-	wc.reset()
-	lwp.pools[id].Put(wc)
+	capacity := cap(wc.labels)
+	id, poolCapacity := lwp.getPoolIDAndCapacity(capacity)
+	if capacity <= poolCapacity {
+		wc.reset()
+		lwp.pools[id].Put(wc)
+	}
 }
 
 func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int) {
@@ -434,7 +424,7 @@ func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int)
 	}
 	size >>= 3
 	id := bits.Len(uint(size))
-	if id > len(lwp.pools) {
+	if id >= len(lwp.pools) {
 		id = len(lwp.pools) - 1
 	}
 	return id, (1 << (id + 3))
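Both Put methods now follow the same pattern: derive the level from the object's current capacity and cache the object only while it still fits that level's bucket, so a writeRequestCtx whose labels slice ballooned during one scrape is dropped for the GC instead of pinning memory across scrapes. The Get side is accordingly seeded with sw.prevLabelsLen, since cap(wc.labels) rather than the row count is what actually sizes the context. A toy leveled pool sketching that Put-side guard (the element type and level sizes are illustrative, not the upstream writeRequestCtx):

package main

import (
	"fmt"
	"math/bits"
	"sync"
)

// Four toy levels caching byte slices of capacity up to 8, 16, 32 and 64.
var pools [4]sync.Pool

func poolIDAndCapacity(size int) (int, int) {
	size--
	if size < 0 {
		size = 0
	}
	size >>= 3
	id := bits.Len(uint(size))
	if id >= len(pools) {
		id = len(pools) - 1
	}
	return id, 1 << (id + 3)
}

// put mirrors the guard added by the patch: cache the slice only if its
// capacity still fits the bucket that poolIDAndCapacity selects for it.
func put(b *[]byte) {
	capacity := cap(*b)
	id, poolCapacity := poolIDAndCapacity(capacity)
	if capacity <= poolCapacity {
		*b = (*b)[:0] // reset before caching, like bb.Reset() / wc.reset()
		pools[id].Put(b)
		return
	}
	// capacity > poolCapacity: the slice outgrew the largest level,
	// so it is dropped here and reclaimed by the GC.
}

func main() {
	small := make([]byte, 0, 32)
	big := make([]byte, 0, 1024)
	put(&small) // cached in pools[2] (capacities 17..32)
	put(&big)   // dropped: 1024 exceeds the 64-byte top level
	fmt.Println("cached:", pools[2].Get() != nil, "| dropped:", pools[3].Get() == nil)
}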