lib/promscrape: an attempt to reduce memory usage when vmagent scrapes targets with varying number of metrics

Do not cache too big byte buffers and too big writeRequestCtx objects,
since it is cheaper to re-create them instead of wasting RAM for their caching.

This reverts 7f6f350ee1
This commit is contained in:
Aliaksandr Valialkin 2021-03-14 22:56:23 +02:00
parent df10d80257
commit 7f52aae20c
2 changed files with 26 additions and 32 deletions

View file

@ -15,7 +15,9 @@ import (
// ...
// pools[n] is for capacities from 2^(n+2)+1 to 2^(n+3)
//
var pools [30]sync.Pool
// Limit the maximum capacity to 2^18, since there are no performance benefits
// in caching byte slices with bigger capacities.
var pools [17]sync.Pool
// Get returns byte buffer with the given capacity.
func Get(capacity int) *bytesutil.ByteBuffer {
@ -37,9 +39,11 @@ func Get(capacity int) *bytesutil.ByteBuffer {
// Put returns bb to the pool.
func Put(bb *bytesutil.ByteBuffer) {
capacity := cap(bb.B)
id, _ := getPoolIDAndCapacity(capacity)
bb.Reset()
pools[id].Put(bb)
id, poolCapacity := getPoolIDAndCapacity(capacity)
if capacity <= poolCapacity {
bb.Reset()
pools[id].Put(bb)
}
}
func getPoolIDAndCapacity(size int) (int, int) {
@ -49,7 +53,7 @@ func getPoolIDAndCapacity(size int) (int, int) {
}
size >>= 3
id := bits.Len(uint(size))
if id > len(pools) {
if id >= len(pools) {
id = len(pools) - 1
}
return id, (1 << (id + 3))

View file

@ -177,9 +177,9 @@ type scrapeWork struct {
// It is used as a hint in order to reduce memory usage for body buffers.
prevBodyLen int
// prevRowsLen contains the number rows scraped during the previous scrape.
// prevLabelsLen contains the number labels scraped during the previous scrape.
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
prevRowsLen int
prevLabelsLen int
}
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
@ -283,7 +283,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
scrapeDuration.Update(duration)
scrapeResponseSize.Update(float64(len(body.B)))
up := 1
wc := writeRequestCtxPool.Get(sw.prevRowsLen)
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
if err != nil {
up = 0
scrapesFailed.Inc()
@ -294,22 +294,10 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
srcRows := wc.rows.Rows
samplesScraped := len(srcRows)
scrapedSamples.Update(float64(samplesScraped))
samplesPostRelabeling := 0
for i := range srcRows {
sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true)
if len(wc.labels) > 40000 {
// Limit the maximum size of wc.writeRequest.
// This should reduce memory usage when scraping targets with millions of metrics and/or labels.
// For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
sw.updateSeriesAdded(wc)
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
}
}
samplesPostRelabeling += len(wc.writeRequest.Timeseries)
samplesPostRelabeling := len(wc.writeRequest.Timeseries)
if sw.Config.SampleLimit > 0 && samplesPostRelabeling > sw.Config.SampleLimit {
wc.resetNoRows()
up = 0
@ -325,7 +313,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
sw.prevRowsLen = samplesScraped
sw.prevLabelsLen = len(wc.labels)
wc.reset()
writeRequestCtxPool.Put(wc)
// body must be released only after wc is released, since wc refers to body.
@ -339,7 +327,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
samplesScraped := 0
samplesPostRelabeling := 0
responseSize := int64(0)
wc := writeRequestCtxPool.Get(sw.prevRowsLen)
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
sr, err := sw.GetStreamReader()
if err != nil {
@ -389,7 +377,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
startTime := time.Now()
sw.PushData(&wc.writeRequest)
pushDataDuration.UpdateDuration(startTime)
sw.prevRowsLen = len(wc.rows.Rows)
sw.prevLabelsLen = len(wc.labels)
wc.reset()
writeRequestCtxPool.Put(wc)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
@ -401,11 +389,11 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
//
// Its logic has been copied from leveledbytebufferpool.
type leveledWriteRequestCtxPool struct {
pools [30]sync.Pool
pools [13]sync.Pool
}
func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx {
id, capacityNeeded := lwp.getPoolIDAndCapacity(rowsCapacity)
func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx {
id, capacityNeeded := lwp.getPoolIDAndCapacity(labelsCapacity)
for i := 0; i < 2; i++ {
if id < 0 || id >= len(lwp.pools) {
break
@ -421,10 +409,12 @@ func (lwp *leveledWriteRequestCtxPool) Get(rowsCapacity int) *writeRequestCtx {
}
func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) {
capacity := cap(wc.rows.Rows)
id, _ := lwp.getPoolIDAndCapacity(capacity)
wc.reset()
lwp.pools[id].Put(wc)
capacity := cap(wc.labels)
id, poolCapacity := lwp.getPoolIDAndCapacity(capacity)
if capacity <= poolCapacity {
wc.reset()
lwp.pools[id].Put(wc)
}
}
func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int) {
@ -434,7 +424,7 @@ func (lwp *leveledWriteRequestCtxPool) getPoolIDAndCapacity(size int) (int, int)
}
size >>= 3
id := bits.Len(uint(size))
if id > len(lwp.pools) {
if id >= len(lwp.pools) {
id = len(lwp.pools) - 1
}
return id, (1 << (id + 3))