mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/promscrape: reduce memory usage when scraping targets with big number of metrics alongside targets with small number of labels
Previously targets with big number of metrics and/or labels could generated too big buffers, which then could be re-used when scraping targets with small number of metrics. This resulted in memory waste. Now big buffers are used only for targets with big number of metrics / labels, while small buffers are used for targets with small number of metrics / labels.
This commit is contained in:
parent
3ea6444219
commit
9a77ae9d1c
2 changed files with 52 additions and 9 deletions
|
@ -19,9 +19,6 @@ var pools [30]sync.Pool
|
||||||
|
|
||||||
// Get returns byte buffer with the given capacity.
|
// Get returns byte buffer with the given capacity.
|
||||||
func Get(capacity int) *bytesutil.ByteBuffer {
|
func Get(capacity int) *bytesutil.ByteBuffer {
|
||||||
if capacity <= 0 {
|
|
||||||
capacity = 1
|
|
||||||
}
|
|
||||||
id, capacityNeeded := getPoolIdAndCapacity(capacity)
|
id, capacityNeeded := getPoolIdAndCapacity(capacity)
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
if id < 0 || id >= len(pools) {
|
if id < 0 || id >= len(pools) {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"math/bits"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -136,6 +137,10 @@ type scrapeWork struct {
|
||||||
// prevBodyLen contains the previous response body length for the given scrape work.
|
// prevBodyLen contains the previous response body length for the given scrape work.
|
||||||
// It is used as a hint in order to reduce memory usage for body buffers.
|
// It is used as a hint in order to reduce memory usage for body buffers.
|
||||||
prevBodyLen int
|
prevBodyLen int
|
||||||
|
|
||||||
|
// prevLabelsLen contains the number of all the labels generated during the previous scrape.
|
||||||
|
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
|
||||||
|
prevLabelsLen int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
|
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
|
||||||
|
@ -212,7 +217,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||||
scrapeDuration.Update(duration)
|
scrapeDuration.Update(duration)
|
||||||
scrapeResponseSize.Update(float64(len(body.B)))
|
scrapeResponseSize.Update(float64(len(body.B)))
|
||||||
up := 1
|
up := 1
|
||||||
wc := writeRequestCtxPool.Get().(*writeRequestCtx)
|
wc := writeRequestCtxPool.Get(sw.prevLabelsLen)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
up = 0
|
up = 0
|
||||||
scrapesFailed.Inc()
|
scrapesFailed.Inc()
|
||||||
|
@ -241,6 +246,7 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
sw.PushData(&wc.writeRequest)
|
sw.PushData(&wc.writeRequest)
|
||||||
pushDataDuration.UpdateDuration(startTime)
|
pushDataDuration.UpdateDuration(startTime)
|
||||||
|
sw.prevLabelsLen = len(wc.labels)
|
||||||
wc.reset()
|
wc.reset()
|
||||||
writeRequestCtxPool.Put(wc)
|
writeRequestCtxPool.Put(wc)
|
||||||
// body must be released only after wc is released, since wc refers to body.
|
// body must be released only after wc is released, since wc refers to body.
|
||||||
|
@ -250,6 +256,50 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// leveldWriteRequestCtxPool allows reducing memory usage when writeRequesCtx
|
||||||
|
// structs contain mixed number of labels.
|
||||||
|
//
|
||||||
|
// Its logic has been copied from leveledbytebufferpool.
|
||||||
|
type leveledWriteRequestCtxPool struct {
|
||||||
|
pools [30]sync.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lwp *leveledWriteRequestCtxPool) Get(labelsCapacity int) *writeRequestCtx {
|
||||||
|
id, capacityNeeded := lwp.getPoolIdAndCapacity(labelsCapacity)
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
if id < 0 || id >= len(lwp.pools) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if v := lwp.pools[id].Get(); v != nil {
|
||||||
|
return v.(*writeRequestCtx)
|
||||||
|
}
|
||||||
|
id++
|
||||||
|
}
|
||||||
|
return &writeRequestCtx{
|
||||||
|
labels: make([]prompbmarshal.Label, 0, capacityNeeded),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lwp *leveledWriteRequestCtxPool) Put(wc *writeRequestCtx) {
|
||||||
|
capacity := cap(wc.labels)
|
||||||
|
id, _ := lwp.getPoolIdAndCapacity(capacity)
|
||||||
|
wc.reset()
|
||||||
|
lwp.pools[id].Put(wc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lwp *leveledWriteRequestCtxPool) getPoolIdAndCapacity(size int) (int, int) {
|
||||||
|
size--
|
||||||
|
if size < 0 {
|
||||||
|
size = 0
|
||||||
|
}
|
||||||
|
size >>= 3
|
||||||
|
id := bits.Len(uint(size))
|
||||||
|
if id > len(lwp.pools) {
|
||||||
|
id = len(lwp.pools) - 1
|
||||||
|
}
|
||||||
|
return id, (1 << (id + 3))
|
||||||
|
}
|
||||||
|
|
||||||
type writeRequestCtx struct {
|
type writeRequestCtx struct {
|
||||||
rows parser.Rows
|
rows parser.Rows
|
||||||
writeRequest prompbmarshal.WriteRequest
|
writeRequest prompbmarshal.WriteRequest
|
||||||
|
@ -264,11 +314,7 @@ func (wc *writeRequestCtx) reset() {
|
||||||
wc.samples = wc.samples[:0]
|
wc.samples = wc.samples[:0]
|
||||||
}
|
}
|
||||||
|
|
||||||
var writeRequestCtxPool = &sync.Pool{
|
var writeRequestCtxPool leveledWriteRequestCtxPool
|
||||||
New: func() interface{} {
|
|
||||||
return &writeRequestCtx{}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sw *scrapeWork) getSeriesAdded(wc *writeRequestCtx) int {
|
func (sw *scrapeWork) getSeriesAdded(wc *writeRequestCtx) int {
|
||||||
mPrev := sw.prevSeriesMap
|
mPrev := sw.prevSeriesMap
|
||||||
|
|
Loading…
Reference in a new issue