mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
all: add support for Prometheus staleness markers
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1509 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1530 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845
This commit is contained in:
parent
7feb62eea9
commit
c1f81f08d4
8 changed files with 270 additions and 88 deletions
|
@ -271,16 +271,16 @@ func getRollupConfigs(name string, rf rollupFunc, expr metricsql.Expr, start, en
|
||||||
}
|
}
|
||||||
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
|
newRollupConfig := func(rf rollupFunc, tagValue string) *rollupConfig {
|
||||||
return &rollupConfig{
|
return &rollupConfig{
|
||||||
TagValue: tagValue,
|
TagValue: tagValue,
|
||||||
Func: rf,
|
Func: rf,
|
||||||
Start: start,
|
Start: start,
|
||||||
End: end,
|
End: end,
|
||||||
Step: step,
|
Step: step,
|
||||||
Window: window,
|
Window: window,
|
||||||
MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name],
|
MayAdjustWindow: !rollupFuncsCannotAdjustWindow[name],
|
||||||
CanDropLastSample: name == "default_rollup",
|
CanDropStalePoints: name == "default_rollup",
|
||||||
LookbackDelta: lookbackDelta,
|
LookbackDelta: lookbackDelta,
|
||||||
Timestamps: sharedTimestamps,
|
Timestamps: sharedTimestamps,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
|
appendRollupConfigs := func(dst []*rollupConfig) []*rollupConfig {
|
||||||
|
@ -402,10 +402,10 @@ type rollupConfig struct {
|
||||||
// when using window smaller than 2 x scrape_interval.
|
// when using window smaller than 2 x scrape_interval.
|
||||||
MayAdjustWindow bool
|
MayAdjustWindow bool
|
||||||
|
|
||||||
// Whether the last sample can be dropped during rollup calculations.
|
// Whether points after Prometheus stale marks can be dropped during rollup calculations.
|
||||||
// The last sample can be dropped for `default_rollup()` function only.
|
// Stale points can be dropped only if `default_rollup()` function is used.
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748 .
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
|
||||||
CanDropLastSample bool
|
CanDropStalePoints bool
|
||||||
|
|
||||||
Timestamps []int64
|
Timestamps []int64
|
||||||
|
|
||||||
|
@ -506,6 +506,10 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
||||||
// Extend dstValues in order to remove mallocs below.
|
// Extend dstValues in order to remove mallocs below.
|
||||||
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
|
dstValues = decimal.ExtendFloat64sCapacity(dstValues, len(rc.Timestamps))
|
||||||
|
|
||||||
|
if !rc.CanDropStalePoints {
|
||||||
|
// Remove Prometheus staleness marks from values, so rollup functions don't hit NaN values.
|
||||||
|
values, timestamps = dropStaleNaNs(values, timestamps)
|
||||||
|
}
|
||||||
scrapeInterval := getScrapeInterval(timestamps)
|
scrapeInterval := getScrapeInterval(timestamps)
|
||||||
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
|
maxPrevInterval := getMaxPrevInterval(scrapeInterval)
|
||||||
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
||||||
|
@ -519,7 +523,7 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
||||||
window := rc.Window
|
window := rc.Window
|
||||||
if window <= 0 {
|
if window <= 0 {
|
||||||
window = rc.Step
|
window = rc.Step
|
||||||
if rc.CanDropLastSample && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
|
if rc.CanDropStalePoints && rc.LookbackDelta > 0 && window > rc.LookbackDelta {
|
||||||
// Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval
|
// Implicit window exceeds -search.maxStalenessInterval, so limit it to -search.maxStalenessInterval
|
||||||
// according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784
|
// according to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/784
|
||||||
window = rc.LookbackDelta
|
window = rc.LookbackDelta
|
||||||
|
@ -537,10 +541,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
||||||
j := 0
|
j := 0
|
||||||
ni := 0
|
ni := 0
|
||||||
nj := 0
|
nj := 0
|
||||||
stalenessInterval := int64(float64(scrapeInterval) * 0.9)
|
|
||||||
// Do not drop trailing data points for queries, which return 2 or 1 point (aka instant queries).
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/845
|
|
||||||
canDropLastSample := rc.CanDropLastSample && len(rc.Timestamps) > 2
|
|
||||||
f := rc.Func
|
f := rc.Func
|
||||||
for _, tEnd := range rc.Timestamps {
|
for _, tEnd := range rc.Timestamps {
|
||||||
tStart := tEnd - window
|
tStart := tEnd - window
|
||||||
|
@ -560,16 +560,6 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
||||||
}
|
}
|
||||||
rfa.values = values[i:j]
|
rfa.values = values[i:j]
|
||||||
rfa.timestamps = timestamps[i:j]
|
rfa.timestamps = timestamps[i:j]
|
||||||
if canDropLastSample && j == len(timestamps) && j > 0 && (tEnd-timestamps[j-1] > stalenessInterval || i == j && len(timestamps) == 1) {
|
|
||||||
// Drop trailing data points in the following cases:
|
|
||||||
// - if the distance between the last raw sample and tEnd exceeds stalenessInterval
|
|
||||||
// - if time series contains only a single raw sample
|
|
||||||
// This should prevent from double counting when a label changes in time series (for instance,
|
|
||||||
// during new deployment in K8S). See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/748
|
|
||||||
rfa.prevValue = nan
|
|
||||||
rfa.values = nil
|
|
||||||
rfa.timestamps = nil
|
|
||||||
}
|
|
||||||
if i > 0 {
|
if i > 0 {
|
||||||
rfa.realPrevValue = values[i-1]
|
rfa.realPrevValue = values[i-1]
|
||||||
} else {
|
} else {
|
||||||
|
@ -590,6 +580,31 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
||||||
return dstValues
|
return dstValues
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func dropStaleNaNs(values []float64, timestamps []int64) ([]float64, []int64) {
|
||||||
|
hasStaleSamples := false
|
||||||
|
for _, v := range values {
|
||||||
|
if decimal.IsStaleNaN(v) {
|
||||||
|
hasStaleSamples = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !hasStaleSamples {
|
||||||
|
// Fast path: values have noe Prometheus staleness marks.
|
||||||
|
return values, timestamps
|
||||||
|
}
|
||||||
|
// Slow path: drop Prometheus staleness marks from values.
|
||||||
|
dstValues := make([]float64, 0, len(values))
|
||||||
|
dstTimestamps := make([]int64, 0, len(timestamps))
|
||||||
|
for i, v := range values {
|
||||||
|
if decimal.IsStaleNaN(v) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dstValues = append(dstValues, v)
|
||||||
|
dstTimestamps = append(dstTimestamps, timestamps[i])
|
||||||
|
}
|
||||||
|
return dstValues, dstTimestamps
|
||||||
|
}
|
||||||
|
|
||||||
func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int {
|
func seekFirstTimestampIdxAfter(timestamps []int64, seekTimestamp int64, nHint int) int {
|
||||||
if len(timestamps) == 0 || timestamps[0] > seekTimestamp {
|
if len(timestamps) == 0 || timestamps[0] > seekTimestamp {
|
||||||
return 0
|
return 0
|
||||||
|
@ -1826,11 +1841,20 @@ func rollupFirst(rfa *rollupFuncArg) float64 {
|
||||||
return values[0]
|
return values[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
var rollupDefault = rollupLast
|
func rollupDefault(rfa *rollupFuncArg) float64 {
|
||||||
|
values := rfa.values
|
||||||
|
if len(values) == 0 {
|
||||||
|
// Do not take into account rfa.prevValue, since it may lead
|
||||||
|
// to inconsistent results comparing to Prometheus on broken time series
|
||||||
|
// with irregular data points.
|
||||||
|
return nan
|
||||||
|
}
|
||||||
|
// Intentionally do not skip the possible last Prometheus staleness mark.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526 .
|
||||||
|
return values[len(values)-1]
|
||||||
|
}
|
||||||
|
|
||||||
func rollupLast(rfa *rollupFuncArg) float64 {
|
func rollupLast(rfa *rollupFuncArg) float64 {
|
||||||
// There is no need in handling NaNs here, since they must be cleaned up
|
|
||||||
// before calling rollup funcs.
|
|
||||||
values := rfa.values
|
values := rfa.values
|
||||||
if len(values) == 0 {
|
if len(values) == 0 {
|
||||||
// Do not take into account rfa.prevValue, since it may lead
|
// Do not take into account rfa.prevValue, since it may lead
|
||||||
|
|
|
@ -6,6 +6,8 @@ sort: 15
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* FEATURE: add support for Prometheus staleness markers. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526).
|
||||||
|
* FEATURE: vmagent: automatically generate Prometheus staleness markers for the scraped metrics when scrape targets disappear in the same way as Prometheus does. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526).
|
||||||
* FEATURE: add `present_over_time(m[d])` function, which returns 1 if `m` has a least a single sample over the previous duration `d`. This function has been added also to [Prometheus 2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
|
* FEATURE: add `present_over_time(m[d])` function, which returns 1 if `m` has a least a single sample over the previous duration `d`. This function has been added also to [Prometheus 2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
|
||||||
* FEATURE: vmagent: support multitenant writes according to [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy). This allows using a single `vmagent` instance in front of VictoriaMetrics cluster for all the tenants. Thanks to @omarghader for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1505). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1491).
|
* FEATURE: vmagent: support multitenant writes according to [these docs](https://docs.victoriametrics.com/vmagent.html#multitenancy). This allows using a single `vmagent` instance in front of VictoriaMetrics cluster for all the tenants. Thanks to @omarghader for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1505). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1491).
|
||||||
* FEATURE: vmagent: add `__meta_ec2_availability_zone_id` label to discovered Amazon EC2 targets. This label is available in Prometheus [starting from v2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
|
* FEATURE: vmagent: add `__meta_ec2_availability_zone_id` label to discovered Amazon EC2 targets. This label is available in Prometheus [starting from v2.29](https://github.com/prometheus/prometheus/releases/tag/v2.29.0-rc.0).
|
||||||
|
|
|
@ -36,7 +36,8 @@ func CalibrateScale(a []int64, ae int16, b []int64, be int16) (e int16) {
|
||||||
}
|
}
|
||||||
upExp -= downExp
|
upExp -= downExp
|
||||||
for i, v := range a {
|
for i, v := range a {
|
||||||
if v == vInfPos || v == vInfNeg {
|
if isSpecialValue(v) {
|
||||||
|
// Do not take into account special values.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
adjExp := upExp
|
adjExp := upExp
|
||||||
|
@ -48,7 +49,8 @@ func CalibrateScale(a []int64, ae int16, b []int64, be int16) (e int16) {
|
||||||
}
|
}
|
||||||
if downExp > 0 {
|
if downExp > 0 {
|
||||||
for i, v := range b {
|
for i, v := range b {
|
||||||
if v == vInfPos || v == vInfNeg {
|
if isSpecialValue(v) {
|
||||||
|
// Do not take into account special values.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
adjExp := downExp
|
adjExp := downExp
|
||||||
|
@ -106,13 +108,17 @@ func AppendDecimalToFloat(dst []float64, va []int64, e int16) []float64 {
|
||||||
}
|
}
|
||||||
_ = a[len(va)-1]
|
_ = a[len(va)-1]
|
||||||
for i, v := range va {
|
for i, v := range va {
|
||||||
f := float64(v)
|
a[i] = float64(v)
|
||||||
if v == vInfPos {
|
if !isSpecialValue(v) {
|
||||||
f = infPos
|
continue
|
||||||
} else if v == vInfNeg {
|
}
|
||||||
f = infNeg
|
if v == vInfPos {
|
||||||
|
a[i] = infPos
|
||||||
|
} else if v == vInfNeg {
|
||||||
|
a[i] = infNeg
|
||||||
|
} else {
|
||||||
|
a[i] = StaleNaN
|
||||||
}
|
}
|
||||||
a[i] = f
|
|
||||||
}
|
}
|
||||||
return dst[:len(dst)+len(va)]
|
return dst[:len(dst)+len(va)]
|
||||||
}
|
}
|
||||||
|
@ -122,26 +128,34 @@ func AppendDecimalToFloat(dst []float64, va []int64, e int16) []float64 {
|
||||||
e10 := math.Pow10(int(-e))
|
e10 := math.Pow10(int(-e))
|
||||||
_ = a[len(va)-1]
|
_ = a[len(va)-1]
|
||||||
for i, v := range va {
|
for i, v := range va {
|
||||||
f := float64(v) / e10
|
a[i] = float64(v) / e10
|
||||||
if v == vInfPos {
|
if !isSpecialValue(v) {
|
||||||
f = infPos
|
continue
|
||||||
} else if v == vInfNeg {
|
}
|
||||||
f = infNeg
|
if v == vInfPos {
|
||||||
|
a[i] = infPos
|
||||||
|
} else if v == vInfNeg {
|
||||||
|
a[i] = infNeg
|
||||||
|
} else {
|
||||||
|
a[i] = StaleNaN
|
||||||
}
|
}
|
||||||
a[i] = f
|
|
||||||
}
|
}
|
||||||
return dst[:len(dst)+len(va)]
|
return dst[:len(dst)+len(va)]
|
||||||
}
|
}
|
||||||
e10 := math.Pow10(int(e))
|
e10 := math.Pow10(int(e))
|
||||||
_ = a[len(va)-1]
|
_ = a[len(va)-1]
|
||||||
for i, v := range va {
|
for i, v := range va {
|
||||||
f := float64(v) * e10
|
a[i] = float64(v) * e10
|
||||||
if v == vInfPos {
|
if !isSpecialValue(v) {
|
||||||
f = infPos
|
continue
|
||||||
} else if v == vInfNeg {
|
}
|
||||||
f = infNeg
|
if v == vInfPos {
|
||||||
|
a[i] = infPos
|
||||||
|
} else if v == vInfNeg {
|
||||||
|
a[i] = infNeg
|
||||||
|
} else {
|
||||||
|
a[i] = StaleNaN
|
||||||
}
|
}
|
||||||
a[i] = f
|
|
||||||
}
|
}
|
||||||
return dst[:len(dst)+len(va)]
|
return dst[:len(dst)+len(va)]
|
||||||
}
|
}
|
||||||
|
@ -184,7 +198,7 @@ func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {
|
||||||
v, exp := FromFloat(f)
|
v, exp := FromFloat(f)
|
||||||
va[i] = v
|
va[i] = v
|
||||||
ea[i] = exp
|
ea[i] = exp
|
||||||
if exp < minExp && v != vInfPos && v != vInfNeg {
|
if exp < minExp && !isSpecialValue(v) {
|
||||||
minExp = exp
|
minExp = exp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -211,7 +225,8 @@ func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) {
|
||||||
_ = a[len(va)-1]
|
_ = a[len(va)-1]
|
||||||
_ = ea[len(va)-1]
|
_ = ea[len(va)-1]
|
||||||
for i, v := range va {
|
for i, v := range va {
|
||||||
if v == vInfPos || v == vInfNeg {
|
if isSpecialValue(v) {
|
||||||
|
// There is no need in scaling special values.
|
||||||
a[i] = v
|
a[i] = v
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -245,8 +260,8 @@ var vaeBufPool sync.Pool
|
||||||
const int64Max = int64(1<<63 - 1)
|
const int64Max = int64(1<<63 - 1)
|
||||||
|
|
||||||
func maxUpExponent(v int64) int16 {
|
func maxUpExponent(v int64) int16 {
|
||||||
if v == 0 || v == vInfPos || v == vInfNeg {
|
if v == 0 || isSpecialValue(v) {
|
||||||
// Any exponent allowed.
|
// Any exponent allowed for zeroes and special values.
|
||||||
return 1024
|
return 1024
|
||||||
}
|
}
|
||||||
if v < 0 {
|
if v < 0 {
|
||||||
|
@ -302,6 +317,10 @@ func maxUpExponent(v int64) int16 {
|
||||||
//
|
//
|
||||||
// See also RoundToSignificantFigures.
|
// See also RoundToSignificantFigures.
|
||||||
func RoundToDecimalDigits(f float64, digits int) float64 {
|
func RoundToDecimalDigits(f float64, digits int) float64 {
|
||||||
|
if IsStaleNaN(f) {
|
||||||
|
// Do not modify stale nan mark value.
|
||||||
|
return f
|
||||||
|
}
|
||||||
if digits <= -100 || digits >= 100 {
|
if digits <= -100 || digits >= 100 {
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
@ -313,6 +332,10 @@ func RoundToDecimalDigits(f float64, digits int) float64 {
|
||||||
//
|
//
|
||||||
// See also RoundToDecimalDigits.
|
// See also RoundToDecimalDigits.
|
||||||
func RoundToSignificantFigures(f float64, digits int) float64 {
|
func RoundToSignificantFigures(f float64, digits int) float64 {
|
||||||
|
if IsStaleNaN(f) {
|
||||||
|
// Do not modify stale nan mark value.
|
||||||
|
return f
|
||||||
|
}
|
||||||
if digits <= 0 || digits >= 18 {
|
if digits <= 0 || digits >= 18 {
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
@ -345,11 +368,14 @@ func RoundToSignificantFigures(f float64, digits int) float64 {
|
||||||
|
|
||||||
// ToFloat returns f=v*10^e.
|
// ToFloat returns f=v*10^e.
|
||||||
func ToFloat(v int64, e int16) float64 {
|
func ToFloat(v int64, e int16) float64 {
|
||||||
if v == vInfPos {
|
if isSpecialValue(v) {
|
||||||
return infPos
|
if v == vInfPos {
|
||||||
}
|
return infPos
|
||||||
if v == vInfNeg {
|
}
|
||||||
return infNeg
|
if v == vInfNeg {
|
||||||
|
return infNeg
|
||||||
|
}
|
||||||
|
return StaleNaN
|
||||||
}
|
}
|
||||||
f := float64(v)
|
f := float64(v)
|
||||||
// increase conversion precision for negative exponents by dividing by e10
|
// increase conversion precision for negative exponents by dividing by e10
|
||||||
|
@ -364,24 +390,46 @@ var (
|
||||||
infNeg = math.Inf(-1)
|
infNeg = math.Inf(-1)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// StaleNaN is a special NaN value, which is used as Prometheus staleness mark.
|
||||||
|
// See https://www.robustperception.io/staleness-and-promql
|
||||||
|
var StaleNaN = math.Float64frombits(staleNaNBits)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
vInfPos = 1<<63 - 1
|
vInfPos = 1<<63 - 1
|
||||||
vInfNeg = -1 << 63
|
vInfNeg = -1 << 63
|
||||||
|
vStaleNaN = 1<<63 - 2
|
||||||
|
|
||||||
vMax = 1<<63 - 3
|
vMax = 1<<63 - 3
|
||||||
vMin = -1<<63 + 1
|
vMin = -1<<63 + 1
|
||||||
|
|
||||||
|
// staleNaNbits is bit representation of Prometheus staleness mark (aka stale NaN).
|
||||||
|
// This mark is put by Prometheus at the end of time series for improving staleness detection.
|
||||||
|
// See https://www.robustperception.io/staleness-and-promql
|
||||||
|
staleNaNBits uint64 = 0x7ff0000000000002
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func isSpecialValue(v int64) bool {
|
||||||
|
return v > vMax || v < vMin
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsStaleNaN returns true if f represents Prometheus staleness mark.
|
||||||
|
func IsStaleNaN(f float64) bool {
|
||||||
|
return math.Float64bits(f) == staleNaNBits
|
||||||
|
}
|
||||||
|
|
||||||
// FromFloat converts f to v*10^e.
|
// FromFloat converts f to v*10^e.
|
||||||
//
|
//
|
||||||
// It tries minimizing v.
|
// It tries minimizing v.
|
||||||
// For instance, for f = -1.234 it returns v = -1234, e = -3.
|
// For instance, for f = -1.234 it returns v = -1234, e = -3.
|
||||||
//
|
//
|
||||||
// FromFloat doesn't work properly with NaN values, so don't pass them here.
|
// FromFloat doesn't work properly with NaN values other than Prometheus staleness mark, so don't pass them here.
|
||||||
func FromFloat(f float64) (int64, int16) {
|
func FromFloat(f float64) (int64, int16) {
|
||||||
if f == 0 {
|
if f == 0 {
|
||||||
return 0, 0
|
return 0, 0
|
||||||
}
|
}
|
||||||
|
if IsStaleNaN(f) {
|
||||||
|
return vStaleNaN, 0
|
||||||
|
}
|
||||||
if math.IsInf(f, 0) {
|
if math.IsInf(f, 0) {
|
||||||
return fromFloatInf(f)
|
return fromFloatInf(f)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,9 +12,16 @@ func TestRoundToDecimalDigits(t *testing.T) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
result := RoundToDecimalDigits(f, digits)
|
result := RoundToDecimalDigits(f, digits)
|
||||||
if math.IsNaN(result) {
|
if math.IsNaN(result) {
|
||||||
|
if IsStaleNaN(resultExpected) {
|
||||||
|
if !IsStaleNaN(result) {
|
||||||
|
t.Fatalf("unexpected stale mark value; got %016X; want %016X", math.Float64bits(result), staleNaNBits)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
if !math.IsNaN(resultExpected) {
|
if !math.IsNaN(resultExpected) {
|
||||||
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if result != resultExpected {
|
if result != resultExpected {
|
||||||
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
||||||
|
@ -29,16 +36,27 @@ func TestRoundToDecimalDigits(t *testing.T) {
|
||||||
f(1234, 0, 1234)
|
f(1234, 0, 1234)
|
||||||
f(1234.6, 0, 1235)
|
f(1234.6, 0, 1235)
|
||||||
f(123.4e-99, 99, 123e-99)
|
f(123.4e-99, 99, 123e-99)
|
||||||
|
f(nan, 10, nan)
|
||||||
|
f(StaleNaN, 10, StaleNaN)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var nan = math.NaN()
|
||||||
|
|
||||||
func TestRoundToSignificantFigures(t *testing.T) {
|
func TestRoundToSignificantFigures(t *testing.T) {
|
||||||
f := func(f float64, digits int, resultExpected float64) {
|
f := func(f float64, digits int, resultExpected float64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
result := RoundToSignificantFigures(f, digits)
|
result := RoundToSignificantFigures(f, digits)
|
||||||
if math.IsNaN(result) {
|
if math.IsNaN(result) {
|
||||||
|
if IsStaleNaN(resultExpected) {
|
||||||
|
if !IsStaleNaN(result) {
|
||||||
|
t.Fatalf("unexpected stale mark value; got %016X; want %016X", math.Float64bits(result), staleNaNBits)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
if !math.IsNaN(resultExpected) {
|
if !math.IsNaN(resultExpected) {
|
||||||
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
||||||
}
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if result != resultExpected {
|
if result != resultExpected {
|
||||||
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
t.Fatalf("unexpected result; got %v; want %v", result, resultExpected)
|
||||||
|
@ -52,6 +70,8 @@ func TestRoundToSignificantFigures(t *testing.T) {
|
||||||
f(-0.56, 1, -0.6)
|
f(-0.56, 1, -0.6)
|
||||||
f(1234567, 3, 1230000)
|
f(1234567, 3, 1230000)
|
||||||
f(-1.234567, 4, -1.235)
|
f(-1.234567, 4, -1.235)
|
||||||
|
f(nan, 10, nan)
|
||||||
|
f(StaleNaN, 10, StaleNaN)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPositiveFloatToDecimal(t *testing.T) {
|
func TestPositiveFloatToDecimal(t *testing.T) {
|
||||||
|
@ -127,30 +147,49 @@ func TestAppendDecimalToFloat(t *testing.T) {
|
||||||
testAppendDecimalToFloat(t, []int64{874957, 1130435}, -11, []float64{8.74957e-6, 1.130435e-5})
|
testAppendDecimalToFloat(t, []int64{874957, 1130435}, -11, []float64{8.74957e-6, 1.130435e-5})
|
||||||
testAppendDecimalToFloat(t, []int64{874957, 1130435}, -12, []float64{8.74957e-7, 1.130435e-6})
|
testAppendDecimalToFloat(t, []int64{874957, 1130435}, -12, []float64{8.74957e-7, 1.130435e-6})
|
||||||
testAppendDecimalToFloat(t, []int64{874957, 1130435}, -13, []float64{8.74957e-8, 1.130435e-7})
|
testAppendDecimalToFloat(t, []int64{874957, 1130435}, -13, []float64{8.74957e-8, 1.130435e-7})
|
||||||
|
testAppendDecimalToFloat(t, []int64{vMax, vMin, 1, 2}, 4, []float64{vMax * 1e4, vMin * 1e4, 1e4, 2e4})
|
||||||
|
testAppendDecimalToFloat(t, []int64{vMax, vMin, 1, 2}, -4, []float64{vMax * 1e-4, vMin * 1e-4, 1e-4, 2e-4})
|
||||||
testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 0, []float64{infPos, infNeg, 1, 2})
|
testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 0, []float64{infPos, infNeg, 1, 2})
|
||||||
testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 4, []float64{infPos, infNeg, 1e4, 2e4})
|
testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 4, []float64{infPos, infNeg, 1e4, 2e4})
|
||||||
testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, -4, []float64{infPos, infNeg, 1e-4, 2e-4})
|
testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, -4, []float64{infPos, infNeg, 1e-4, 2e-4})
|
||||||
|
testAppendDecimalToFloat(t, []int64{1234, vStaleNaN, 1, 2}, 0, []float64{1234, StaleNaN, 1, 2})
|
||||||
|
testAppendDecimalToFloat(t, []int64{vInfPos, vStaleNaN, vMin, 2}, 4, []float64{infPos, StaleNaN, vMin * 1e4, 2e4})
|
||||||
|
testAppendDecimalToFloat(t, []int64{vInfPos, vStaleNaN, vMin, 2}, -4, []float64{infPos, StaleNaN, vMin * 1e-4, 2e-4})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testAppendDecimalToFloat(t *testing.T, va []int64, e int16, fExpected []float64) {
|
func testAppendDecimalToFloat(t *testing.T, va []int64, e int16, fExpected []float64) {
|
||||||
|
t.Helper()
|
||||||
f := AppendDecimalToFloat(nil, va, e)
|
f := AppendDecimalToFloat(nil, va, e)
|
||||||
if !reflect.DeepEqual(f, fExpected) {
|
if !equalValues(f, fExpected) {
|
||||||
t.Fatalf("unexpected f for va=%d, e=%d: got\n%v; expecting\n%v", va, e, f, fExpected)
|
t.Fatalf("unexpected f for va=%d, e=%d: got\n%v; expecting\n%v", va, e, f, fExpected)
|
||||||
}
|
}
|
||||||
|
|
||||||
prefix := []float64{1, 2, 3, 4}
|
prefix := []float64{1, 2, 3, 4}
|
||||||
f = AppendDecimalToFloat(prefix, va, e)
|
f = AppendDecimalToFloat(prefix, va, e)
|
||||||
if !reflect.DeepEqual(f[:len(prefix)], prefix) {
|
if !equalValues(f[:len(prefix)], prefix) {
|
||||||
t.Fatalf("unexpected prefix for va=%d, e=%d; got\n%v; expecting\n%v", va, e, f[:len(prefix)], prefix)
|
t.Fatalf("unexpected prefix for va=%d, e=%d; got\n%v; expecting\n%v", va, e, f[:len(prefix)], prefix)
|
||||||
}
|
}
|
||||||
if fExpected == nil {
|
if fExpected == nil {
|
||||||
fExpected = []float64{}
|
fExpected = []float64{}
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(f[len(prefix):], fExpected) {
|
if !equalValues(f[len(prefix):], fExpected) {
|
||||||
t.Fatalf("unexpected prefixed f for va=%d, e=%d: got\n%v; expecting\n%v", va, e, f[len(prefix):], fExpected)
|
t.Fatalf("unexpected prefixed f for va=%d, e=%d: got\n%v; expecting\n%v", va, e, f[len(prefix):], fExpected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func equalValues(a, b []float64) bool {
|
||||||
|
if len(a) != len(b) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for i, va := range a {
|
||||||
|
vb := b[i]
|
||||||
|
if math.Float64bits(va) != math.Float64bits(vb) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func TestCalibrateScale(t *testing.T) {
|
func TestCalibrateScale(t *testing.T) {
|
||||||
testCalibrateScale(t, []int64{}, []int64{}, 0, 0, []int64{}, []int64{}, 0)
|
testCalibrateScale(t, []int64{}, []int64{}, 0, 0, []int64{}, []int64{}, 0)
|
||||||
testCalibrateScale(t, []int64{0}, []int64{0}, 0, 0, []int64{0}, []int64{0}, 0)
|
testCalibrateScale(t, []int64{0}, []int64{0}, 0, 0, []int64{0}, []int64{0}, 0)
|
||||||
|
@ -177,6 +216,7 @@ func TestCalibrateScale(t *testing.T) {
|
||||||
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 0, 3, []int64{vMax, vMin, 123}, []int64{100e3}, 0)
|
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 0, 3, []int64{vMax, vMin, 123}, []int64{100e3}, 0)
|
||||||
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 3, 0, []int64{vMax, vMin, 123}, []int64{0}, 3)
|
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 3, 0, []int64{vMax, vMin, 123}, []int64{0}, 3)
|
||||||
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 0, 30, []int64{92233, -92233, 0}, []int64{100e16}, 14)
|
testCalibrateScale(t, []int64{vMax, vMin, 123}, []int64{100}, 0, 30, []int64{92233, -92233, 0}, []int64{100e16}, 14)
|
||||||
|
testCalibrateScale(t, []int64{vStaleNaN, vMin, 123}, []int64{100}, 0, 30, []int64{vStaleNaN, -92233, 0}, []int64{100e16}, 14)
|
||||||
|
|
||||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/805
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/805
|
||||||
testCalibrateScale(t, []int64{123}, []int64{vInfPos}, 0, 0, []int64{123}, []int64{vInfPos}, 0)
|
testCalibrateScale(t, []int64{123}, []int64{vInfPos}, 0, 0, []int64{123}, []int64{vInfPos}, 0)
|
||||||
|
@ -242,6 +282,7 @@ func TestMaxUpExponent(t *testing.T) {
|
||||||
|
|
||||||
f(vInfPos, 1024)
|
f(vInfPos, 1024)
|
||||||
f(vInfNeg, 1024)
|
f(vInfNeg, 1024)
|
||||||
|
f(vStaleNaN, 1024)
|
||||||
f(vMin, 0)
|
f(vMin, 0)
|
||||||
f(vMax, 0)
|
f(vMax, 0)
|
||||||
f(0, 1024)
|
f(0, 1024)
|
||||||
|
@ -328,6 +369,7 @@ func TestAppendFloatToDecimal(t *testing.T) {
|
||||||
testAppendFloatToDecimal(t, []float64{0}, []int64{0}, 0)
|
testAppendFloatToDecimal(t, []float64{0}, []int64{0}, 0)
|
||||||
testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123}, []int64{vInfPos, vInfNeg, 123}, 0)
|
testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123}, []int64{vInfPos, vInfNeg, 123}, 0)
|
||||||
testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123, 1e-4, 1e32}, []int64{vInfPos, vInfNeg, 0, 0, 1000000000000000000}, 14)
|
testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123, 1e-4, 1e32}, []int64{vInfPos, vInfNeg, 0, 0, 1000000000000000000}, 14)
|
||||||
|
testAppendFloatToDecimal(t, []float64{StaleNaN, infNeg, 123, 1e-4, 1e32}, []int64{vStaleNaN, vInfNeg, 0, 0, 1000000000000000000}, 14)
|
||||||
testAppendFloatToDecimal(t, []float64{0, -0, 1, -1, 12345678, -123456789}, []int64{0, 0, 1, -1, 12345678, -123456789}, 0)
|
testAppendFloatToDecimal(t, []float64{0, -0, 1, -1, 12345678, -123456789}, []int64{0, 0, 1, -1, 12345678, -123456789}, 0)
|
||||||
|
|
||||||
// upExp
|
// upExp
|
||||||
|
@ -408,6 +450,7 @@ func TestFloatToDecimal(t *testing.T) {
|
||||||
|
|
||||||
f(math.Inf(1), vInfPos, 0)
|
f(math.Inf(1), vInfPos, 0)
|
||||||
f(math.Inf(-1), vInfNeg, 0)
|
f(math.Inf(-1), vInfNeg, 0)
|
||||||
|
f(StaleNaN, vStaleNaN, 0)
|
||||||
f(vInfPos, 9223372036854775, 3)
|
f(vInfPos, 9223372036854775, 3)
|
||||||
f(vInfNeg, -9223372036854775, 3)
|
f(vInfNeg, -9223372036854775, 3)
|
||||||
f(vMax, 9223372036854775, 3)
|
f(vMax, 9223372036854775, 3)
|
||||||
|
@ -459,6 +502,7 @@ func TestFloatToDecimalRoundtrip(t *testing.T) {
|
||||||
f(infNeg)
|
f(infNeg)
|
||||||
f(vMax)
|
f(vMax)
|
||||||
f(vMin)
|
f(vMin)
|
||||||
|
f(vStaleNaN)
|
||||||
|
|
||||||
for i := 0; i < 1e4; i++ {
|
for i := 0; i < 1e4; i++ {
|
||||||
v := rand.NormFloat64()
|
v := rand.NormFloat64()
|
||||||
|
|
|
@ -31,7 +31,7 @@ type FastQueue struct {
|
||||||
|
|
||||||
lastInmemoryBlockReadTime uint64
|
lastInmemoryBlockReadTime uint64
|
||||||
|
|
||||||
mustStop bool
|
stopDeadline uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// MustOpenFastQueue opens persistent queue at the given path.
|
// MustOpenFastQueue opens persistent queue at the given path.
|
||||||
|
@ -66,7 +66,9 @@ func (fq *FastQueue) UnblockAllReaders() {
|
||||||
defer fq.mu.Unlock()
|
defer fq.mu.Unlock()
|
||||||
|
|
||||||
// Unblock blocked readers
|
// Unblock blocked readers
|
||||||
fq.mustStop = true
|
// Allow for up to 5 seconds for sending Prometheus stale markers.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1526
|
||||||
|
fq.stopDeadline = fasttime.UnixTimestamp() + 5
|
||||||
fq.cond.Broadcast()
|
fq.cond.Broadcast()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,7 +169,7 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||||
defer fq.mu.Unlock()
|
defer fq.mu.Unlock()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if fq.mustStop {
|
if fq.stopDeadline > 0 && fasttime.UnixTimestamp() > fq.stopDeadline {
|
||||||
return dst, false
|
return dst, false
|
||||||
}
|
}
|
||||||
if len(fq.ch) > 0 {
|
if len(fq.ch) > 0 {
|
||||||
|
@ -189,7 +191,9 @@ func (fq *FastQueue) MustReadBlock(dst []byte) ([]byte, bool) {
|
||||||
dst = data
|
dst = data
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if fq.stopDeadline > 0 {
|
||||||
|
return dst, false
|
||||||
|
}
|
||||||
// There are no blocks. Wait for new block.
|
// There are no blocks. Wait for new block.
|
||||||
fq.pq.ResetIfEmpty()
|
fq.pq.ResetIfEmpty()
|
||||||
fq.cond.Wait()
|
fq.cond.Wait()
|
||||||
|
|
|
@ -51,8 +51,6 @@ type queue struct {
|
||||||
|
|
||||||
lastMetainfoFlushTime uint64
|
lastMetainfoFlushTime uint64
|
||||||
|
|
||||||
mustStop bool
|
|
||||||
|
|
||||||
blocksDropped *metrics.Counter
|
blocksDropped *metrics.Counter
|
||||||
bytesDropped *metrics.Counter
|
bytesDropped *metrics.Counter
|
||||||
|
|
||||||
|
@ -371,9 +369,6 @@ func (q *queue) MustWriteBlock(block []byte) {
|
||||||
if uint64(len(block)) > q.maxBlockSize {
|
if uint64(len(block)) > q.maxBlockSize {
|
||||||
logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize)
|
logger.Panicf("BUG: too big block to send: %d bytes; it mustn't exceed %d bytes", len(block), q.maxBlockSize)
|
||||||
}
|
}
|
||||||
if q.mustStop {
|
|
||||||
logger.Panicf("BUG: MustWriteBlock cannot be called after MustClose")
|
|
||||||
}
|
|
||||||
if q.readerOffset > q.writerOffset {
|
if q.readerOffset > q.writerOffset {
|
||||||
logger.Panicf("BUG: readerOffset=%d shouldn't exceed writerOffset=%d", q.readerOffset, q.writerOffset)
|
logger.Panicf("BUG: readerOffset=%d shouldn't exceed writerOffset=%d", q.readerOffset, q.writerOffset)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/leveledbytebufferpool"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||||
|
@ -183,6 +185,9 @@ type scrapeWork struct {
|
||||||
// prevLabelsLen contains the number labels scraped during the previous scrape.
|
// prevLabelsLen contains the number labels scraped during the previous scrape.
|
||||||
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
|
// It is used as a hint in order to reduce memory usage when parsing scrape responses.
|
||||||
prevLabelsLen int
|
prevLabelsLen int
|
||||||
|
|
||||||
|
activeSeriesBuf []byte
|
||||||
|
activeSeries [][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
|
func (sw *scrapeWork) run(stopCh <-chan struct{}) {
|
||||||
|
@ -233,6 +238,7 @@ func (sw *scrapeWork) run(stopCh <-chan struct{}) {
|
||||||
timestamp += scrapeInterval.Milliseconds()
|
timestamp += scrapeInterval.Milliseconds()
|
||||||
select {
|
select {
|
||||||
case <-stopCh:
|
case <-stopCh:
|
||||||
|
sw.sendStaleMarkers()
|
||||||
return
|
return
|
||||||
case tt := <-ticker.C:
|
case tt := <-ticker.C:
|
||||||
t := tt.UnixNano() / 1e6
|
t := tt.UnixNano() / 1e6
|
||||||
|
@ -315,9 +321,8 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||||
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
|
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
|
||||||
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
|
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
|
||||||
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
|
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
|
||||||
startTime := time.Now()
|
sw.updateActiveSeries(wc)
|
||||||
sw.PushData(&wc.writeRequest)
|
sw.pushData(&wc.writeRequest)
|
||||||
pushDataDuration.UpdateDuration(startTime)
|
|
||||||
sw.prevLabelsLen = len(wc.labels)
|
sw.prevLabelsLen = len(wc.labels)
|
||||||
wc.reset()
|
wc.reset()
|
||||||
writeRequestCtxPool.Put(wc)
|
writeRequestCtxPool.Put(wc)
|
||||||
|
@ -328,6 +333,12 @@ func (sw *scrapeWork) scrapeInternal(scrapeTimestamp, realTimestamp int64) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sw *scrapeWork) pushData(wr *prompbmarshal.WriteRequest) {
|
||||||
|
startTime := time.Now()
|
||||||
|
sw.PushData(wr)
|
||||||
|
pushDataDuration.UpdateDuration(startTime)
|
||||||
|
}
|
||||||
|
|
||||||
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
||||||
samplesScraped := 0
|
samplesScraped := 0
|
||||||
samplesPostRelabeling := 0
|
samplesPostRelabeling := 0
|
||||||
|
@ -357,9 +368,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
||||||
"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
|
"either reduce the sample count for the target or increase sample_limit", sw.Config.ScrapeURL, sw.Config.SampleLimit)
|
||||||
}
|
}
|
||||||
sw.updateSeriesAdded(wc)
|
sw.updateSeriesAdded(wc)
|
||||||
startTime := time.Now()
|
sw.pushData(&wc.writeRequest)
|
||||||
sw.PushData(&wc.writeRequest)
|
|
||||||
pushDataDuration.UpdateDuration(startTime)
|
|
||||||
wc.resetNoRows()
|
wc.resetNoRows()
|
||||||
return nil
|
return nil
|
||||||
}, sw.logError)
|
}, sw.logError)
|
||||||
|
@ -385,9 +394,10 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
|
||||||
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
|
sw.addAutoTimeseries(wc, "scrape_samples_scraped", float64(samplesScraped), scrapeTimestamp)
|
||||||
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
|
sw.addAutoTimeseries(wc, "scrape_samples_post_metric_relabeling", float64(samplesPostRelabeling), scrapeTimestamp)
|
||||||
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
|
sw.addAutoTimeseries(wc, "scrape_series_added", float64(seriesAdded), scrapeTimestamp)
|
||||||
startTime := time.Now()
|
// Do not call sw.updateActiveSeries(wc), since wc doesn't contain the full list of scraped metrics.
|
||||||
sw.PushData(&wc.writeRequest)
|
// Do not track active series in streaming mode, since this may need too big amounts of memory
|
||||||
pushDataDuration.UpdateDuration(startTime)
|
// when the target exports too big number of metrics.
|
||||||
|
sw.pushData(&wc.writeRequest)
|
||||||
sw.prevLabelsLen = len(wc.labels)
|
sw.prevLabelsLen = len(wc.labels)
|
||||||
wc.reset()
|
wc.reset()
|
||||||
writeRequestCtxPool.Put(wc)
|
writeRequestCtxPool.Put(wc)
|
||||||
|
@ -475,6 +485,58 @@ func (sw *scrapeWork) updateSeriesAdded(wc *writeRequestCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sw *scrapeWork) updateActiveSeries(wc *writeRequestCtx) {
|
||||||
|
b := sw.activeSeriesBuf[:0]
|
||||||
|
as := sw.activeSeries[:0]
|
||||||
|
for _, ts := range wc.writeRequest.Timeseries {
|
||||||
|
bLen := len(b)
|
||||||
|
for _, label := range ts.Labels {
|
||||||
|
b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Name))
|
||||||
|
b = encoding.MarshalBytes(b, bytesutil.ToUnsafeBytes(label.Value))
|
||||||
|
}
|
||||||
|
as = append(as, b[bLen:])
|
||||||
|
}
|
||||||
|
sw.activeSeriesBuf = b
|
||||||
|
sw.activeSeries = as
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sw *scrapeWork) sendStaleMarkers() {
|
||||||
|
series := make([]prompbmarshal.TimeSeries, 0, len(sw.activeSeries))
|
||||||
|
staleMarkSamples := []prompbmarshal.Sample{
|
||||||
|
{
|
||||||
|
Value: decimal.StaleNaN,
|
||||||
|
Timestamp: time.Now().UnixNano() / 1e6,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, b := range sw.activeSeries {
|
||||||
|
var labels []prompbmarshal.Label
|
||||||
|
for len(b) > 0 {
|
||||||
|
tail, name, err := encoding.UnmarshalBytes(b)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("BUG: cannot unmarshal label name from activeSeries: %s", err)
|
||||||
|
}
|
||||||
|
b = tail
|
||||||
|
tail, value, err := encoding.UnmarshalBytes(b)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("BUG: cannot unmarshal label value from activeSeries: %s", err)
|
||||||
|
}
|
||||||
|
b = tail
|
||||||
|
labels = append(labels, prompbmarshal.Label{
|
||||||
|
Name: bytesutil.ToUnsafeString(name),
|
||||||
|
Value: bytesutil.ToUnsafeString(value),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
series = append(series, prompbmarshal.TimeSeries{
|
||||||
|
Labels: labels,
|
||||||
|
Samples: staleMarkSamples,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wr := &prompbmarshal.WriteRequest{
|
||||||
|
Timeseries: series,
|
||||||
|
}
|
||||||
|
sw.pushData(wr)
|
||||||
|
}
|
||||||
|
|
||||||
func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int {
|
func (sw *scrapeWork) finalizeSeriesAdded(lastScrapeSize int) int {
|
||||||
seriesAdded := sw.seriesAdded
|
seriesAdded := sw.seriesAdded
|
||||||
sw.seriesAdded = 0
|
sw.seriesAdded = 0
|
||||||
|
|
|
@ -18,6 +18,7 @@ import (
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||||
|
@ -1696,9 +1697,11 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
||||||
for i := range mrs {
|
for i := range mrs {
|
||||||
mr := &mrs[i]
|
mr := &mrs[i]
|
||||||
if math.IsNaN(mr.Value) {
|
if math.IsNaN(mr.Value) {
|
||||||
// Just skip NaNs, since the underlying encoding
|
if !decimal.IsStaleNaN(mr.Value) {
|
||||||
// doesn't know how to work with them.
|
// Skip NaNs other than Prometheus staleness marker, since the underlying encoding
|
||||||
continue
|
// doesn't know how to work with them.
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if mr.Timestamp < minTimestamp {
|
if mr.Timestamp < minTimestamp {
|
||||||
// Skip rows with too small timestamps outside the retention.
|
// Skip rows with too small timestamps outside the retention.
|
||||||
|
|
Loading…
Reference in a new issue