From c1f81f08d462862bea4e383c123f7ea76b8831a2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 13 Aug 2021 12:10:00 +0300 Subject: [PATCH] all: add support for Prometheus staleness markers Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1509 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1530 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845 --- app/vmselect/promql/rollup.go | 88 ++++++++++++------- docs/CHANGELOG.md | 2 + lib/decimal/decimal.go | 112 ++++++++++++++++++------- lib/decimal/decimal_test.go | 50 ++++++++++- lib/persistentqueue/fastqueue.go | 12 ++- lib/persistentqueue/persistentqueue.go | 5 -- lib/promscrape/scrapework.go | 80 ++++++++++++++++-- lib/storage/storage.go | 9 +- 8 files changed, 270 insertions(+), 88 deletions(-) diff --git a/app/vmselect/promql/rollup.go b/app/vmselect/promql/rollup.go index 97c353fcb..6c999a93c 100644 --- a/app/vmselect/promql/rollup.go +++ b/app/vmselect/promql/rollup.go @@ -271,16 +271,16 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en } newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig { return &rollupConfig{ - TagValue: tagValue, - Func: rf, - Start: start, - End: end, - Step: step, - Window: window, - MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name], - CanDropLastSample: name == "default_rollup", - LookbackDelta: lookbackDelta, - Timestamps: sharedTimestamps, + TagValue: tagValue, + Func: rf, + Start: start, + End: end, + Step: step, + Window: window, + MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name], + CanDropStalePoints: name == "default_rollup", + LookbackDelta: lookbackDelta, + Timestamps: sharedTimestamps, } } appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig { @@ -402,10 +402,10 @@ type rollupConfig struct { // when 
using window smaller than 2 x scrape_interval. MayAdjustWindow bool - // Whether the last sample can be dropped during rollup calculations. - // The last sample can be dropped for `default_rollup()` function only. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748 . - CanDropLastSample bool + // Whether points after Prometheus stale marks can be dropped during rollup calculations. + // Stale points can be dropped only if `default_rollup()` function is used. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 . + CanDropStalePoints bool Timestamps []int64 @@ -506,6 +506,10 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu // Extend dstValues in order to remove mallocs below. dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps)) + if !rc.CanDropStalePoints { + // Remove Prometheus staleness marks from values, so rollup functions don't hit NaN values. + values, timestamps = dropStaleNaNs(values, timestamps) + } scrapeInterval := getScrapeInterval(timestamps) maxPrevInterval := getMaxPrevInterval(scrapeInterval) if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta { @@ -519,7 +523,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu window := rc.Window if window <= 0 { window = rc.Step - if rc.CanDropLastSample && rc.LookbackDelta > 0 && window > rc.LookbackDelta { + if rc.CanDropStalePoints && rc.LookbackDelta > 0 && window > rc.LookbackDelta { // Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval // according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784 window = rc.LookbackDelta @@ -537,10 +541,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu j := 0 ni := 0 nj := 0 - stalenessInterval := int64(float64(scrapeInterval) * 0.9) - // Do not drop trailing data points for queries, which return 2 or 1 point (aka instant queries). 
- // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845 - canDropLastSample := rc.CanDropLastSample && len(rc.Timestamps) > 2 f := rc.Func for _, tEnd := range rc.Timestamps { tStart := tEnd - window @@ -560,16 +560,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu } rfa.values = values[i:j] rfa.timestamps = timestamps[i:j] - if canDropLastSample && j == len(timestamps) && j > 0 && (tEnd-timestamps[j-1] > stalenessInterval || i == j && len(timestamps) == 1) { - // Drop trailing data points in the following cases: - // - if the distance between the last raw sample and tEnd exceeds stalenessInterval - // - if time series contains only a single raw sample - // This should prevent from double counting when a label changes in time series (for instance, - // during new deployment in K8S). See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748 - rfa.prevValue = nan - rfa.values = nil - rfa.timestamps = nil - } if i > 0 { rfa.realPrevValue = values[i-1] } else { @@ -590,6 +580,31 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu return dstValues } +func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) { + hasStaleSamples := false + for _, v := range values { + if decimal.IsStaleNaN(v) { + hasStaleSamples = true + break + } + } + if !hasStaleSamples { + // Fast path: values have no Prometheus staleness marks. + return values, timestamps + } + // Slow path: drop Prometheus staleness marks from values. 
+ dstValues := make([]float64, 0, len(values)) + dstTimestamps := make([]int64, 0, len(timestamps)) + for i, v := range values { + if decimal.IsStaleNaN(v) { + continue + } + dstValues = append(dstValues, v) + dstTimestamps = append(dstTimestamps, timestamps[i]) + } + return dstValues, dstTimestamps +} + func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int { if len(timestamps) == 0 || timestamps[0] > seekTimestamp { return 0 @@ -1826,11 +1841,20 @@ func rollupFirst(rfa *rollupFuncArg) float64 { return values[0] } -var rollupDefault = rollupLast +func rollupDefault(rfa *rollupFuncArg) float64 { + values := rfa.values + if len(values) == 0 { + // Do not take into account rfa.prevValue, since it may lead + // to inconsistent results comparing to Prometheus on broken time series + // with irregular data points. + return nan + } + // Intentionally do not skip the possible last Prometheus staleness mark. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 . + return values[len(values)-1] +} func rollupLast(rfa *rollupFuncArg) float64 { - // There is no need in handling NaNs here, since they must be cleaned up - // before calling rollup funcs. values := rfa.values if len(values) == 0 { // Do not take into account rfa.prevValue, since it may lead diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f6b9af2de..6284398cc 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,8 @@ sort: 15 ## tip +* FEATURE: add support for Prometheus staleness markers. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526). +* FEATURE: vmagent: automatically generate Prometheus staleness markers for the scraped metrics when scrape targets disappear in the same way as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526). * FEATURE: add `present_over_time(m[d])` function, which returns 1 if `m` has a least a single sample over the previous duration `d`. 
This function has been added also to [Prometheus 2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0). * FEATURE: vmagent: support multitenant writes according to [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy). This allows using a single `vmagent` instance in front of VictoriaMetrics cluster for all the tenants. Thanks to @omarghader for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1505). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1491). * FEATURE: vmagent: add `__meta_ec2_availability_zone_id` label to discovered Amazon EC2 targets. This label is available in Prometheus [starting from v2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0). diff --git a/lib/decimal/decimal.go b/lib/decimal/decimal.go index 9b75814cd..34388cd48 100644 --- a/lib/decimal/decimal.go +++ b/lib/decimal/decimal.go @@ -36,7 +36,8 @@ func CalibrateScale(a []int64, ae int16, b []int64, be int16) (e int16) { } upExp -= downExp for i, v := range a { - if v == vInfPos || v == vInfNeg { + if isSpecialValue(v) { + // Do not take into account special values. continue } adjExp := upExp @@ -48,7 +49,8 @@ func CalibrateScale(a []int64, ae int16, b []int64, be int16) (e int16) { } if downExp > 0 { for i, v := range b { - if v == vInfPos || v == vInfNeg { + if isSpecialValue(v) { + // Do not take into account special values. 
continue } adjExp := downExp @@ -106,13 +108,17 @@ func AppendDecimalToFloat(dst []float64, va []int64, e int16) []float64 { } _ = a[len(va)-1] for i, v := range va { - f := float64(v) - if v == vInfPos { - f = infPos - } else if v == vInfNeg { - f = infNeg + a[i] = float64(v) + if !isSpecialValue(v) { + continue + } + if v == vInfPos { + a[i] = infPos + } else if v == vInfNeg { + a[i] = infNeg + } else { + a[i] = StaleNaN } - a[i] = f } return dst[:len(dst)+len(va)] } @@ -122,26 +128,34 @@ func AppendDecimalToFloat(dst []float64, va []int64, e int16) []float64 { e10 := math.Pow10(int(-e)) _ = a[len(va)-1] for i, v := range va { - f := float64(v) / e10 - if v == vInfPos { - f = infPos - } else if v == vInfNeg { - f = infNeg + a[i] = float64(v) / e10 + if !isSpecialValue(v) { + continue + } + if v == vInfPos { + a[i] = infPos + } else if v == vInfNeg { + a[i] = infNeg + } else { + a[i] = StaleNaN } - a[i] = f } return dst[:len(dst)+len(va)] } e10 := math.Pow10(int(e)) _ = a[len(va)-1] for i, v := range va { - f := float64(v) * e10 - if v == vInfPos { - f = infPos - } else if v == vInfNeg { - f = infNeg + a[i] = float64(v) * e10 + if !isSpecialValue(v) { + continue + } + if v == vInfPos { + a[i] = infPos + } else if v == vInfNeg { + a[i] = infNeg + } else { + a[i] = StaleNaN } - a[i] = f } return dst[:len(dst)+len(va)] } @@ -184,7 +198,7 @@ func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) { v, exp := FromFloat(f) va[i] = v ea[i] = exp - if exp < minExp && v != vInfPos && v != vInfNeg { + if exp < minExp && !isSpecialValue(v) { minExp = exp } } @@ -211,7 +225,8 @@ func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) { _ = a[len(va)-1] _ = ea[len(va)-1] for i, v := range va { - if v == vInfPos || v == vInfNeg { + if isSpecialValue(v) { + // There is no need in scaling special values. 
a[i] = v continue } @@ -245,8 +260,8 @@ var vaeBufPool sync.Pool const int64Max = int64(1<<63 - 1) func maxUpExponent(v int64) int16 { - if v == 0 || v == vInfPos || v == vInfNeg { - // Any exponent allowed. + if v == 0 || isSpecialValue(v) { + // Any exponent allowed for zeroes and special values. return 1024 } if v < 0 { @@ -302,6 +317,10 @@ func maxUpExponent(v int64) int16 { // // See also RoundToSignificantFigures. func RoundToDecimalDigits(f float64, digits int) float64 { + if IsStaleNaN(f) { + // Do not modify stale nan mark value. + return f + } if digits <= -100 || digits >= 100 { return f } @@ -313,6 +332,10 @@ func RoundToDecimalDigits(f float64, digits int) float64 { // // See also RoundToDecimalDigits. func RoundToSignificantFigures(f float64, digits int) float64 { + if IsStaleNaN(f) { + // Do not modify stale nan mark value. + return f + } if digits <= 0 || digits >= 18 { return f } @@ -345,11 +368,14 @@ func RoundToSignificantFigures(f float64, digits int) float64 { // ToFloat returns f=v*10^e. func ToFloat(v int64, e int16) float64 { - if v == vInfPos { - return infPos - } - if v == vInfNeg { - return infNeg + if isSpecialValue(v) { + if v == vInfPos { + return infPos + } + if v == vInfNeg { + return infNeg + } + return StaleNaN } f := float64(v) // increase conversion precision for negative exponents by dividing by e10 @@ -364,24 +390,46 @@ var ( infNeg = math.Inf(-1) ) +// StaleNaN is a special NaN value, which is used as Prometheus staleness mark. +// See https://www.robustperception.io/staleness-and-promql +var StaleNaN = math.Float64frombits(staleNaNBits) + const ( - vInfPos = 1<<63 - 1 - vInfNeg = -1 << 63 + vInfPos = 1<<63 - 1 + vInfNeg = -1 << 63 + vStaleNaN = 1<<63 - 2 vMax = 1<<63 - 3 vMin = -1<<63 + 1 + + // staleNaNBits is the bit representation of Prometheus staleness mark (aka stale NaN). + // This mark is put by Prometheus at the end of time series for improving staleness detection. 
+ // See https://www.robustperception.io/staleness-and-promql + staleNaNBits uint64 = 0x7ff0000000000002 ) +func isSpecialValue(v int64) bool { + return v > vMax || v < vMin +} + +// IsStaleNaN returns true if f represents Prometheus staleness mark. +func IsStaleNaN(f float64) bool { + return math.Float64bits(f) == staleNaNBits +} + // FromFloat converts f to v*10^e. // // It tries minimizing v. // For instance, for f = -1.234 it returns v = -1234, e = -3. // -// FromFloat doesn't work properly with NaN values, so don't pass them here. +// FromFloat doesn't work properly with NaN values other than Prometheus staleness mark, so don't pass them here. func FromFloat(f float64) (int64, int16) { if f == 0 { return 0, 0 } + if IsStaleNaN(f) { + return vStaleNaN, 0 + } if math.IsInf(f, 0) { return fromFloatInf(f) } diff --git a/lib/decimal/decimal_test.go b/lib/decimal/decimal_test.go index 5bc177d64..bddd7271a 100644 --- a/lib/decimal/decimal_test.go +++ b/lib/decimal/decimal_test.go @@ -12,9 +12,16 @@ func TestRoundToDecimalDigits(t *testing.T) { t.Helper() result := RoundToDecimalDigits(f, digits) if math.IsNaN(result) { + if IsStaleNaN(resultExpected) { + if !IsStaleNaN(result) { + t.Fatalf("unexpected stale mark value; got %016X; want %016X", math.Float64bits(result), staleNaNBits) + } + return + } if !math.IsNaN(resultExpected) { t.Fatalf("unexpected result; got %v; want %v", result, resultExpected) } + return } if result != resultExpected { t.Fatalf("unexpected result; got %v; want %v", result, resultExpected) @@ -29,16 +36,27 @@ func TestRoundToDecimalDigits(t *testing.T) { f(1234, 0, 1234) f(1234.6, 0, 1235) f(123.4e-99, 99, 123e-99) + f(nan, 10, nan) + f(StaleNaN, 10, StaleNaN) } +var nan = math.NaN() + func TestRoundToSignificantFigures(t *testing.T) { f := func(f float64, digits int, resultExpected float64) { t.Helper() result := RoundToSignificantFigures(f, digits) if math.IsNaN(result) { + if IsStaleNaN(resultExpected) { + if !IsStaleNaN(result) { + 
t.Fatalf("unexpected stale mark value; got %016X; want %016X", math.Float64bits(result), staleNaNBits) + } + return + } if !math.IsNaN(resultExpected) { t.Fatalf("unexpected result; got %v; want %v", result, resultExpected) } + return } if result != resultExpected { t.Fatalf("unexpected result; got %v; want %v", result, resultExpected) @@ -52,6 +70,8 @@ func TestRoundToSignificantFigures(t *testing.T) { f(-0.56, 1, -0.6) f(1234567, 3, 1230000) f(-1.234567, 4, -1.235) + f(nan, 10, nan) + f(StaleNaN, 10, StaleNaN) } func TestPositiveFloatToDecimal(t *testing.T) { @@ -127,30 +147,49 @@ func TestAppendDecimalToFloat(t *testing.T) { testAppendDecimalToFloat(t, []int64{874957, 1130435}, -11, []float64{8.74957e-6, 1.130435e-5}) testAppendDecimalToFloat(t, []int64{874957, 1130435}, -12, []float64{8.74957e-7, 1.130435e-6}) testAppendDecimalToFloat(t, []int64{874957, 1130435}, -13, []float64{8.74957e-8, 1.130435e-7}) + testAppendDecimalToFloat(t, []int64{vMax, vMin, 1, 2}, 4, []float64{vMax * 1e4, vMin * 1e4, 1e4, 2e4}) + testAppendDecimalToFloat(t, []int64{vMax, vMin, 1, 2}, -4, []float64{vMax * 1e-4, vMin * 1e-4, 1e-4, 2e-4}) testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 0, []float64{infPos, infNeg, 1, 2}) testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 4, []float64{infPos, infNeg, 1e4, 2e4}) testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, -4, []float64{infPos, infNeg, 1e-4, 2e-4}) + testAppendDecimalToFloat(t, []int64{1234, vStaleNaN, 1, 2}, 0, []float64{1234, StaleNaN, 1, 2}) + testAppendDecimalToFloat(t, []int64{vInfPos, vStaleNaN, vMin, 2}, 4, []float64{infPos, StaleNaN, vMin * 1e4, 2e4}) + testAppendDecimalToFloat(t, []int64{vInfPos, vStaleNaN, vMin, 2}, -4, []float64{infPos, StaleNaN, vMin * 1e-4, 2e-4}) } func testAppendDecimalToFloat(t *testing.T, va []int64, e int16, fExpected []float64) { + t.Helper() f := AppendDecimalToFloat(nil, va, e) - if !reflect.DeepEqual(f, fExpected) { + if !equalValues(f, fExpected) { 
t.Fatalf("unexpected f for va=%d, e=%d: got\n%v; expecting\n%v", va, e, f, fExpected) } prefix := []float64{1, 2, 3, 4} f = AppendDecimalToFloat(prefix, va, e) - if !reflect.DeepEqual(f[:len(prefix)], prefix) { + if !equalValues(f[:len(prefix)], prefix) { t.Fatalf("unexpected prefix for va=%d, e=%d; got\n%v; expecting\n%v", va, e, f[:len(prefix)], prefix) } if fExpected == nil { fExpected = []float64{} } - if !reflect.DeepEqual(f[len(prefix):], fExpected) { + if !equalValues(f[len(prefix):], fExpected) { t.Fatalf("unexpected prefixed f for va=%d, e=%d: got\n%v; expecting\n%v", va, e, f[len(prefix):], fExpected) } } +func equalValues(a, b []float64) bool { + if len(a) != len(b) { + return false + } + for i, va := range a { + vb := b[i] + if math.Float64bits(va) != math.Float64bits(vb) { + return false + } + } + return true +} + func TestCalibrateScale(t *testing.T) { testCalibrateScale(t, []int64{}, []int64{}, 0, 0, []int64{}, []int64{}, 0) testCalibrateScale(t, []int64{0}, []int64{0}, 0, 0, []int64{0}, []int64{0}, 0) @@ -177,6 +216,7 @@ func TestCalibrateScale(t *testing.T) { testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 0, 3, []int64{vMax, vMin, 123}, []int64{100e3}, 0) testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 3, 0, []int64{vMax, vMin, 123}, []int64{0}, 3) testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 0, 30, []int64{92233, -92233, 0}, []int64{100e16}, 14) + testCalibrateScale(t, []int64{vStaleNaN, vMin, 123}, []int64{100}, 0, 30, []int64{vStaleNaN, -92233, 0}, []int64{100e16}, 14) // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/805 testCalibrateScale(t, []int64{123}, []int64{vInfPos}, 0, 0, []int64{123}, []int64{vInfPos}, 0) @@ -242,6 +282,7 @@ func TestMaxUpExponent(t *testing.T) { f(vInfPos, 1024) f(vInfNeg, 1024) + f(vStaleNaN, 1024) f(vMin, 0) f(vMax, 0) f(0, 1024) @@ -328,6 +369,7 @@ func TestAppendFloatToDecimal(t *testing.T) { testAppendFloatToDecimal(t, []float64{0}, []int64{0}, 0) 
testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123}, []int64{vInfPos, vInfNeg, 123}, 0) testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123, 1e-4, 1e32}, []int64{vInfPos, vInfNeg, 0, 0, 1000000000000000000}, 14) + testAppendFloatToDecimal(t, []float64{StaleNaN, infNeg, 123, 1e-4, 1e32}, []int64{vStaleNaN, vInfNeg, 0, 0, 1000000000000000000}, 14) testAppendFloatToDecimal(t, []float64{0, -0, 1, -1, 12345678, -123456789}, []int64{0, 0, 1, -1, 12345678, -123456789}, 0) // upExp @@ -408,6 +450,7 @@ func TestFloatToDecimal(t *testing.T) { f(math.Inf(1), vInfPos, 0) f(math.Inf(-1), vInfNeg, 0) + f(StaleNaN, vStaleNaN, 0) f(vInfPos, 9223372036854775, 3) f(vInfNeg, -9223372036854775, 3) f(vMax, 9223372036854775, 3) @@ -459,6 +502,7 @@ func TestFloatToDecimalRoundtrip(t *testing.T) { f(infNeg) f(vMax) f(vMin) + f(vStaleNaN) for i := 0; i < 1e4; i++ { v := rand.NormFloat64() diff --git a/lib/persistentqueue/fastqueue.go b/lib/persistentqueue/fastqueue.go index 9c99f2b01..957e1ad71 100644 --- a/lib/persistentqueue/fastqueue.go +++ b/lib/persistentqueue/fastqueue.go @@ -31,7 +31,7 @@ type FastQueue struct { lastInmemoryBlockReadTime uint64 - mustStop bool + stopDeadline uint64 } // MustOpenFastQueue opens persistent queue at the given path. @@ -66,7 +66,9 @@ func (fq *FastQueue) UnblockAllReaders() { defer fq.mu.Unlock() // Unblock blocked readers - fq.mustStop = true + // Allow for up to 5 seconds for sending Prometheus stale markers. 
+ // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 + fq.stopDeadline = fasttime.UnixTimestamp() + 5 fq.cond.Broadcast() } @@ -167,7 +169,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { defer fq.mu.Unlock() for { - if fq.mustStop { + if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline { return dst, false } if len(fq.ch) > 0 { @@ -189,7 +191,9 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) { dst = data continue } - + if fq.stopDeadline > 0 { + return dst, false + } // There are no blocks. Wait for new block. fq.pq.ResetIfEmpty() fq.cond.Wait() diff --git a/lib/persistentqueue/persistentqueue.go b/lib/persistentqueue/persistentqueue.go index 474c559a5..8d8212bb4 100644 --- a/lib/persistentqueue/persistentqueue.go +++ b/lib/persistentqueue/persistentqueue.go @@ -51,8 +51,6 @@ type queue struct { lastMetainfoFlushTime uint64 - mustStop bool - blocksDropped *metrics.Counter bytesDropped *metrics.Counter @@ -371,9 +369,6 @@ func (q *queue) MustWriteBlock(block []byte) { if uint64(len(block)) > q.maxBlockSize { logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize) } - if q.mustStop { - logger.Panicf("BUG: MustWriteBlock cannot be called after MustClose") - } if q.readerOffset > q.writerOffset { logger.Panicf("BUG: readerOffset=%d shouldn't exceed writerOffset=%d", q.readerOffset, q.writerOffset) } diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 281a0e57e..1fff64ecf 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -10,6 +10,8 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" @@ -183,6 +185,9 @@ type scrapeWork struct { // prevLabelsLen contains the number labels scraped during the previous scrape. // It is used as a hint in order to reduce memory usage when parsing scrape responses. prevLabelsLen int + + activeSeriesBuf []byte + activeSeries [][]byte } func (sw *scrapeWork) run(stopCh <-chan struct{}) { @@ -233,6 +238,7 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) { timestamp += scrapeInterval.Milliseconds() select { case <-stopCh: + sw.sendStaleMarkers() return case tt := <-ticker.C: t := tt.UnixNano() / 1e6 @@ -315,9 +321,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) - startTime := time.Now() - sw.PushData(&wc.writeRequest) - pushDataDuration.UpdateDuration(startTime) + sw.updateActiveSeries(wc) + sw.pushData(&wc.writeRequest) sw.prevLabelsLen = len(wc.labels) wc.reset() writeRequestCtxPool.Put(wc) @@ -328,6 +333,12 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error return err } +func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) { + startTime := time.Now() + sw.PushData(wr) + pushDataDuration.UpdateDuration(startTime) +} + func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { samplesScraped := 0 samplesPostRelabeling := 0 @@ -357,9 +368,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { "either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit) } sw.updateSeriesAdded(wc) - startTime := time.Now() - sw.PushData(&wc.writeRequest) - pushDataDuration.UpdateDuration(startTime) + 
sw.pushData(&wc.writeRequest) wc.resetNoRows() return nil }, sw.logError) @@ -385,9 +394,10 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error { sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp) sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp) - startTime := time.Now() - sw.PushData(&wc.writeRequest) - pushDataDuration.UpdateDuration(startTime) + // Do not call sw.updateActiveSeries(wc), since wc doesn't contain the full list of scraped metrics. + // Do not track active series in streaming mode, since this may need too big amounts of memory + // when the target exports too big number of metrics. + sw.pushData(&wc.writeRequest) sw.prevLabelsLen = len(wc.labels) wc.reset() writeRequestCtxPool.Put(wc) @@ -475,6 +485,58 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) { } } +func (sw *scrapeWork) updateActiveSeries(wc *writeRequestCtx) { + b := sw.activeSeriesBuf[:0] + as := sw.activeSeries[:0] + for _, ts := range wc.writeRequest.Timeseries { + bLen := len(b) + for _, label := range ts.Labels { + b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Name)) + b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Value)) + } + as = append(as, b[bLen:]) + } + sw.activeSeriesBuf = b + sw.activeSeries = as +} + +func (sw *scrapeWork) sendStaleMarkers() { + series := make([]prompbmarshal.TimeSeries, 0, len(sw.activeSeries)) + staleMarkSamples := []prompbmarshal.Sample{ + { + Value: decimal.StaleNaN, + Timestamp: time.Now().UnixNano() / 1e6, + }, + } + for _, b := range sw.activeSeries { + var labels []prompbmarshal.Label + for len(b) > 0 { + tail, name, err := encoding.UnmarshalBytes(b) + if err != nil { + logger.Panicf("BUG: cannot unmarshal label name from activeSeries: %s", err) + } + b = tail + tail, value, err := 
encoding.UnmarshalBytes(b) + if err != nil { + logger.Panicf("BUG: cannot unmarshal label value from activeSeries: %s", err) + } + b = tail + labels = append(labels, prompbmarshal.Label{ + Name: bytesutil.ToUnsafeString(name), + Value: bytesutil.ToUnsafeString(value), + }) + } + series = append(series, prompbmarshal.TimeSeries{ + Labels: labels, + Samples: staleMarkSamples, + }) + } + wr := &prompbmarshal.WriteRequest{ + Timeseries: series, + } + sw.pushData(wr) +} + func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int { seriesAdded := sw.seriesAdded sw.seriesAdded = 0 diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 9bb8a5d1a..af0cff3cc 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -18,6 +18,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" @@ -1696,9 +1697,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci for i := range mrs { mr := &mrs[i] if math.IsNaN(mr.Value) { - // Just skip NaNs, since the underlying encoding - // doesn't know how to work with them. - continue + if !decimal.IsStaleNaN(mr.Value) { + // Skip NaNs other than Prometheus staleness marker, since the underlying encoding + // doesn't know how to work with them. + continue + } } if mr.Timestamp < minTimestamp { // Skip rows with too small timestamps outside the retention.