mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/promscrape: reduce memory usage when scraping targets with big number of metrics alongside targets with small number of labels
Previously targets with big number of metrics and/or labels could generated too big buffers, which then could be re-used when scraping targets with small number of metrics. This resulted in memory waste. Now big buffers are used only for targets with big number of metrics / labels, while small buffers are used for targets with small number of metrics / labels.
This commit is contained in:
parent
d6967319b6
commit
efc730863b
2 changed files with 52 additions and 9 deletions
|
@ -19,9 +19,6 @@ var pools [30]sync.Pool
|
|||
|
||||
// Get returns byte buffer with the given capacity.
|
||||
func Get(capacity int) *bytesutil.ByteBuffer {
|
||||
if capacity <= 0 {
|
||||
capacity = 1
|
||||
}
|
||||
id, capacityNeeded := getPoolIdAndCapacity(capacity)
|
||||
for i := 0; i < 2; i++ {
|
||||
if id < 0 || id >= len(pools) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/bits"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -136,6 +137,10 @@ type scrapeWork struct {
|
|||
// prevBodyLen contains the previous response body length for the given scrape work.
|
||||
// It is used as a hint in order to reduce memory usage for body buffers.
|
||||
prevBodyLen int
|
||||
|
||||
// prevLabelsLen contains the number of all the labels generated during the previous scrape.
|
||||
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
|
||||
prevLabelsLen int
|
||||
}
|
||||
|
||||
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
|
||||
|
@ -212,7 +217,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
|||
scrapeDuration.Update(duration)
|
||||
scrapeResponseSize.Update(float64(len(body.B)))
|
||||
up := 1
|
||||
wc := writeRequestCtxPool.Get().(*writeRequestCtx)
|
||||
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
|
||||
if err != nil {
|
||||
up = 0
|
||||
scrapesFailed.Inc()
|
||||
|
@ -241,6 +246,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
|||
startTime := time.Now()
|
||||
sw.PushData(&wc.writeRequest)
|
||||
pushDataDuration.UpdateDuration(startTime)
|
||||
sw.prevLabelsLen = len(wc.labels)
|
||||
wc.reset()
|
||||
writeRequestCtxPool.Put(wc)
|
||||
// body must be released only after wc is released, since wc refers to body.
|
||||
|
@ -250,6 +256,50 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
|||
return err
|
||||
}
|
||||
|
||||
// leveldWriteRequestCtxPool allows reducing memory usage when writeRequesCtx
|
||||
// structs contain mixed number of labels.
|
||||
//
|
||||
// Its logic has been copied from leveledbytebufferpool.
|
||||
type leveledWriteRequestCtxPool struct {
|
||||
pools [30]sync.Pool
|
||||
}
|
||||
|
||||
func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx {
|
||||
id, capacityNeeded := lwp.getPoolIdAndCapacity(labelsCapacity)
|
||||
for i := 0; i < 2; i++ {
|
||||
if id < 0 || id >= len(lwp.pools) {
|
||||
break
|
||||
}
|
||||
if v := lwp.pools[id].Get(); v != nil {
|
||||
return v.(*writeRequestCtx)
|
||||
}
|
||||
id++
|
||||
}
|
||||
return &writeRequestCtx{
|
||||
labels: make([]prompbmarshal.Label, 0, capacityNeeded),
|
||||
}
|
||||
}
|
||||
|
||||
func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) {
|
||||
capacity := cap(wc.labels)
|
||||
id, _ := lwp.getPoolIdAndCapacity(capacity)
|
||||
wc.reset()
|
||||
lwp.pools[id].Put(wc)
|
||||
}
|
||||
|
||||
func (lwp *leveledWriteRequestCtxPool) getPoolIdAndCapacity(size int) (int, int) {
|
||||
size--
|
||||
if size < 0 {
|
||||
size = 0
|
||||
}
|
||||
size >>= 3
|
||||
id := bits.Len(uint(size))
|
||||
if id > len(lwp.pools) {
|
||||
id = len(lwp.pools) - 1
|
||||
}
|
||||
return id, (1 << (id + 3))
|
||||
}
|
||||
|
||||
type writeRequestCtx struct {
|
||||
rows parser.Rows
|
||||
writeRequest prompbmarshal.WriteRequest
|
||||
|
@ -264,11 +314,7 @@ func (wc *writeRequestCtx) reset() {
|
|||
wc.samples = wc.samples[:0]
|
||||
}
|
||||
|
||||
var writeRequestCtxPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &writeRequestCtx{}
|
||||
},
|
||||
}
|
||||
var writeRequestCtxPool leveledWriteRequestCtxPool
|
||||
|
||||
func (sw *scrapeWork) getSeriesAdded(wc *writeRequestCtx) int {
|
||||
mPrev := sw.prevSeriesMap
|
||||
|
|
Loading…
Reference in a new issue