lib/promscrape: an attempt to reduce memory usage when vmagent scrapes targets with varying number of metrics

Do not cache too big byte buffers and too big writeRequestCtx objects,
since it is cheaper to re-create them instead of wasting RAM for their caching.

This reverts 7f6f350ee1
This commit is contained in:
Aliaksandr Valialkin 2021-03-14 22:56:23 +02:00
parent 7f6f350ee1
commit c14dafce43
2 changed files with 26 additions and 32 deletions

View file

@ -15,7 +15,9 @@ import (
// ... // ...
// pools[n] is for capacities from 2^(n+2)+1 to 2^(n+3) // pools[n] is for capacities from 2^(n+2)+1 to 2^(n+3)
// //
var pools [30]sync.Pool // Limit the maximum capacity to 2^18, since there are no performance benefits
// in caching byte slices with bigger capacities.
var pools [17]sync.Pool
// Get returns byte buffer with the given capacity. // Get returns byte buffer with the given capacity.
func Get(capacity int) *bytesutil.ByteBuffer { func Get(capacity int) *bytesutil.ByteBuffer {
@ -37,9 +39,11 @@ func Get(capacity int) *bytesutil.ByteBuffer {
// Put returns bb to the pool. // Put returns bb to the pool.
func Put(bb *bytesutil.ByteBuffer) { func Put(bb *bytesutil.ByteBuffer) {
capacity := cap(bb.B) capacity := cap(bb.B)
id, _ := getPoolIDAndCapacity(capacity) id, poolCapacity := getPoolIDAndCapacity(capacity)
bb.Reset() if capacity <= poolCapacity {
pools[id].Put(bb) bb.Reset()
pools[id].Put(bb)
}
} }
func getPoolIDAndCapacity(size int) (int, int) { func getPoolIDAndCapacity(size int) (int, int) {
@ -49,7 +53,7 @@ func getPoolIDAndCapacity(size int) (int, int) {
} }
size >>= 3 size >>= 3
id := bits.Len(uint(size)) id := bits.Len(uint(size))
if id > len(pools) { if id >= len(pools) {
id = len(pools) - 1 id = len(pools) - 1
} }
return id, (1 << (id + 3)) return id, (1 << (id + 3))

View file

@ -177,9 +177,9 @@ type scrapeWork struct {
// It is used as a hint in order to reduce memory usage for body buffers. // It is used as a hint in order to reduce memory usage for body buffers.
prevBodyLen int prevBodyLen int
// prevRowsLen contains the number rows scraped during the previous scrape. // prevLabelsLen contains the number labels scraped during the previous scrape.
// It is used as a hint in order to reduce memory usage when parsing scrape responses. // It is used as a hint in order to reduce memory usage when parsing scrape responses.
prevRowsLen int prevLabelsLen int
} }
func (sw *scrapeWork) run(stopCh <-chan struct{}) { func (sw *scrapeWork) run(stopCh <-chan struct{}) {
@ -283,7 +283,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
scrapeDuration.Update(duration) scrapeDuration.Update(duration)
scrapeResponseSize.Update(float64(len(body.B))) scrapeResponseSize.Update(float64(len(body.B)))
up := 1 up := 1
wc := writeRequestCtxPool.Get(sw.prevRowsLen) wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
if err != nil { if err != nil {
up = 0 up = 0
scrapesFailed.Inc() scrapesFailed.Inc()
@ -294,22 +294,10 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
srcRows := wc.rows.Rows srcRows := wc.rows.Rows
samplesScraped := len(srcRows) samplesScraped := len(srcRows)
scrapedSamples.Update(float64(samplesScraped)) scrapedSamples.Update(float64(samplesScraped))
samplesPostRelabeling := 0
for i := range srcRows { for i := range srcRows {
sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true) sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true)
if len(wc.labels) > 40000 {
// Limit the maximum size of wc.writeRequest.
// This should reduce memory usage when scraping targets with millions of metrics and/or labels.
// For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
sw.updateSeriesAdded(wc)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
}
} }
samplesPostRelabeling += len(wc.writeRequest.Timeseries) samplesPostRelabeling := len(wc.writeRequest.Timeseries)
if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit { if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit {
wc.resetNoRows() wc.resetNoRows()
up = 0 up = 0
@ -325,7 +313,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
startTime := time.Now() startTime := time.Now()
sw.PushData(&wc.writeRequest) sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime) pushDataDuration.UpdateDuration(startTime)
sw.prevRowsLen = samplesScraped sw.prevLabelsLen = len(wc.labels)
wc.reset() wc.reset()
writeRequestCtxPool.Put(wc) writeRequestCtxPool.Put(wc)
// body must be released only after wc is released, since wc refers to body. // body must be released only after wc is released, since wc refers to body.
@ -339,7 +327,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
samplesScraped := 0 samplesScraped := 0
samplesPostRelabeling := 0 samplesPostRelabeling := 0
responseSize := int64(0) responseSize := int64(0)
wc := writeRequestCtxPool.Get(sw.prevRowsLen) wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
sr, err := sw.GetStreamReader() sr, err := sw.GetStreamReader()
if err != nil { if err != nil {
@ -389,7 +377,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
startTime := time.Now() startTime := time.Now()
sw.PushData(&wc.writeRequest) sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime) pushDataDuration.UpdateDuration(startTime)
sw.prevRowsLen = len(wc.rows.Rows) sw.prevLabelsLen = len(wc.labels)
wc.reset() wc.reset()
writeRequestCtxPool.Put(wc) writeRequestCtxPool.Put(wc)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err) tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
@ -401,11 +389,11 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
// //
// Its logic has been copied from leveledbytebufferpool. // Its logic has been copied from leveledbytebufferpool.
type leveledWriteRequestCtxPool struct { type leveledWriteRequestCtxPool struct {
pools [30]sync.Pool pools [13]sync.Pool
} }
func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx { func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx {
id, capacityNeeded := lwp.getPoolIDAndCapacity(rowsCapacity) id, capacityNeeded := lwp.getPoolIDAndCapacity(labelsCapacity)
for i := 0; i < 2; i++ { for i := 0; i < 2; i++ {
if id < 0 || id >= len(lwp.pools) { if id < 0 || id >= len(lwp.pools) {
break break
@ -421,10 +409,12 @@ func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx {
} }
func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) { func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) {
capacity := cap(wc.rows.Rows) capacity := cap(wc.labels)
id, _ := lwp.getPoolIDAndCapacity(capacity) id, poolCapacity := lwp.getPoolIDAndCapacity(capacity)
wc.reset() if capacity <= poolCapacity {
lwp.pools[id].Put(wc) wc.reset()
lwp.pools[id].Put(wc)
}
} }
func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int) { func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int) {
@ -434,7 +424,7 @@ func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int)
} }
size >>= 3 size >>= 3
id := bits.Len(uint(size)) id := bits.Len(uint(size))
if id > len(lwp.pools) { if id >= len(lwp.pools) {
id = len(lwp.pools) - 1 id = len(lwp.pools) - 1
} }
return id, (1 << (id + 3)) return id, (1 << (id + 3))