From 778ea183cab967e75da13fe4de1403f38ca60939 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Fri, 18 Sep 2020 19:07:05 +0300 Subject: [PATCH] lib/decimal: properly store Inf values Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/752 --- lib/decimal/decimal.go | 110 ++++++++++++++++++++++++++---------- lib/decimal/decimal_test.go | 46 +++++++-------- lib/storage/storage.go | 5 -- 3 files changed, 101 insertions(+), 60 deletions(-) diff --git a/lib/decimal/decimal.go b/lib/decimal/decimal.go index d474f7382c..21e7377ba9 100644 --- a/lib/decimal/decimal.go +++ b/lib/decimal/decimal.go @@ -76,11 +76,20 @@ func ExtendInt64sCapacity(dst []int64, additionalItems int) []int64 { return dst[:dstLen] } +func extendInt16sCapacity(dst []int16, additionalItems int) []int16 { + dstLen := len(dst) + if n := dstLen + additionalItems - cap(dst); n > 0 { + dst = append(dst[:cap(dst)], make([]int16, n)...) + } + return dst[:dstLen] +} + // AppendDecimalToFloat converts each item in va to f=v*10^e, appends it // to dst and returns the resulting dst. func AppendDecimalToFloat(dst []float64, va []int64, e int16) []float64 { // Extend dst capacity in order to eliminate memory allocations below. dst = ExtendFloat64sCapacity(dst, len(va)) + a := dst[len(dst) : len(dst)+len(va)] if fastnum.IsInt64Zeros(va) { return fastnum.AppendFloat64Zeros(dst, len(va)) @@ -89,35 +98,53 @@ func AppendDecimalToFloat(dst []float64, va []int64, e int16) []float64 { if fastnum.IsInt64Ones(va) { return fastnum.AppendFloat64Ones(dst, len(va)) } - for _, v := range va { + _ = a[len(va)-1] + for i, v := range va { f := float64(v) - dst = append(dst, f) + if v == vInfPos { + f = infPos + } else if v == vInfNeg { + f = infNeg + } + a[i] = f } - return dst + return dst[:len(dst)+len(va)] } // increase conversion precision for negative exponents by dividing by e10 if e < 0 { e10 := math.Pow10(int(-e)) - for _, v := range va { + _ = a[len(va)-1] + for i, v := range va { f := float64(v) / e10 - dst = append(dst, f) + if v == vInfPos { + f = infPos + } else if v == vInfNeg { + f = infNeg + } + a[i] = f } - return dst + return dst[:len(dst)+len(va)] } e10 := math.Pow10(int(e)) - for _, v := range va { + _ = a[len(va)-1] + for i, v := range va { f := float64(v) * e10 - dst = append(dst, f) + if v == vInfPos { + f = infPos + } else if v == vInfNeg { + f = infNeg + } + a[i] = f } - return dst + return dst[:len(dst)+len(va)] } // AppendFloatToDecimal converts each item in src to v*10^e and appends // each v to dst returning it as va. // // It tries minimizing each item in dst. -func AppendFloatToDecimal(dst []int64, src []float64) (va []int64, e int16) { +func AppendFloatToDecimal(dst []int64, src []float64) ([]int64, int16) { if len(src) == 0 { return dst, 0 } @@ -130,9 +157,6 @@ func AppendFloatToDecimal(dst []int64, src []float64) (va []int64, e int16) { return dst, 0 } - // Extend dst capacity in order to eliminate memory allocations below. - dst = ExtendInt64sCapacity(dst, len(src)) - vaev := vaeBufPool.Get() if vaev == nil { vaev = &vaeBuf{ @@ -141,19 +165,20 @@ func AppendFloatToDecimal(dst []int64, src []float64) (va []int64, e int16) { } } vae := vaev.(*vaeBuf) - vae.va = vae.va[:0] - vae.ea = vae.ea[:0] + va := vae.va[:0] + ea := vae.ea[:0] + va = ExtendInt64sCapacity(va, len(src)) + va = va[:len(src)] + ea = extendInt16sCapacity(ea, len(src)) + ea = ea[:len(src)] // Determine the minimum exponent across all src items. - v, exp := FromFloat(src[0]) - vae.va = append(vae.va, v) - vae.ea = append(vae.ea, exp) - minExp := exp - for _, f := range src[1:] { + minExp := int16(1<<15 - 1) + for i, f := range src { v, exp := FromFloat(f) - vae.va = append(vae.va, v) - vae.ea = append(vae.ea, exp) - if exp < minExp { + va[i] = v + ea[i] = exp + if exp < minExp && v != vInfPos && v != vInfNeg { minExp = exp } } @@ -161,8 +186,12 @@ func AppendFloatToDecimal(dst []int64, src []float64) (va []int64, e int16) { // Determine whether all the src items may be upscaled to minExp. // If not, adjust minExp accordingly. downExp := int16(0) - for i, v := range vae.va { - exp := vae.ea[i] + _ = ea[len(va)-1] + for i, v := range va { + if v == vInfPos || v == vInfNeg { + continue + } + exp := ea[i] upExp := exp - minExp maxUpExp := maxUpExponent(v) if upExp-maxUpExp > downExp { @@ -171,9 +200,19 @@ func AppendFloatToDecimal(dst []int64, src []float64) (va []int64, e int16) { } minExp += downExp + // Extend dst capacity in order to eliminate memory allocations below. + dst = ExtendInt64sCapacity(dst, len(src)) + a := dst[len(dst) : len(dst)+len(src)] + // Scale each item in src to minExp and append it to dst. - for i, v := range vae.va { - exp := vae.ea[i] + _ = a[len(va)-1] + _ = ea[len(va)-1] + for i, v := range va { + if v == vInfPos || v == vInfNeg { + a[i] = v + continue + } + exp := ea[i] adjExp := exp - minExp for adjExp > 0 { v *= 10 @@ -183,12 +222,14 @@ func AppendFloatToDecimal(dst []int64, src []float64) (va []int64, e int16) { v /= 10 adjExp++ } - dst = append(dst, v) + a[i] = v } + vae.va = va + vae.ea = ea vaeBufPool.Put(vae) - return dst, minExp + return dst[:len(dst)+len(va)], minExp } type vaeBuf struct { @@ -290,6 +331,12 @@ func Round(f float64, digits int) float64 { // ToFloat returns f=v*10^e. func ToFloat(v int64, e int16) float64 { + if v == vInfPos { + return infPos + } + if v == vInfNeg { + return infNeg + } f := float64(v) // increase conversion precision for negative exponents by dividing by e10 if e < 0 { @@ -298,6 +345,11 @@ func ToFloat(v int64, e int16) float64 { return f * math.Pow10(int(e)) } +var ( + infPos = math.Inf(1) + infNeg = math.Inf(-1) +) + const ( vInfPos = 1<<63 - 1 vInfNeg = -1 << 63 diff --git a/lib/decimal/decimal_test.go b/lib/decimal/decimal_test.go index 5feabc871d..cc93720ed9 100644 --- a/lib/decimal/decimal_test.go +++ b/lib/decimal/decimal_test.go @@ -69,6 +69,8 @@ func TestPositiveFloatToDecimal(t *testing.T) { f(0.000874957, 874957, -9) f(0.001130435, 1130435, -9) + f(vInfPos, 9223372036854775, 3) + f(vMax, 9223372036854775, 3) } func TestAppendDecimalToFloat(t *testing.T) { @@ -87,7 +89,9 @@ func TestAppendDecimalToFloat(t *testing.T) { testAppendDecimalToFloat(t, []int64{874957, 1130435}, -11, []float64{8.74957e-6, 1.130435e-5}) testAppendDecimalToFloat(t, []int64{874957, 1130435}, -12, []float64{8.74957e-7, 1.130435e-6}) testAppendDecimalToFloat(t, []int64{874957, 1130435}, -13, []float64{8.74957e-8, 1.130435e-7}) - testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 0, []float64{9.223372036854776e+18, -9.223372036854776e+18, 1, 2}) + testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 0, []float64{infPos, infNeg, 1, 2}) + testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, 4, []float64{infPos, infNeg, 1e4, 2e4}) + testAppendDecimalToFloat(t, []int64{vInfPos, vInfNeg, 1, 2}, -4, []float64{infPos, infNeg, 1e-4, 2e-4}) } func testAppendDecimalToFloat(t *testing.T, va []int64, e int16, fExpected []float64) { @@ -245,8 +249,7 @@ func TestAppendFloatToDecimal(t *testing.T) { testAppendFloatToDecimal(t, []float64{}, nil, 0) testAppendFloatToDecimal(t, []float64{0}, []int64{0}, 0) testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123}, []int64{vInfPos, vInfNeg, 123}, 0) - testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123, 1e-4, 1e32}, []int64{92233, -92233, 0, 0, 1000000000000000000}, 14) - testAppendFloatToDecimal(t, []float64{float64(vInfPos), float64(vInfNeg), 123}, []int64{9223372036854775000, -9223372036854775000, 123}, 0) + testAppendFloatToDecimal(t, []float64{infPos, infNeg, 123, 1e-4, 1e32}, []int64{vInfPos, vInfNeg, 0, 0, 1000000000000000000}, 14) testAppendFloatToDecimal(t, []float64{0, -0, 1, -1, 12345678, -123456789}, []int64{0, 0, 1, -1, 12345678, -123456789}, 0) // upExp @@ -327,6 +330,10 @@ func TestFloatToDecimal(t *testing.T) { f(math.Inf(1), vInfPos, 0) f(math.Inf(-1), vInfNeg, 0) + f(vInfPos, 9223372036854775, 3) + f(vInfNeg, -9223372036854775, 3) + f(vMax, 9223372036854775, 3) + f(vMin, -9223372036854775, 3) f(1<<63-1, 9223372036854775, 3) f(-1<<63, -9223372036854775, 3) @@ -338,16 +345,15 @@ func TestFloatToDecimal(t *testing.T) { func TestFloatToDecimalRoundtrip(t *testing.T) { f := func(f float64) { t.Helper() - v, e := FromFloat(f) fNew := ToFloat(v, e) - if !equalFloat(fNew, f) { + if !equalFloat(f, fNew) { t.Fatalf("unexpected fNew for v=%d, e=%d; got %g; expecting %g", v, e, fNew, f) } v, e = FromFloat(-f) fNew = ToFloat(v, e) - if !equalFloat(fNew, -f) { + if !equalFloat(-f, fNew) { t.Fatalf("unexepcted fNew for v=%d, e=%d; got %g; expecting %g", v, e, fNew, -f) } } @@ -362,7 +368,7 @@ func TestFloatToDecimalRoundtrip(t *testing.T) { f(321e260) f(1234567890123) f(12.34567890125) - f(-1234567.8901256789) + f(1234567.8901256789) f(15e18) f(0.000874957) f(0.001130435) @@ -373,6 +379,8 @@ func TestFloatToDecimalRoundtrip(t *testing.T) { f(float64(vInfNeg)) f(infPos) f(infNeg) + f(vMax) + f(vMin) for i := 0; i < 1e4; i++ { v := rand.NormFloat64() @@ -396,26 +404,12 @@ func roundFloat(f float64, exp int) float64 { } func equalFloat(f1, f2 float64) bool { - f1 = adjustInf(f1) - f2 = adjustInf(f2) - if math.IsInf(f1, 0) { - return math.IsInf(f1, 1) == math.IsInf(f2, 1) || math.IsInf(f1, -1) == math.IsInf(f2, -1) + if math.IsInf(f1, 1) { + return math.IsInf(f2, 1) + } + if math.IsInf(f2, -1) { + return math.IsInf(f2, -1) } eps := math.Abs(f1 - f2) return eps == 0 || eps*conversionPrecision < math.Abs(f1)+math.Abs(f2) } - -func adjustInf(f float64) float64 { - if f == float64(vInfPos) { - return infPos - } - if f == float64(vInfNeg) { - return infNeg - } - return f -} - -var ( - infPos = math.Inf(1) - infNeg = math.Inf(-1) -) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index e744c6ab70..5f2d14d2c1 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1203,11 +1203,6 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // doesn't know how to work with them. continue } - if math.IsInf(mr.Value, 0) { - // Skip Inf values, since they may break precision for already stored data. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/752 - continue - } if mr.Timestamp < minTimestamp { // Skip rows with too small timestamps outside the retention. if firstWarn == nil {