This commit is contained in:
Aliaksandr Valialkin 2024-05-15 13:07:15 +02:00
parent 21f09ab823
commit d3e464a68b
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
9 changed files with 126 additions and 313 deletions

View file

@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
## tip ## tip
* FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them.
* BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`. * BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`.
## [v0.7.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.7.0-victorialogs) ## [v0.7.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.7.0-victorialogs)

View file

@ -1505,9 +1505,8 @@ See also:
### max stats ### max stats
`max(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the maximum value across `max(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the maximum value across
all the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). all the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
Non-numeric values are ignored.
For example, the following query returns the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) For example, the following query returns the maximum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
over logs for the last 5 minutes: over logs for the last 5 minutes:
@ -1543,9 +1542,8 @@ See also:
### min stats ### min stats
`min(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the minimum value across `min(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the minimum value across
all the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). all the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
Non-numeric values are ignored.
For example, the following query returns the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) For example, the following query returns the minimum value for the `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
over logs for the last 5 minutes: over logs for the last 5 minutes:

View file

@ -1470,271 +1470,46 @@ func (c *blockResultColumn) getValues(br *blockResult) []string {
return c.values return c.values
} }
func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 { func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) (float64, bool) {
if c.isConst { if c.isConst {
v := c.encodedValues[0] v := c.encodedValues[0]
f, ok := tryParseFloat64(v) return tryParseFloat64(v)
if !ok {
return nan
}
return f
} }
if c.isTime { if c.isTime {
return nan return 0, false
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
f, ok := tryParseFloat64(c.encodedValues[rowIdx]) v := c.encodedValues[rowIdx]
if !ok { return tryParseFloat64(v)
return nan
}
return f
case valueTypeDict: case valueTypeDict:
dictIdx := c.encodedValues[rowIdx][0] dictIdx := c.encodedValues[rowIdx][0]
f, ok := tryParseFloat64(c.dictValues[dictIdx]) v := c.dictValues[dictIdx]
if !ok { return tryParseFloat64(v)
return nan
}
return f
case valueTypeUint8: case valueTypeUint8:
return float64(c.encodedValues[rowIdx][0]) return float64(c.encodedValues[rowIdx][0]), true
case valueTypeUint16: case valueTypeUint16:
b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx])
return float64(encoding.UnmarshalUint16(b)) return float64(encoding.UnmarshalUint16(b)), true
case valueTypeUint32: case valueTypeUint32:
b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx])
return float64(encoding.UnmarshalUint32(b)) return float64(encoding.UnmarshalUint32(b)), true
case valueTypeUint64: case valueTypeUint64:
b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx])
return float64(encoding.UnmarshalUint64(b)) return float64(encoding.UnmarshalUint64(b)), true
case valueTypeFloat64: case valueTypeFloat64:
b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx]) b := bytesutil.ToUnsafeBytes(c.encodedValues[rowIdx])
n := encoding.UnmarshalUint64(b) n := encoding.UnmarshalUint64(b)
return math.Float64frombits(n)
case valueTypeIPv4:
return nan
case valueTypeTimestampISO8601:
return nan
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return nan
}
}
func (c *blockResultColumn) getMaxValue() float64 {
if c.isConst {
v := c.encodedValues[0]
f, ok := tryParseFloat64(v)
if !ok {
return nan
}
return f
}
if c.isTime {
return nan
}
switch c.valueType {
case valueTypeString:
max := nan
f := float64(0)
ok := false
values := c.encodedValues
for i := range values {
if i == 0 || values[i-1] != values[i] {
f, ok = tryParseFloat64(values[i])
}
if ok && (f > max || math.IsNaN(max)) {
max = f
}
}
return max
case valueTypeDict:
a := encoding.GetFloat64s(len(c.dictValues))
dictValuesFloat := a.A
for i, v := range c.dictValues {
f, ok := tryParseFloat64(v)
if !ok {
f = nan
}
dictValuesFloat[i] = f
}
max := nan
for _, v := range c.encodedValues {
dictIdx := v[0]
f := dictValuesFloat[dictIdx]
if f > max || math.IsNaN(max) {
max = f
}
}
encoding.PutFloat64s(a)
return max
case valueTypeUint8:
max := -inf
for _, v := range c.encodedValues {
f := float64(v[0])
if f > max {
max = f
}
}
return max
case valueTypeUint16:
max := -inf
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
f := float64(encoding.UnmarshalUint16(b))
if f > max {
max = f
}
}
return max
case valueTypeUint32:
max := -inf
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
f := float64(encoding.UnmarshalUint32(b))
if f > max {
max = f
}
}
return max
case valueTypeUint64:
max := -inf
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
f := float64(encoding.UnmarshalUint64(b))
if f > max {
max = f
}
}
return max
case valueTypeFloat64:
max := nan
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
f := math.Float64frombits(n) f := math.Float64frombits(n)
if math.IsNaN(max) || f > max { return f, !math.IsNaN(f)
max = f
}
}
return max
case valueTypeIPv4: case valueTypeIPv4:
return nan return 0, false
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
return nan return 0, false
default: default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType) logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return nan return 0, false
}
}
func (c *blockResultColumn) getMinValue() float64 {
if c.isConst {
v := c.encodedValues[0]
f, ok := tryParseFloat64(v)
if !ok {
return nan
}
return f
}
if c.isTime {
return nan
}
switch c.valueType {
case valueTypeString:
min := nan
f := float64(0)
ok := false
values := c.encodedValues
for i := range values {
if i == 0 || values[i-1] != values[i] {
f, ok = tryParseFloat64(values[i])
}
if ok && (f < min || math.IsNaN(min)) {
min = f
}
}
return min
case valueTypeDict:
a := encoding.GetFloat64s(len(c.dictValues))
dictValuesFloat := a.A
for i, v := range c.dictValues {
f, ok := tryParseFloat64(v)
if !ok {
f = nan
}
dictValuesFloat[i] = f
}
min := nan
for _, v := range c.encodedValues {
dictIdx := v[0]
f := dictValuesFloat[dictIdx]
if f < min || math.IsNaN(min) {
min = f
}
}
encoding.PutFloat64s(a)
return min
case valueTypeUint8:
min := inf
for _, v := range c.encodedValues {
f := float64(v[0])
if f < min {
min = f
}
}
return min
case valueTypeUint16:
min := inf
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
f := float64(encoding.UnmarshalUint16(b))
if f < min {
min = f
}
}
return min
case valueTypeUint32:
min := inf
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
f := float64(encoding.UnmarshalUint32(b))
if f < min {
min = f
}
}
return min
case valueTypeUint64:
min := inf
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
f := float64(encoding.UnmarshalUint64(b))
if f < min {
min = f
}
}
return min
case valueTypeFloat64:
min := nan
for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v)
n := encoding.UnmarshalUint64(b)
f := math.Float64frombits(n)
if math.IsNaN(min) || f < min {
min = f
}
}
return min
case valueTypeIPv4:
return nan
case valueTypeTimestampISO8601:
return nan
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return nan
} }
} }

View file

@ -545,9 +545,29 @@ func topkLess(ps *pipeSort, a, b *pipeTopkRow) bool {
} }
if isDesc { if isDesc {
return stringsutil.LessNatural(vB, vA) return lessString(vB, vA)
} }
return stringsutil.LessNatural(vA, vB) return lessString(vA, vB)
} }
return false return false
} }
func lessString(a, b string) bool {
if a == b {
return false
}
nA, okA := tryParseUint64(a)
nB, okB := tryParseUint64(b)
if okA && okB {
return nA < nB
}
fA, okA := tryParseFloat64(a)
fB, okB := tryParseFloat64(b)
if okA && okB {
return fA < fB
}
return stringsutil.LessNatural(a, b)
}

View file

@ -1,7 +1,6 @@
package logstorage package logstorage
import ( import (
"math"
"slices" "slices"
"strconv" "strconv"
"unsafe" "unsafe"
@ -58,8 +57,8 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
if sap.sa.containsStar { if sap.sa.containsStar {
// Scan all the fields for the given row // Scan all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(rowIdx)
if !math.IsNaN(f) { if ok {
sap.sum += f sap.sum += f
sap.count++ sap.count++
} }
@ -68,8 +67,8 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
// Scan only the given fields for the given row // Scan only the given fields for the given row
for _, field := range sap.sa.fields { for _, field := range sap.sa.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(rowIdx)
if !math.IsNaN(f) { if ok {
sap.sum += f sap.sum += f
sap.count++ sap.count++
} }

View file

@ -1,9 +1,8 @@
package logstorage package logstorage
import ( import (
"math"
"slices" "slices"
"strconv" "strings"
"unsafe" "unsafe"
) )
@ -23,7 +22,6 @@ func (sm *statsMax) neededFields() []string {
func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { func (sm *statsMax) newStatsProcessor() (statsProcessor, int) {
smp := &statsMaxProcessor{ smp := &statsMaxProcessor{
sm: sm, sm: sm,
max: nan,
} }
return smp, int(unsafe.Sizeof(*smp)) return smp, int(unsafe.Sizeof(*smp))
} }
@ -31,62 +29,74 @@ func (sm *statsMax) newStatsProcessor() (statsProcessor, int) {
type statsMaxProcessor struct { type statsMaxProcessor struct {
sm *statsMax sm *statsMax
max float64 max string
hasMax bool
} }
func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
maxLen := len(smp.max)
if smp.sm.containsStar { if smp.sm.containsStar {
// Find the maximum value across all the columns // Find the minimum value across all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getMaxValue() for _, v := range c.getValues(br) {
if f > smp.max || math.IsNaN(smp.max) { smp.updateState(v)
smp.max = f
} }
} }
} else { } else {
// Find the maximum value across the requested columns // Find the minimum value across the requested columns
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getMaxValue() for _, v := range c.getValues(br) {
if f > smp.max || math.IsNaN(smp.max) { smp.updateState(v)
smp.max = f
} }
} }
} }
return 0
return len(smp.max) - maxLen
} }
func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
maxLen := len(smp.max)
if smp.sm.containsStar { if smp.sm.containsStar {
// Find the maximum value across all the fields for the given row // Find the minimum value across all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getFloatValueAtRow(rowIdx) v := c.getValueAtRow(br, rowIdx)
if f > smp.max || math.IsNaN(smp.max) { smp.updateState(v)
smp.max = f
}
} }
} else { } else {
// Find the maximum value across the requested fields for the given row // Find the minimum value across the requested fields for the given row
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getFloatValueAtRow(rowIdx) v := c.getValueAtRow(br, rowIdx)
if f > smp.max || math.IsNaN(smp.max) { smp.updateState(v)
smp.max = f
} }
} }
}
return 0 return maxLen - len(smp.max)
} }
func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsMaxProcessor) src := sfp.(*statsMaxProcessor)
if src.max > smp.max { if src.hasMax {
smp.max = src.max smp.updateState(src.max)
} }
} }
func (smp *statsMaxProcessor) updateState(v string) {
if smp.hasMax && !lessString(smp.max, v) {
return
}
smp.max = strings.Clone(v)
smp.hasMax = true
}
func (smp *statsMaxProcessor) finalizeStats() string { func (smp *statsMaxProcessor) finalizeStats() string {
return strconv.FormatFloat(smp.max, 'f', -1, 64) if !smp.hasMax {
return "NaN"
}
return smp.max
} }
func parseStatsMax(lex *lexer) (*statsMax, error) { func parseStatsMax(lex *lexer) (*statsMax, error) {

View file

@ -1,9 +1,8 @@
package logstorage package logstorage
import ( import (
"math"
"slices" "slices"
"strconv" "strings"
"unsafe" "unsafe"
) )
@ -23,7 +22,6 @@ func (sm *statsMin) neededFields() []string {
func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { func (sm *statsMin) newStatsProcessor() (statsProcessor, int) {
smp := &statsMinProcessor{ smp := &statsMinProcessor{
sm: sm, sm: sm,
min: nan,
} }
return smp, int(unsafe.Sizeof(*smp)) return smp, int(unsafe.Sizeof(*smp))
} }
@ -31,62 +29,74 @@ func (sm *statsMin) newStatsProcessor() (statsProcessor, int) {
type statsMinProcessor struct { type statsMinProcessor struct {
sm *statsMin sm *statsMin
min float64 min string
hasMin bool
} }
func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
minLen := len(smp.min)
if smp.sm.containsStar { if smp.sm.containsStar {
// Find the minimum value across all the columns // Find the minimum value across all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getMinValue() for _, v := range c.getValues(br) {
if f < smp.min || math.IsNaN(smp.min) { smp.updateState(v)
smp.min = f
} }
} }
} else { } else {
// Find the minimum value across the requested columns // Find the minimum value across the requested columns
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getMinValue() for _, v := range c.getValues(br) {
if f < smp.min || math.IsNaN(smp.min) { smp.updateState(v)
smp.min = f
} }
} }
} }
return 0
return len(smp.min) - minLen
} }
func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
minLen := len(smp.min)
if smp.sm.containsStar { if smp.sm.containsStar {
// Find the minimum value across all the fields for the given row // Find the minimum value across all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getFloatValueAtRow(rowIdx) v := c.getValueAtRow(br, rowIdx)
if f < smp.min || math.IsNaN(smp.min) { smp.updateState(v)
smp.min = f
}
} }
} else { } else {
// Find the minimum value across the requested fields for the given row // Find the minimum value across the requested fields for the given row
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getFloatValueAtRow(rowIdx) v := c.getValueAtRow(br, rowIdx)
if f < smp.min || math.IsNaN(smp.min) { smp.updateState(v)
smp.min = f
} }
} }
}
return 0 return minLen - len(smp.min)
} }
func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { func (smp *statsMinProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsMinProcessor) src := sfp.(*statsMinProcessor)
if src.min < smp.min { if src.hasMin {
smp.min = src.min smp.updateState(src.min)
} }
} }
func (smp *statsMinProcessor) updateState(v string) {
if smp.hasMin && !lessString(v, smp.min) {
return
}
smp.min = strings.Clone(v)
smp.hasMin = true
}
func (smp *statsMinProcessor) finalizeStats() string { func (smp *statsMinProcessor) finalizeStats() string {
return strconv.FormatFloat(smp.min, 'f', -1, 64) if !smp.hasMin {
return "NaN"
}
return smp.min
} }
func parseStatsMin(lex *lexer) (*statsMin, error) { func parseStatsMin(lex *lexer) (*statsMin, error) {

View file

@ -2,7 +2,6 @@ package logstorage
import ( import (
"fmt" "fmt"
"math"
"slices" "slices"
"strconv" "strconv"
"unsafe" "unsafe"
@ -72,16 +71,16 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int
if sqp.sq.containsStar { if sqp.sq.containsStar {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(rowIdx)
if !math.IsNaN(f) { if ok {
stateSizeIncrease += h.update(f) stateSizeIncrease += h.update(f)
} }
} }
} else { } else {
for _, field := range sqp.sq.fields { for _, field := range sqp.sq.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(rowIdx)
if !math.IsNaN(f) { if ok {
stateSizeIncrease += h.update(f) stateSizeIncrease += h.update(f)
} }
} }

View file

@ -68,8 +68,8 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
if ssp.ss.containsStar { if ssp.ss.containsStar {
// Sum all the fields for the given row // Sum all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(rowIdx)
if !math.IsNaN(f) { if ok {
if math.IsNaN(ssp.sum) { if math.IsNaN(ssp.sum) {
ssp.sum = f ssp.sum = f
} else { } else {
@ -81,8 +81,8 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
// Sum only the given fields for the given row // Sum only the given fields for the given row
for _, field := range ssp.ss.fields { for _, field := range ssp.ss.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(rowIdx)
if !math.IsNaN(f) { if ok {
if math.IsNaN(ssp.sum) { if math.IsNaN(ssp.sum) {
ssp.sum = f ssp.sum = f
} else { } else {