From 4628deecd13e9c95b9a1701188079ea3bde18271 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 13 Aug 2020 23:12:22 +0300 Subject: [PATCH] lib/promscrape: reduce memory usage when scraping big number of targets Thanks to @dxtrzhang for the original idea at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/688 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/689 --- lib/leveledbytebufferpool/pool.go | 54 ++++++++++++ lib/leveledbytebufferpool/pool_test.go | 36 ++++++++ lib/promscrape/scrapework.go | 113 +++++++++++++++---------- 3 files changed, 158 insertions(+), 45 deletions(-) create mode 100644 lib/leveledbytebufferpool/pool.go create mode 100644 lib/leveledbytebufferpool/pool_test.go diff --git a/lib/leveledbytebufferpool/pool.go b/lib/leveledbytebufferpool/pool.go new file mode 100644 index 0000000000..5ffc98085c --- /dev/null +++ b/lib/leveledbytebufferpool/pool.go @@ -0,0 +1,54 @@ +package leveledbytebufferpool + +import ( + "math/bits" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +// pools contains pools for byte slices of various capacities. +// +// pools[0] is for capacities from 0 to 7 +// pools[1] is for capacities from 8 to 15 +// pools[2] is for capacities from 16 to 31 +// pools[3] is for capacities from 32 to 63 +// +var pools [30]sync.Pool + +// Get returns byte buffer with the given capacity. +func Get(capacity int) *bytesutil.ByteBuffer { + for i := 0; i < 2; i++ { + v := getPool(capacity).Get() + if v != nil { + return v.(*bytesutil.ByteBuffer) + } + if capacity > 1<<30 { + break + } + capacity *= 2 + } + return &bytesutil.ByteBuffer{} +} + +// Put returns bb to the pool. 
+func Put(bb *bytesutil.ByteBuffer) {
+	capacity := cap(bb.B)
+	bb.Reset()
+	getPool(capacity).Put(bb)
+}
+
+func getPool(size int) *sync.Pool {
+	if size < 0 {
+		size = 0
+	}
+	size >>= 3
+	n := bits.Len(uint(size))
+	if n >= len(pools) {
+		n = len(pools) - 1
+	}
+	if n < 0 {
+		n = 0
+	}
+	return &pools[n]
+}
diff --git a/lib/leveledbytebufferpool/pool_test.go b/lib/leveledbytebufferpool/pool_test.go
new file mode 100644
index 0000000000..851513c68f
--- /dev/null
+++ b/lib/leveledbytebufferpool/pool_test.go
@@ -0,0 +1,36 @@
+package leveledbytebufferpool
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+func TestGetPutConcurrent(t *testing.T) {
+	const concurrency = 10
+	doneCh := make(chan struct{}, concurrency)
+	for i := 0; i < concurrency; i++ {
+		go func() {
+			for capacity := -1; capacity < 100; capacity++ {
+				bb := Get(capacity)
+				if len(bb.B) > 0 {
+					panic(fmt.Errorf("len(bb.B) must be zero; got %d", len(bb.B)))
+				}
+				if capacity < 0 {
+					capacity = 0
+				}
+				bb.B = append(bb.B, make([]byte, capacity)...)
+ Put(bb) + } + doneCh <- struct{}{} + }() + } + tc := time.After(10 * time.Second) + for i := 0; i < concurrency; i++ { + select { + case <-tc: + t.Fatalf("timeout") + case <-doneCh: + } + } +} diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 7b339e200e..e991b1fdc6 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -5,9 +5,11 @@ import ( "fmt" "math" "strings" + "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" @@ -125,17 +127,15 @@ type scrapeWork struct { // scrapeWork belongs to ScrapeGroup string - bodyBuf []byte - rows parser.Rows - tmpRow parser.Row - - writeRequest prompbmarshal.WriteRequest - labels []prompbmarshal.Label - samples []prompbmarshal.Sample + tmpRow parser.Row // the prevSeriesMap and lh are used for fast calculation of `scrape_series_added` metric. prevSeriesMap map[uint64]struct{} labelsHashBuf []byte + + // prevBodyCapacity contains the previous response body capacity for the given scrape work. + // It is used as a hint in order to reduce memory usage for body buffers. 
+ prevBodyCapacity int } func (sw *scrapeWork) run(stopCh <-chan struct{}) { @@ -204,53 +204,76 @@ var ( ) func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error { + body := leveledbytebufferpool.Get(sw.prevBodyCapacity) var err error - sw.bodyBuf, err = sw.ReadData(sw.bodyBuf[:0]) + body.B, err = sw.ReadData(body.B[:0]) endTimestamp := time.Now().UnixNano() / 1e6 duration := float64(endTimestamp-realTimestamp) / 1e3 scrapeDuration.Update(duration) - scrapeResponseSize.Update(float64(len(sw.bodyBuf))) + scrapeResponseSize.Update(float64(len(body.B))) up := 1 + wc := writeRequestCtxPool.Get().(*writeRequestCtx) if err != nil { up = 0 scrapesFailed.Inc() } else { - bodyString := bytesutil.ToUnsafeString(sw.bodyBuf) - sw.rows.UnmarshalWithErrLogger(bodyString, sw.logError) + bodyString := bytesutil.ToUnsafeString(body.B) + wc.rows.UnmarshalWithErrLogger(bodyString, sw.logError) } - srcRows := sw.rows.Rows + srcRows := wc.rows.Rows samplesScraped := len(srcRows) scrapedSamples.Update(float64(samplesScraped)) for i := range srcRows { - sw.addRowToTimeseries(&srcRows[i], scrapeTimestamp, true) + sw.addRowToTimeseries(wc, &srcRows[i], scrapeTimestamp, true) } - sw.rows.Reset() - if sw.Config.SampleLimit > 0 && len(sw.writeRequest.Timeseries) > sw.Config.SampleLimit { - prompbmarshal.ResetWriteRequest(&sw.writeRequest) + if sw.Config.SampleLimit > 0 && len(wc.writeRequest.Timeseries) > sw.Config.SampleLimit { + prompbmarshal.ResetWriteRequest(&wc.writeRequest) up = 0 scrapesSkippedBySampleLimit.Inc() } - samplesPostRelabeling := len(sw.writeRequest.Timeseries) - seriesAdded := sw.getSeriesAdded() - sw.addAutoTimeseries("up", float64(up), scrapeTimestamp) - sw.addAutoTimeseries("scrape_duration_seconds", duration, scrapeTimestamp) - sw.addAutoTimeseries("scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) - sw.addAutoTimeseries("scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) - 
sw.addAutoTimeseries("scrape_series_added", float64(seriesAdded), scrapeTimestamp) + samplesPostRelabeling := len(wc.writeRequest.Timeseries) + seriesAdded := sw.getSeriesAdded(wc) + sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp) + sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp) + sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) + sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) + sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) startTime := time.Now() - sw.PushData(&sw.writeRequest) + sw.PushData(&wc.writeRequest) pushDataDuration.UpdateDuration(startTime) - prompbmarshal.ResetWriteRequest(&sw.writeRequest) - sw.labels = sw.labels[:0] - sw.samples = sw.samples[:0] + wc.reset() + writeRequestCtxPool.Put(wc) + // body must be released only after wc is released, since wc refers to body. + sw.prevBodyCapacity = cap(body.B) + leveledbytebufferpool.Put(body) tsmGlobal.Update(&sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err) return err } -func (sw *scrapeWork) getSeriesAdded() int { +type writeRequestCtx struct { + rows parser.Rows + writeRequest prompbmarshal.WriteRequest + labels []prompbmarshal.Label + samples []prompbmarshal.Sample +} + +func (wc *writeRequestCtx) reset() { + wc.rows.Reset() + prompbmarshal.ResetWriteRequest(&wc.writeRequest) + wc.labels = wc.labels[:0] + wc.samples = wc.samples[:0] +} + +var writeRequestCtxPool = &sync.Pool{ + New: func() interface{} { + return &writeRequestCtx{} + }, +} + +func (sw *scrapeWork) getSeriesAdded(wc *writeRequestCtx) int { mPrev := sw.prevSeriesMap seriesAdded := 0 - for _, ts := range sw.writeRequest.Timeseries { + for _, ts := range wc.writeRequest.Timeseries { h := sw.getLabelsHash(ts.Labels) if _, ok := mPrev[h]; !ok { seriesAdded++ @@ -262,8 +285,8 @@ func (sw *scrapeWork) getSeriesAdded() int { 
} // Slow path: update the sw.prevSeriesMap, since new time series were added. - m := make(map[uint64]struct{}, len(sw.writeRequest.Timeseries)) - for _, ts := range sw.writeRequest.Timeseries { + m := make(map[uint64]struct{}, len(wc.writeRequest.Timeseries)) + for _, ts := range wc.writeRequest.Timeseries { h := sw.getLabelsHash(ts.Labels) m[h] = struct{}{} } @@ -286,40 +309,40 @@ func (sw *scrapeWork) getLabelsHash(labels []prompbmarshal.Label) uint64 { // addAutoTimeseries adds automatically generated time series with the given name, value and timestamp. // // See https://prometheus.io/docs/concepts/jobs_instances/#automatically-generated-labels-and-time-series -func (sw *scrapeWork) addAutoTimeseries(name string, value float64, timestamp int64) { +func (sw *scrapeWork) addAutoTimeseries(wc *writeRequestCtx, name string, value float64, timestamp int64) { sw.tmpRow.Metric = name sw.tmpRow.Tags = nil sw.tmpRow.Value = value sw.tmpRow.Timestamp = timestamp - sw.addRowToTimeseries(&sw.tmpRow, timestamp, false) + sw.addRowToTimeseries(wc, &sw.tmpRow, timestamp, false) } -func (sw *scrapeWork) addRowToTimeseries(r *parser.Row, timestamp int64, needRelabel bool) { - labelsLen := len(sw.labels) - sw.labels = appendLabels(sw.labels, r.Metric, r.Tags, sw.Config.Labels, sw.Config.HonorLabels) +func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, timestamp int64, needRelabel bool) { + labelsLen := len(wc.labels) + wc.labels = appendLabels(wc.labels, r.Metric, r.Tags, sw.Config.Labels, sw.Config.HonorLabels) if needRelabel { - sw.labels = promrelabel.ApplyRelabelConfigs(sw.labels, labelsLen, sw.Config.MetricRelabelConfigs, true) + wc.labels = promrelabel.ApplyRelabelConfigs(wc.labels, labelsLen, sw.Config.MetricRelabelConfigs, true) } else { - sw.labels = promrelabel.FinalizeLabels(sw.labels[:labelsLen], sw.labels[labelsLen:]) - promrelabel.SortLabels(sw.labels[labelsLen:]) + wc.labels = promrelabel.FinalizeLabels(wc.labels[:labelsLen], 
wc.labels[labelsLen:]) + promrelabel.SortLabels(wc.labels[labelsLen:]) } - if len(sw.labels) == labelsLen { + if len(wc.labels) == labelsLen { // Skip row without labels. return } - labels := sw.labels[labelsLen:] - sw.samples = append(sw.samples, prompbmarshal.Sample{}) - sample := &sw.samples[len(sw.samples)-1] + labels := wc.labels[labelsLen:] + wc.samples = append(wc.samples, prompbmarshal.Sample{}) + sample := &wc.samples[len(wc.samples)-1] sample.Value = r.Value sample.Timestamp = r.Timestamp if !sw.Config.HonorTimestamps || sample.Timestamp == 0 { sample.Timestamp = timestamp } - wr := &sw.writeRequest + wr := &wc.writeRequest wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{}) ts := &wr.Timeseries[len(wr.Timeseries)-1] ts.Labels = labels - ts.Samples = sw.samples[len(sw.samples)-1:] + ts.Samples = wc.samples[len(wc.samples)-1:] } func appendLabels(dst []prompbmarshal.Label, metric string, src []parser.Tag, extraLabels []prompbmarshal.Label, honorLabels bool) []prompbmarshal.Label {