Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2025-03-21 15:45:01 +00:00
lib/promscrape: reduce memory usage when scraping a big number of targets
Thanks to @dxtrzhang for the original idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/688. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/689.
This commit is contained in:
parent: eead3ee8ec
commit: 4628deecd1
3 changed files with 158 additions and 45 deletions
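
The core idea of the commit: per-target scrape state (response-body buffer, parsed rows, write request, labels, samples) no longer lives permanently on each scrapeWork. It is borrowed from shared pools for the duration of one scrape and returned afterwards, so thousands of idle targets hold no scrape-sized allocations between scrapes. A minimal sketch of that borrow/return pattern, with hypothetical names (scratch and processScrape stand in for writeRequestCtx and scrapeInternal):

package main

import (
    "fmt"
    "sync"
)

// scratch bundles the per-scrape temporary state that, before this
// commit, lived permanently on every scrape worker.
type scratch struct {
    buf []byte
}

// scratchPool shares scratch objects across all workers, so idle
// targets hold no scrape-sized allocations between scrapes.
var scratchPool = sync.Pool{
    New: func() interface{} { return &scratch{} },
}

// processScrape borrows scratch state for one scrape and returns it
// to the pool afterwards.
func processScrape(readBody func([]byte) []byte) int {
    s := scratchPool.Get().(*scratch)
    s.buf = readBody(s.buf[:0]) // reuse the buffer's capacity across scrapes
    n := len(s.buf)             // ... parse s.buf and push samples here ...
    s.buf = s.buf[:0]
    scratchPool.Put(s)
    return n
}

func main() {
    body := func(dst []byte) []byte { return append(dst, "metric 42\n"...) }
    fmt.Println(processScrape(body)) // prints 10
}

With many targets scraping at staggered times, the number of live scratch objects tracks the number of concurrent scrapes rather than the total number of targets, which is where the memory saving comes from.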
lib/leveledbytebufferpool/pool.go (new file, 54 lines)

@@ -0,0 +1,54 @@
package leveledbytebufferpool

import (
    "math/bits"
    "sync"

    "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)

// pools contains pools for byte slices of various capacities.
//
// pools[0] is for capacities from 0 to 7
// pools[1] is for capacities from 8 to 15
// pools[2] is for capacities from 16 to 31
// pools[3] is for capacities from 32 to 63
//
var pools [30]sync.Pool

// Get returns a byte buffer with the given capacity.
func Get(capacity int) *bytesutil.ByteBuffer {
    for i := 0; i < 2; i++ {
        v := getPool(capacity).Get()
        if v != nil {
            return v.(*bytesutil.ByteBuffer)
        }
        if capacity > 1<<30 {
            break
        }
        capacity *= 2
    }
    return &bytesutil.ByteBuffer{}
}

// Put returns bb to the pool.
func Put(bb *bytesutil.ByteBuffer) {
    capacity := cap(bb.B)
    bb.Reset()
    getPool(capacity).Put(bb)
}

func getPool(size int) *sync.Pool {
    if size < 0 {
        size = 0
    }
    size >>= 3
    n := bits.Len(uint(size))
    if n >= len(pools) {
        n = len(pools) - 1
    }
    if n < 0 {
        n = 0
    }
    return &pools[n]
}
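
To make the bucketing arithmetic concrete: getPool drops the low three bits of the capacity and takes the bit length of what remains, so each pool covers one power-of-two capacity range. A small standalone program (illustrative only, not part of the commit) that mirrors this computation:

package main

import (
    "fmt"
    "math/bits"
)

// bucket mirrors getPool's index computation from pool.go.
func bucket(capacity int) int {
    if capacity < 0 {
        capacity = 0
    }
    capacity >>= 3 // collapse capacities 0..7 into bucket 0
    return bits.Len(uint(capacity))
}

func main() {
    for _, c := range []int{0, 7, 8, 15, 16, 31, 32, 63, 100} {
        fmt.Printf("capacity %3d -> pools[%d]\n", c, bucket(c))
    }
    // capacity   0 -> pools[0]
    // capacity   7 -> pools[0]
    // capacity   8 -> pools[1]
    // capacity  63 -> pools[3]
    // capacity 100 -> pools[4]
}

The doubling loop in Get then means a miss in the exact size class falls through to the next-larger one before giving up and allocating a fresh buffer.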
lib/leveledbytebufferpool/pool_test.go (new file, 36 lines)
@@ -0,0 +1,36 @@
package leveledbytebufferpool

import (
    "fmt"
    "testing"
    "time"
)

func TestGetPutConcurrent(t *testing.T) {
    const concurrency = 10
    doneCh := make(chan struct{}, concurrency)
    for i := 0; i < concurrency; i++ {
        go func() {
            for capacity := -1; capacity < 100; capacity++ {
                bb := Get(capacity)
                if len(bb.B) > 0 {
                    panic(fmt.Errorf("len(bb.B) must be zero; got %d", len(bb.B)))
                }
                if capacity < 0 {
                    capacity = 0
                }
                bb.B = append(bb.B, make([]byte, capacity)...)
                Put(bb)
            }
            doneCh <- struct{}{}
        }()
    }
    tc := time.After(10 * time.Second)
    for i := 0; i < concurrency; i++ {
        select {
        case <-tc:
            t.Fatalf("timeout")
        case <-doneCh:
        }
    }
}
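
A benchmark along the following lines could verify that the pool serves steady-state requests without fresh allocations (BenchmarkGetPut is a hypothetical addition, not part of the commit):

package leveledbytebufferpool

import "testing"

// BenchmarkGetPut checks that, after warm-up, repeated Get/Put cycles
// are satisfied from the pool rather than by new allocations.
func BenchmarkGetPut(b *testing.B) {
    data := make([]byte, 4096) // allocated once, outside the measured loop
    b.ReportAllocs()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            bb := Get(len(data))
            bb.B = append(bb.B[:0], data...)
            Put(bb)
        }
    })
}

After warm-up, ReportAllocs should show near-zero allocations per operation, since every Get is satisfied by a previously Put buffer from the same size class.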
lib/promscrape/scrapework.go (68 additions, 45 deletions)

@@ -5,9 +5,11 @@ import (
     "fmt"
     "math"
     "strings"
+    "sync"
     "time"
 
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+    "github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
     "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
@@ -125,17 +127,15 @@ type scrapeWork struct {
     // scrapeWork belongs to
     ScrapeGroup string
 
-    bodyBuf []byte
-    rows    parser.Rows
-    tmpRow  parser.Row
-
-    writeRequest prompbmarshal.WriteRequest
-    labels       []prompbmarshal.Label
-    samples      []prompbmarshal.Sample
+    tmpRow parser.Row
 
     // the prevSeriesMap and lh are used for fast calculation of `scrape_series_added` metric.
     prevSeriesMap map[uint64]struct{}
     labelsHashBuf []byte
+
+    // prevBodyCapacity contains the previous response body capacity for the given scrape work.
+    // It is used as a hint in order to reduce memory usage for body buffers.
+    prevBodyCapacity int
 }
 
 func (sw *scrapeWork) run(stopCh <-chan struct{}) {
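
The new prevBodyCapacity field closes the loop with the leveled pool: each scrape records how big the response actually was, and the next scrape asks the pool for a buffer from that size class up front. A hedged sketch of the round trip, with illustrative names (worker and scrapeOnce stand in for scrapeWork and scrapeInternal):

package sketch

import (
    "github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
)

// worker is an illustrative stand-in for scrapeWork.
type worker struct {
    prevBodyCapacity int
}

// scrapeOnce shows the capacity-hint round trip: borrow a buffer sized
// like the previous response, fill it, remember the new capacity, and
// return the buffer to the pool.
func (w *worker) scrapeOnce(read func([]byte) ([]byte, error)) error {
    body := leveledbytebufferpool.Get(w.prevBodyCapacity)
    var err error
    body.B, err = read(body.B[:0])
    // ... parse body.B and push the resulting samples ...
    w.prevBodyCapacity = cap(body.B) // hint for the next scrape
    leveledbytebufferpool.Put(body)
    return err
}

This keeps the common case allocation-free: a target whose responses hover around the same size keeps hitting the same pool bucket.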
@@ -204,53 +204,76 @@ var (
 )
 
 func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error {
+    body := leveledbytebufferpool.Get(sw.prevBodyCapacity)
     var err error
-    sw.bodyBuf, err = sw.ReadData(sw.bodyBuf[:0])
+    body.B, err = sw.ReadData(body.B[:0])
     endTimestamp := time.Now().UnixNano() / 1e6
     duration := float64(endTimestamp-realTimestamp) / 1e3
     scrapeDuration.Update(duration)
-    scrapeResponseSize.Update(float64(len(sw.bodyBuf)))
+    scrapeResponseSize.Update(float64(len(body.B)))
     up := 1
+    wc := writeRequestCtxPool.Get().(*writeRequestCtx)
     if err != nil {
         up = 0
         scrapesFailed.Inc()
     } else {
-        bodyString := bytesutil.ToUnsafeString(sw.bodyBuf)
-        sw.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
+        bodyString := bytesutil.ToUnsafeString(body.B)
+        wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError)
     }
-    srcRows := sw.rows.Rows
+    srcRows := wc.rows.Rows
     samplesScraped := len(srcRows)
     scrapedSamples.Update(float64(samplesScraped))
     for i := range srcRows {
-        sw.addRowToTimeseries(&srcRows[i], scrapeTimestamp, true)
+        sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true)
     }
-    sw.rows.Reset()
-    if sw.Config.SampleLimit > 0 && len(sw.writeRequest.Timeseries) > sw.Config.SampleLimit {
-        prompbmarshal.ResetWriteRequest(&sw.writeRequest)
+    if sw.Config.SampleLimit > 0 && len(wc.writeRequest.Timeseries) > sw.Config.SampleLimit {
+        prompbmarshal.ResetWriteRequest(&wc.writeRequest)
         up = 0
         scrapesSkippedBySampleLimit.Inc()
     }
-    samplesPostRelabeling := len(sw.writeRequest.Timeseries)
-    seriesAdded := sw.getSeriesAdded()
-    sw.addAutoTimeseries("up", float64(up), scrapeTimestamp)
-    sw.addAutoTimeseries("scrape_duration_seconds", duration, scrapeTimestamp)
-    sw.addAutoTimeseries("scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
-    sw.addAutoTimeseries("scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
-    sw.addAutoTimeseries("scrape_series_added", float64(seriesAdded), scrapeTimestamp)
+    samplesPostRelabeling := len(wc.writeRequest.Timeseries)
+    seriesAdded := sw.getSeriesAdded(wc)
+    sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
+    sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
+    sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
+    sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
+    sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
     startTime := time.Now()
-    sw.PushData(&sw.writeRequest)
+    sw.PushData(&wc.writeRequest)
     pushDataDuration.UpdateDuration(startTime)
-    prompbmarshal.ResetWriteRequest(&sw.writeRequest)
-    sw.labels = sw.labels[:0]
-    sw.samples = sw.samples[:0]
+    wc.reset()
+    writeRequestCtxPool.Put(wc)
+    // body must be released only after wc is released, since wc refers to body.
+    sw.prevBodyCapacity = cap(body.B)
+    leveledbytebufferpool.Put(body)
     tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
     return err
 }
 
-func (sw *scrapeWork) getSeriesAdded() int {
+type writeRequestCtx struct {
+    rows         parser.Rows
+    writeRequest prompbmarshal.WriteRequest
+    labels       []prompbmarshal.Label
+    samples      []prompbmarshal.Sample
+}
+
+func (wc *writeRequestCtx) reset() {
+    wc.rows.Reset()
+    prompbmarshal.ResetWriteRequest(&wc.writeRequest)
+    wc.labels = wc.labels[:0]
+    wc.samples = wc.samples[:0]
+}
+
+var writeRequestCtxPool = &sync.Pool{
+    New: func() interface{} {
+        return &writeRequestCtx{}
+    },
+}
+
+func (sw *scrapeWork) getSeriesAdded(wc *writeRequestCtx) int {
     mPrev := sw.prevSeriesMap
     seriesAdded := 0
-    for _, ts := range sw.writeRequest.Timeseries {
+    for _, ts := range wc.writeRequest.Timeseries {
         h := sw.getLabelsHash(ts.Labels)
         if _, ok := mPrev[h]; !ok {
             seriesAdded++
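
The release-order comment in the hunk above matters because wc.rows is unmarshaled from bytesutil.ToUnsafeString(body.B): the parsed rows alias the body's memory instead of copying it, so body must stay out of the pool until wc has been released. A standalone illustration of that aliasing hazard (toUnsafeString here is a local re-implementation for demonstration, not the library function):

package main

import (
    "fmt"
    "unsafe"
)

// toUnsafeString views b as a string without copying, in the spirit of
// bytesutil.ToUnsafeString; the string is only valid while b is.
func toUnsafeString(b []byte) string {
    return *(*string)(unsafe.Pointer(&b))
}

func main() {
    body := []byte("metric_name 1")
    s := toUnsafeString(body) // s aliases body's bytes
    body[0] = 'M'             // mutating body is visible through s
    fmt.Println(s)            // prints "Metric_name 1"
    // Returning body to a pool while s (or anything parsed from it) is
    // still referenced would let another goroutine overwrite the bytes
    // behind s -- hence wc is released before body above.
}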
@@ -262,8 +285,8 @@ func (sw *scrapeWork) getSeriesAdded() int {
     }
 
     // Slow path: update the sw.prevSeriesMap, since new time series were added.
-    m := make(map[uint64]struct{}, len(sw.writeRequest.Timeseries))
-    for _, ts := range sw.writeRequest.Timeseries {
+    m := make(map[uint64]struct{}, len(wc.writeRequest.Timeseries))
+    for _, ts := range wc.writeRequest.Timeseries {
         h := sw.getLabelsHash(ts.Labels)
         m[h] = struct{}{}
     }
@@ -286,40 +309,40 @@ func (sw *scrapeWork) getLabelsHash(labels []prompbmarshal.Label) uint64 {
 // addAutoTimeseries adds automatically generated time series with the given name, value and timestamp.
 //
 // See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series
-func (sw *scrapeWork) addAutoTimeseries(name string, value float64, timestamp int64) {
+func (sw *scrapeWork) addAutoTimeseries(wc *writeRequestCtx, name string, value float64, timestamp int64) {
     sw.tmpRow.Metric = name
     sw.tmpRow.Tags = nil
     sw.tmpRow.Value = value
     sw.tmpRow.Timestamp = timestamp
-    sw.addRowToTimeseries(&sw.tmpRow, timestamp, false)
+    sw.addRowToTimeseries(wc, &sw.tmpRow, timestamp, false)
 }
 
-func (sw *scrapeWork) addRowToTimeseries(r *parser.Row, timestamp int64, needRelabel bool) {
-    labelsLen := len(sw.labels)
-    sw.labels = appendLabels(sw.labels, r.Metric, r.Tags, sw.Config.Labels, sw.Config.HonorLabels)
+func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, timestamp int64, needRelabel bool) {
+    labelsLen := len(wc.labels)
+    wc.labels = appendLabels(wc.labels, r.Metric, r.Tags, sw.Config.Labels, sw.Config.HonorLabels)
     if needRelabel {
-        sw.labels = promrelabel.ApplyRelabelConfigs(sw.labels, labelsLen, sw.Config.MetricRelabelConfigs, true)
+        wc.labels = promrelabel.ApplyRelabelConfigs(wc.labels, labelsLen, sw.Config.MetricRelabelConfigs, true)
     } else {
-        sw.labels = promrelabel.FinalizeLabels(sw.labels[:labelsLen], sw.labels[labelsLen:])
-        promrelabel.SortLabels(sw.labels[labelsLen:])
+        wc.labels = promrelabel.FinalizeLabels(wc.labels[:labelsLen], wc.labels[labelsLen:])
+        promrelabel.SortLabels(wc.labels[labelsLen:])
     }
-    if len(sw.labels) == labelsLen {
+    if len(wc.labels) == labelsLen {
         // Skip row without labels.
         return
     }
-    labels := sw.labels[labelsLen:]
-    sw.samples = append(sw.samples, prompbmarshal.Sample{})
-    sample := &sw.samples[len(sw.samples)-1]
+    labels := wc.labels[labelsLen:]
+    wc.samples = append(wc.samples, prompbmarshal.Sample{})
+    sample := &wc.samples[len(wc.samples)-1]
     sample.Value = r.Value
     sample.Timestamp = r.Timestamp
     if !sw.Config.HonorTimestamps || sample.Timestamp == 0 {
         sample.Timestamp = timestamp
     }
-    wr := &sw.writeRequest
+    wr := &wc.writeRequest
     wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{})
     ts := &wr.Timeseries[len(wr.Timeseries)-1]
     ts.Labels = labels
-    ts.Samples = sw.samples[len(sw.samples)-1:]
+    ts.Samples = wc.samples[len(wc.samples)-1:]
 }
 
 func appendLabels(dst []prompbmarshal.Label, metric string, src []parser.Tag, extraLabels []prompbmarshal.Label, honorLabels bool) []prompbmarshal.Label {