lib/promscrape: do not reset the remaining rows when pushing a part of data to remote storage during big scrapes

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/753

Thanks to @PerGon and @clmssz for help with debugging.
Aliaksandr Valialkin 2020-09-11 23:36:24 +03:00
parent 0e533d1a9c
commit 7d893a234c
2 changed files with 50 additions and 36 deletions
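For context, here is a standalone Go program (not VictoriaMetrics code; the row type and the flush condition are made up for illustration) that models the failure mode the commit title describes: if flushing a partially built write request also resets the parsed rows that the outer loop is still iterating over, every row after the flush point is effectively lost.

package main

import "fmt"

// row is a stand-in for a parsed scrape line (parser.Row in the real code).
type row struct{ metric string }

func main() {
    rows := []row{{"m1"}, {"m2"}, {"m3"}, {"m4"}}
    var pushed []string
    for i := range rows {
        pushed = append(pushed, rows[i].metric)
        if i == 1 { // pretend the pending write request became too big here
            // Old behaviour: the flush reset the whole context, including the
            // parsed rows, so entries the loop has not reached yet are wiped.
            for j := range rows {
                rows[j] = row{}
            }
        }
    }
    fmt.Printf("%q\n", pushed) // ["m1" "m2" "" ""] - the tail of the scrape is gone
}

The hunks below split the reset into reset(), which still clears everything, and resetNoRows(), which the mid-scrape flush now uses so wc.rows stays intact.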


@@ -130,8 +130,9 @@ type scrapeWork struct {
 	tmpRow parser.Row
 
-	// the prevSeriesMap and lh are used for fast calculation of `scrape_series_added` metric.
-	prevSeriesMap map[uint64]struct{}
+	// the seriesMap, seriesAdded and labelsHashBuf are used for fast calculation of `scrape_series_added` metric.
+	seriesMap   map[uint64]struct{}
+	seriesAdded int
 	labelsHashBuf []byte
 
 	// prevBodyLen contains the previous response body length for the given scrape work.
@@ -241,14 +242,16 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
 			// This should reduce memory usage when scraping targets with millions of metrics and/or labels.
 			// For example, when scraping /federate handler from Prometheus - see https://prometheus.io/docs/prometheus/latest/federation/
 			samplesPostRelabeling += len(wc.writeRequest.Timeseries)
+			sw.updateSeriesAdded(wc)
 			startTime := time.Now()
 			sw.PushData(&wc.writeRequest)
 			pushDataDuration.UpdateDuration(startTime)
-			wc.reset()
+			wc.resetNoRows()
 		}
 	}
 	samplesPostRelabeling += len(wc.writeRequest.Timeseries)
-	seriesAdded := sw.getSeriesAdded(wc)
+	sw.updateSeriesAdded(wc)
+	seriesAdded := sw.finalizeSeriesAdded(samplesPostRelabeling)
 	sw.addAutoTimeseries(wc, "up", float64(up), scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_duration_seconds", duration, scrapeTimestamp)
 	sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
@@ -320,6 +323,10 @@ type writeRequestCtx struct {
 
 func (wc *writeRequestCtx) reset() {
 	wc.rows.Reset()
+	wc.resetNoRows()
+}
+
+func (wc *writeRequestCtx) resetNoRows() {
 	prompbmarshal.ResetWriteRequest(&wc.writeRequest)
 	wc.labels = wc.labels[:0]
 	wc.samples = wc.samples[:0]
@@ -327,27 +334,27 @@ func (wc *writeRequestCtx) reset() {
 var writeRequestCtxPool leveledWriteRequestCtxPool
 
-func (sw *scrapeWork) getSeriesAdded(wc *writeRequestCtx) int {
-	mPrev := sw.prevSeriesMap
-	seriesAdded := 0
+func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) {
+	if sw.seriesMap == nil {
+		sw.seriesMap = make(map[uint64]struct{}, len(wc.writeRequest.Timeseries))
+	}
+	m := sw.seriesMap
 	for _, ts := range wc.writeRequest.Timeseries {
 		h := sw.getLabelsHash(ts.Labels)
-		if _, ok := mPrev[h]; !ok {
-			seriesAdded++
+		if _, ok := m[h]; !ok {
+			m[h] = struct{}{}
+			sw.seriesAdded++
 		}
 	}
-	if seriesAdded == 0 {
-		// Fast path: no new time series added during the last scrape.
-		return 0
-	}
-	// Slow path: update the sw.prevSeriesMap, since new time series were added.
-	m := make(map[uint64]struct{}, len(wc.writeRequest.Timeseries))
-	for _, ts := range wc.writeRequest.Timeseries {
-		h := sw.getLabelsHash(ts.Labels)
-		m[h] = struct{}{}
+}
+
+func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int {
+	seriesAdded := sw.seriesAdded
+	sw.seriesAdded = 0
+	if len(sw.seriesMap) > 2*lastScrapeSize {
+		// Reset seriesMap, since it occupies more than 4x metrics collected during the last scrape.
+		sw.seriesMap = make(map[uint64]struct{}, lastScrapeSize)
 	}
-	sw.prevSeriesMap = m
 	return seriesAdded
 }
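To make the new bookkeeping concrete, here is a small self-contained model of updateSeriesAdded/finalizeSeriesAdded (simplified: it keys the map by a plain label string instead of the sw.getLabelsHash value, and it mirrors the same 2*lastScrapeSize shrink check as the hunk above). It shows that seriesAdded now accumulates over every chunk pushed during a single scrape, while series already seen by this scrape worker are not counted again.

package main

import "fmt"

// tracker mirrors the seriesMap/seriesAdded bookkeeping from the hunk above,
// keyed by a plain label string for brevity.
type tracker struct {
    seriesMap   map[string]struct{}
    seriesAdded int
}

func (t *tracker) update(chunk []string) {
    if t.seriesMap == nil {
        t.seriesMap = make(map[string]struct{}, len(chunk))
    }
    for _, s := range chunk {
        if _, ok := t.seriesMap[s]; !ok {
            t.seriesMap[s] = struct{}{}
            t.seriesAdded++
        }
    }
}

func (t *tracker) finalize(lastScrapeSize int) int {
    added := t.seriesAdded
    t.seriesAdded = 0
    if len(t.seriesMap) > 2*lastScrapeSize {
        // Rebuild the map when it far outgrows the last scrape, as in the hunk above.
        t.seriesMap = make(map[string]struct{}, lastScrapeSize)
    }
    return added
}

func main() {
    var t tracker

    // Scrape 1 is big enough to be flushed in two chunks.
    t.update([]string{"up", "http_requests_total"})
    t.update([]string{"go_goroutines"})
    fmt.Println(t.finalize(3)) // 3 - every chunk of the first scrape is counted

    // Scrape 2 re-exposes the same series plus one new one, in a single chunk.
    t.update([]string{"up", "http_requests_total", "go_goroutines", "process_cpu_seconds_total"})
    fmt.Println(t.finalize(4)) // 1 - only the series never seen before
}

Since seriesMap lives on scrapeWork and survives between scrapes, scrape_series_added reflects series this scrape worker has not seen before; when the set of seen series grows well past the size of the latest scrape, the map is rebuilt to keep memory bounded, at the cost of recounting those series later.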
@@ -387,19 +394,19 @@ func (sw *scrapeWork) addRowToTimeseries(wc *writeRequestCtx, r *parser.Row, tim
 		// Skip row without labels.
 		return
 	}
-	labels := wc.labels[labelsLen:]
-	wc.samples = append(wc.samples, prompbmarshal.Sample{})
-	sample := &wc.samples[len(wc.samples)-1]
-	sample.Value = r.Value
-	sample.Timestamp = r.Timestamp
-	if !sw.Config.HonorTimestamps || sample.Timestamp == 0 {
-		sample.Timestamp = timestamp
+	sampleTimestamp := r.Timestamp
+	if !sw.Config.HonorTimestamps || sampleTimestamp == 0 {
+		sampleTimestamp = timestamp
 	}
+	wc.samples = append(wc.samples, prompbmarshal.Sample{
+		Value: r.Value,
+		Timestamp: sampleTimestamp,
+	})
 	wr := &wc.writeRequest
-	wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{})
-	ts := &wr.Timeseries[len(wr.Timeseries)-1]
-	ts.Labels = labels
-	ts.Samples = wc.samples[len(wc.samples)-1:]
+	wr.Timeseries = append(wr.Timeseries, prompbmarshal.TimeSeries{
+		Labels: wc.labels[labelsLen:],
+		Samples: wc.samples[len(wc.samples)-1:],
+	})
 }
 
 func appendLabels(dst []prompbmarshal.Label, metric string, src []parser.Tag, extraLabels []prompbmarshal.Label, honorLabels bool) []prompbmarshal.Label {
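One detail of the rewritten addRowToTimeseries worth spelling out is the wc.samples[len(wc.samples)-1:] expression: each TimeSeries holds a one-element sub-slice that aliases the Sample just appended instead of copying it. The standalone snippet below (illustrative types, not the prompbmarshal ones) shows why that stays correct even when a later append reallocates the backing array: the sample is fully written before the slice header is handed out, and earlier sub-slices keep pointing at the old array, which still holds their values.

package main

import "fmt"

type sample struct {
    value     float64
    timestamp int64
}

type timeSeries struct {
    samples []sample // aliases a slot of the shared buffer instead of copying it
}

func main() {
    var samples []sample
    var tss []timeSeries

    add := func(v float64, ts int64) {
        samples = append(samples, sample{value: v, timestamp: ts})
        // Keep a one-element sub-slice of the element just appended,
        // the same shape as wc.samples[len(wc.samples)-1:] in the hunk above.
        tss = append(tss, timeSeries{samples: samples[len(samples)-1:]})
    }

    add(1, 100)
    add(2, 200) // may reallocate the backing array...
    add(3, 300) // ...but earlier sub-slices still see their already-written values

    for _, ts := range tss {
        fmt.Println(ts.samples[0]) // {1 100}, {2 200}, {3 300}
    }
}

In other words, the struct-literal form in the new code is behaviorally equivalent to the old append-then-mutate form; it just drops the intermediate pointer. The remaining hunks below belong to the accompanying unit test for scrapeWork.scrapeInternal.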


@@ -72,10 +72,17 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
 		pushDataCalls := 0
 		var pushDataErr error
 		sw.PushData = func(wr *prompbmarshal.WriteRequest) {
-			if err := expectEqualTimeseries(wr.Timeseries, timeseriesExpected); err != nil {
-				pushDataErr = fmt.Errorf("unexpected data pushed: %w\ngot\n%#v\nwant\n%#v", err, wr.Timeseries, timeseriesExpected)
-			}
 			pushDataCalls++
+			if len(wr.Timeseries) > len(timeseriesExpected) {
+				pushDataErr = fmt.Errorf("too many time series obtained; got %d; want %d", len(wr.Timeseries), len(timeseriesExpected))
+				return
+			}
+			tsExpected := timeseriesExpected[:len(wr.Timeseries)]
+			timeseriesExpected = timeseriesExpected[len(tsExpected):]
+			if err := expectEqualTimeseries(wr.Timeseries, tsExpected); err != nil {
+				pushDataErr = fmt.Errorf("unexpected data pushed: %w\ngot\n%v\nwant\n%v", err, wr.Timeseries, tsExpected)
+				return
+			}
 		}
 
 		timestamp := int64(123)
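The reworked PushData stub accommodates the fact that one scrape may now arrive as several pushes: each call consumes a prefix of timeseriesExpected of the same length as the pushed chunk and compares against it, which works because the flushes in scrapeInternal call PushData synchronously, so the chunks arrive in scrape order. A stripped-down version of that pattern, with plain strings standing in for prompbmarshal.TimeSeries:

package main

import "fmt"

func main() {
    expected := []string{"s1", "s2", "s3", "s4"}
    chunks := [][]string{{"s1", "s2"}, {"s3"}, {"s4"}} // pushes from one scrape, in order

    for _, got := range chunks {
        if len(got) > len(expected) {
            fmt.Println("too many series pushed")
            return
        }
        want := expected[:len(got)]
        expected = expected[len(got):] // consume the matched prefix
        fmt.Printf("got %v, want %v\n", got, want)
    }
    fmt.Println("left unmatched:", expected) // [] if every expected series arrived
}

This is also why the assertion in the next hunk only fails when there were no pushes at all, rather than requiring exactly one: how many chunks a scrape produces depends on its size.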
@@ -88,8 +95,8 @@ func TestScrapeWorkScrapeInternalSuccess(t *testing.T) {
 		if readDataCalls != 1 {
 			t.Fatalf("unexpected number of readData calls; got %d; want %d", readDataCalls, 1)
 		}
-		if pushDataCalls != 1 {
-			t.Fatalf("unexpected number of pushData calls; got %d; want %d", pushDataCalls, 1)
+		if pushDataCalls == 0 {
+			t.Fatalf("missing pushData calls")
 		}
 	}
@@ -359,7 +366,7 @@ func expectEqualTimeseries(tss, tssExpected []prompbmarshal.TimeSeries) error {
 	for k, tsExpected := range mExpected {
 		ts := m[k]
 		if ts != tsExpected {
-			return fmt.Errorf("unexpected timeseries %q; got\n%s\nwant\n%s", k, ts, tsExpected)
+			return fmt.Errorf("unexpected timeseries %q;\ngot\n%s\nwant\n%s", k, ts, tsExpected)
 		}
 	}
 	return nil