This commit is contained in:
Aliaksandr Valialkin 2024-06-05 01:18:26 +02:00
parent 346375fb0c
commit c96f4c565d
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 114 additions and 57 deletions

View file

@ -3,6 +3,7 @@ package logstorage
import ( import (
"math" "math"
"slices" "slices"
"strconv"
"sync/atomic" "sync/atomic"
"time" "time"
"unsafe" "unsafe"
@ -1916,5 +1917,34 @@ func getCanonicalColumnName(columnName string) string {
return columnName return columnName
} }
func tryParseNumber(s string) (float64, bool) {
if len(s) == 0 {
return 0, false
}
f, ok := tryParseFloat64(s)
if ok {
return f, true
}
nsecs, ok := tryParseDuration(s)
if ok {
return float64(nsecs), true
}
bytes, ok := tryParseBytes(s)
if ok {
return float64(bytes), true
}
if isNumberPrefix(s) {
f, err := strconv.ParseFloat(s, 64)
if err == nil {
return f, true
}
n, err := strconv.ParseInt(s, 0, 64)
if err == nil {
return float64(n), true
}
}
return 0, false
}
var nan = math.NaN() var nan = math.NaN()
var inf = math.Inf(1) var inf = math.Inf(1)

View file

@ -2,7 +2,6 @@ package logstorage
import ( import (
"math" "math"
"strconv"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
@ -130,8 +129,30 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) {
f := unmarshalFloat64(v) f := unmarshalFloat64(v)
return f >= minValue && f <= maxValue return f >= minValue && f <= maxValue
}) })
case valueTypeTimestampISO8601: case valueTypeIPv4:
minValueUint32, maxValueUint32 := toUint32Range(minValue, maxValue)
if maxValue < 0 || uint64(minValueUint32) > c.maxValue || uint64(maxValueUint32) < c.minValue {
bm.resetBits() bm.resetBits()
return
}
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(idx int) bool {
v := valuesEncoded[idx]
n := unmarshalIPv4(v)
return n >= minValueUint32 && n <= maxValueUint32
})
case valueTypeTimestampISO8601:
minValueInt, maxValueInt := toInt64Range(minValue, maxValue)
if maxValue < 0 || minValueInt > int64(c.maxValue) || maxValueInt < int64(c.minValue) {
bm.resetBits()
return
}
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(idx int) bool {
v := valuesEncoded[idx]
n := unmarshalTimestampISO8601(v)
return n >= minValueInt && n <= maxValueInt
})
default: default:
logger.Panicf("FATAL: unknown valueType=%d", c.valueType) logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
} }
@ -179,9 +200,10 @@ func (fr *filterRange) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
case valueTypeFloat64: case valueTypeFloat64:
matchFloat64ByRange(bs, ch, bm, minValue, maxValue) matchFloat64ByRange(bs, ch, bm, minValue, maxValue)
case valueTypeIPv4: case valueTypeIPv4:
bm.resetBits() minValueUint32, maxValueUint32 := toUint32Range(minValue, maxValue)
matchIPv4ByRange(bs, ch, bm, minValueUint32, maxValueUint32)
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
bm.resetBits() matchTimestampISO8601ByRange(bs, ch, bm, minValue, maxValue)
default: default:
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType) logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
} }
@ -264,7 +286,7 @@ func matchUint32ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue,
bb := bbPool.Get() bb := bbPool.Get()
visitValues(bs, ch, bm, func(v string) bool { visitValues(bs, ch, bm, func(v string) bool {
if len(v) != 4 { if len(v) != 4 {
logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 4", bs.partPath(), len(v)) logger.Panicf("FATAL: %s: unexpected length for binary representation of uint32 number: got %d; want 4", bs.partPath(), len(v))
} }
n := uint64(unmarshalUint32(v)) n := uint64(unmarshalUint32(v))
return n >= minValueUint && n <= maxValueUint return n >= minValueUint && n <= maxValueUint
@ -281,7 +303,7 @@ func matchUint64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue,
bb := bbPool.Get() bb := bbPool.Get()
visitValues(bs, ch, bm, func(v string) bool { visitValues(bs, ch, bm, func(v string) bool {
if len(v) != 8 { if len(v) != 8 {
logger.Panicf("FATAL: %s: unexpected length for binary representation of uint8 number: got %d; want 8", bs.partPath(), len(v)) logger.Panicf("FATAL: %s: unexpected length for binary representation of uint64 number: got %d; want 8", bs.partPath(), len(v))
} }
n := unmarshalUint64(v) n := unmarshalUint64(v)
return n >= minValueUint && n <= maxValueUint return n >= minValueUint && n <= maxValueUint
@ -289,41 +311,26 @@ func matchUint64ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue,
bbPool.Put(bb) bbPool.Put(bb)
} }
func matchRange(s string, minValue, maxValue float64) bool { func matchTimestampISO8601ByRange(bs *blockSearch, ch *columnHeader, bm *bitmap, minValue, maxValue float64) {
f, ok := tryParseNumber(s) minValueInt, maxValueInt := toInt64Range(minValue, maxValue)
if !ok { if maxValue < 0 || minValueInt > int64(ch.maxValue) || maxValueInt < int64(ch.minValue) {
return false bm.resetBits()
return
} }
return f >= minValue && f <= maxValue bb := bbPool.Get()
visitValues(bs, ch, bm, func(v string) bool {
if len(v) != 8 {
logger.Panicf("FATAL: %s: unexpected length for binary representation of timestampISO8601: got %d; want 8", bs.partPath(), len(v))
}
n := unmarshalTimestampISO8601(v)
return n >= minValueInt && n <= maxValueInt
})
bbPool.Put(bb)
} }
func tryParseNumber(s string) (float64, bool) { func matchRange(s string, minValue, maxValue float64) bool {
if len(s) == 0 { f := parseMathNumber(s)
return 0, false return f >= minValue && f <= maxValue
}
f, ok := tryParseFloat64(s)
if ok {
return f, true
}
nsecs, ok := tryParseDuration(s)
if ok {
return float64(nsecs), true
}
bytes, ok := tryParseBytes(s)
if ok {
return float64(bytes), true
}
if isNumberPrefix(s) {
f, err := strconv.ParseFloat(s, 64)
if err == nil {
return f, true
}
n, err := strconv.ParseInt(s, 0, 64)
if err == nil {
return float64(n), true
}
}
return 0, false
} }
func toUint64Range(minValue, maxValue float64) (uint64, uint64) { func toUint64Range(minValue, maxValue float64) (uint64, uint64) {
@ -341,3 +348,35 @@ func toUint64Clamp(f float64) uint64 {
} }
return uint64(f) return uint64(f)
} }
func toInt64Range(minValue, maxValue float64) (int64, int64) {
minValue = math.Ceil(minValue)
maxValue = math.Floor(maxValue)
return toInt64Clamp(minValue), toInt64Clamp(maxValue)
}
func toInt64Clamp(f float64) int64 {
if f < math.MinInt64 {
return math.MinInt64
}
if f > math.MaxInt64 {
return math.MaxInt64
}
return int64(f)
}
func toUint32Range(minValue, maxValue float64) (uint32, uint32) {
minValue = math.Ceil(minValue)
maxValue = math.Floor(maxValue)
return toUint32Clamp(minValue), toUint32Clamp(maxValue)
}
func toUint32Clamp(f float64) uint32 {
if f < 0 {
return 0
}
if f > math.MaxUint32 {
return math.MaxUint32
}
return uint32(f)
}

View file

@ -1309,28 +1309,13 @@ func parseFloat64(lex *lexer) (float64, string, error) {
if err != nil { if err != nil {
return 0, "", fmt.Errorf("cannot parse float64 from %q: %w", s, err) return 0, "", fmt.Errorf("cannot parse float64 from %q: %w", s, err)
} }
f, err := strconv.ParseFloat(s, 64)
if err == nil { f := parseMathNumber(s)
if !math.IsNaN(f) || strings.EqualFold(s, "nan") {
return f, s, nil return f, s, nil
} }
// Try parsing s as integer. return 0, "", fmt.Errorf("cannot parse %q as float64", s)
// This handles 0x..., 0b... and 0... prefixes, alongside '_' delimiters.
n, err := strconv.ParseInt(s, 0, 64)
if err == nil {
return float64(n), s, nil
}
nn, ok := tryParseBytes(s)
if ok {
return float64(nn), s, nil
}
nn, ok = tryParseDuration(s)
if ok {
return float64(nn), s, nil
}
return 0, "", fmt.Errorf("cannot parse %q as float64: %w", s, err)
} }
func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) { func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) {
@ -1626,6 +1611,9 @@ func isNumberPrefix(s string) bool {
return false return false
} }
} }
if len(s) >= 3 && strings.EqualFold(s, "inf") {
return true
}
if s[0] == '.' { if s[0] == '.' {
s = s[1:] s = s[1:]
if len(s) == 0 { if len(s) == 0 {