This commit is contained in:
Aliaksandr Valialkin 2024-05-01 10:31:46 +02:00
parent 619dff6861
commit 5efe4eeadd
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 74 additions and 54 deletions

View file

@ -21,9 +21,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
* FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default. * FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default.
* FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#querying-specific-fields). * FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#querying-specific-fields).
* FEATURE: add support for calculating the number of matching logs and the number of logs with non-empty [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details. * FEATURE: add support for calculating `count()`, `uniq()`, `sum()`, `min()` and `max()` over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details.
* FEATURE: add support for counting the number of unique values for [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details.
* FEATURE: add support for summing [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details.
* FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters). * FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters).
* FEATURE: optimize performance for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/), which contains multiple filters for [words](https://docs.victoriametrics.com/victorialogs/logsql/#word-filter) or [phrases](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) delimited with [`AND` operator](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, `foo AND bar` query must find [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with `foo` and `bar` words at faster speed. * FEATURE: optimize performance for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/), which contains multiple filters for [words](https://docs.victoriametrics.com/victorialogs/logsql/#word-filter) or [phrases](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) delimited with [`AND` operator](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, `foo AND bar` query must find [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with `foo` and `bar` words at faster speed.
* FEATURE: allow using `_` inside numbers. For example, `score:range[1_000, 5_000_000]` for [`range` filter](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter). * FEATURE: allow using `_` inside numbers. For example, `score:range[1_000, 5_000_000]` for [`range` filter](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter).

View file

@ -750,7 +750,7 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
max := math.Inf(-1) max := nan
f := float64(0) f := float64(0)
ok := false ok := false
values := c.encodedValues values := c.encodedValues
@ -758,13 +758,10 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
if i == 0 || values[i-1] != values[i] { if i == 0 || values[i-1] != values[i] {
f, ok = tryParseFloat64(values[i]) f, ok = tryParseFloat64(values[i])
} }
if ok && f > max { if ok && (f > max || math.IsNaN(max)) {
max = f max = f
} }
} }
if math.IsInf(max, -1) {
return nan
}
return max return max
case valueTypeDict: case valueTypeDict:
a := encoding.GetFloat64s(len(c.dictValues)) a := encoding.GetFloat64s(len(c.dictValues))
@ -776,18 +773,15 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
} }
dictValuesFloat[i] = f dictValuesFloat[i] = f
} }
max := math.Inf(-1) max := nan
for _, v := range c.encodedValues { for _, v := range c.encodedValues {
dictIdx := v[0] dictIdx := v[0]
f := dictValuesFloat[dictIdx] f := dictValuesFloat[dictIdx]
if f > max { if f > max || math.IsNaN(max) {
max = f max = f
} }
} }
encoding.PutFloat64s(a) encoding.PutFloat64s(a)
if math.IsInf(max, -1) {
return nan
}
return max return max
case valueTypeUint8: case valueTypeUint8:
max := math.Inf(-1) max := math.Inf(-1)
@ -853,7 +847,7 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
min := math.Inf(1) min := nan
f := float64(0) f := float64(0)
ok := false ok := false
values := c.encodedValues values := c.encodedValues
@ -861,13 +855,10 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
if i == 0 || values[i-1] != values[i] { if i == 0 || values[i-1] != values[i] {
f, ok = tryParseFloat64(values[i]) f, ok = tryParseFloat64(values[i])
} }
if ok && f < min { if ok && (f < min || math.IsNaN(min)) {
min = f min = f
} }
} }
if math.IsInf(min, 1) {
return nan
}
return min return min
case valueTypeDict: case valueTypeDict:
a := encoding.GetFloat64s(len(c.dictValues)) a := encoding.GetFloat64s(len(c.dictValues))
@ -879,18 +870,15 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
} }
dictValuesFloat[i] = f dictValuesFloat[i] = f
} }
min := math.Inf(1) min := nan
for _, v := range c.encodedValues { for _, v := range c.encodedValues {
dictIdx := v[0] dictIdx := v[0]
f := dictValuesFloat[dictIdx] f := dictValuesFloat[dictIdx]
if f < min { if f < min || math.IsNaN(min) {
min = f min = f
} }
} }
encoding.PutFloat64s(a) encoding.PutFloat64s(a)
if math.IsInf(min, 1) {
return nan
}
return min return min
case valueTypeUint8: case valueTypeUint8:
min := math.Inf(1) min := math.Inf(1)
@ -941,22 +929,23 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
} }
} }
func (c *blockResultColumn) sumValues(br *blockResult) float64 { func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) {
if c.isConst { if c.isConst {
v := c.encodedValues[0] v := c.encodedValues[0]
f, ok := tryParseFloat64(v) f, ok := tryParseFloat64(v)
if !ok { if !ok {
return 0 return 0, 0
} }
return f * float64(len(br.timestamps)) return f * float64(len(br.timestamps)), len(br.timestamps)
} }
if c.isTime { if c.isTime {
return 0 return 0, 0
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
sum := float64(0) sum := float64(0)
count := 0
f := float64(0) f := float64(0)
ok := false ok := false
values := c.encodedValues values := c.encodedValues
@ -966,53 +955,59 @@ func (c *blockResultColumn) sumValues(br *blockResult) float64 {
} }
if ok { if ok {
sum += f sum += f
count++
} }
} }
return sum return sum, count
case valueTypeDict: case valueTypeDict:
a := encoding.GetFloat64s(len(c.dictValues)) a := encoding.GetFloat64s(len(c.dictValues))
dictValuesFloat := a.A dictValuesFloat := a.A
for i, v := range c.dictValues { for i, v := range c.dictValues {
f, ok := tryParseFloat64(v) f, ok := tryParseFloat64(v)
if !ok { if !ok {
f = 0 f = nan
} }
dictValuesFloat[i] = f dictValuesFloat[i] = f
} }
sum := float64(0) sum := float64(0)
count := 0
for _, v := range c.encodedValues { for _, v := range c.encodedValues {
dictIdx := v[0] dictIdx := v[0]
sum += dictValuesFloat[dictIdx] f := dictValuesFloat[dictIdx]
if !math.IsNaN(f) {
sum += f
count++
}
} }
encoding.PutFloat64s(a) encoding.PutFloat64s(a)
return sum return sum, count
case valueTypeUint8: case valueTypeUint8:
sum := uint64(0) sum := uint64(0)
for _, v := range c.encodedValues { for _, v := range c.encodedValues {
sum += uint64(v[0]) sum += uint64(v[0])
} }
return float64(sum) return float64(sum), len(br.timestamps)
case valueTypeUint16: case valueTypeUint16:
sum := uint64(0) sum := uint64(0)
for _, v := range c.encodedValues { for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v) b := bytesutil.ToUnsafeBytes(v)
sum += uint64(encoding.UnmarshalUint16(b)) sum += uint64(encoding.UnmarshalUint16(b))
} }
return float64(sum) return float64(sum), len(br.timestamps)
case valueTypeUint32: case valueTypeUint32:
sum := uint64(0) sum := uint64(0)
for _, v := range c.encodedValues { for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v) b := bytesutil.ToUnsafeBytes(v)
sum += uint64(encoding.UnmarshalUint32(b)) sum += uint64(encoding.UnmarshalUint32(b))
} }
return float64(sum) return float64(sum), len(br.timestamps)
case valueTypeUint64: case valueTypeUint64:
sum := float64(0) sum := float64(0)
for _, v := range c.encodedValues { for _, v := range c.encodedValues {
b := bytesutil.ToUnsafeBytes(v) b := bytesutil.ToUnsafeBytes(v)
sum += float64(encoding.UnmarshalUint64(b)) sum += float64(encoding.UnmarshalUint64(b))
} }
return sum return sum, len(br.timestamps)
case valueTypeFloat64: case valueTypeFloat64:
sum := float64(0) sum := float64(0)
for _, v := range c.encodedValues { for _, v := range c.encodedValues {
@ -1023,14 +1018,14 @@ func (c *blockResultColumn) sumValues(br *blockResult) float64 {
sum += f sum += f
} }
} }
return sum return sum, len(br.timestamps)
case valueTypeIPv4: case valueTypeIPv4:
return 0 return 0, 0
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
return 0 return 0, 0
default: default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType) logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return 0 return 0, 0
} }
} }

View file

@ -2,6 +2,7 @@ package logstorage
import ( import (
"fmt" "fmt"
"math"
"slices" "slices"
"strconv" "strconv"
"unsafe" "unsafe"
@ -23,6 +24,7 @@ func (sm *statsMax) neededFields() []string {
func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { func (sm *statsMax) newStatsProcessor() (statsProcessor, int) {
smp := &statsMaxProcessor{ smp := &statsMaxProcessor{
sm: sm, sm: sm,
max: nan,
} }
return smp, int(unsafe.Sizeof(*smp)) return smp, int(unsafe.Sizeof(*smp))
} }
@ -38,7 +40,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
// Find the maximum value across all the columns // Find the maximum value across all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getMaxValue(br) f := c.getMaxValue(br)
if f > smp.max { if f > smp.max || math.IsNaN(smp.max) {
smp.max = f smp.max = f
} }
} }
@ -49,7 +51,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getMaxValue(br) f := c.getMaxValue(br)
if f > smp.max { if f > smp.max || math.IsNaN(smp.max) {
smp.max = f smp.max = f
} }
} }
@ -61,7 +63,7 @@ func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
// Find the maximum value across all the fields for the given row // Find the maximum value across all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getFloatValueAtRow(rowIdx) f := c.getFloatValueAtRow(rowIdx)
if f > smp.max { if f > smp.max || math.IsNaN(smp.max) {
smp.max = f smp.max = f
} }
} }
@ -72,7 +74,7 @@ func (smp *statsMaxProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getFloatValueAtRow(rowIdx) f := c.getFloatValueAtRow(rowIdx)
if f > smp.max { if f > smp.max || math.IsNaN(smp.max) {
smp.max = f smp.max = f
} }
} }

View file

@ -2,6 +2,7 @@ package logstorage
import ( import (
"fmt" "fmt"
"math"
"slices" "slices"
"strconv" "strconv"
"unsafe" "unsafe"
@ -23,6 +24,7 @@ func (sm *statsMin) neededFields() []string {
func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { func (sm *statsMin) newStatsProcessor() (statsProcessor, int) {
smp := &statsMinProcessor{ smp := &statsMinProcessor{
sm: sm, sm: sm,
min: nan,
} }
return smp, int(unsafe.Sizeof(*smp)) return smp, int(unsafe.Sizeof(*smp))
} }
@ -38,7 +40,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
// Find the minimum value across all the columns // Find the minimum value across all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getMinValue(br) f := c.getMinValue(br)
if f < smp.min { if f < smp.min || math.IsNaN(smp.min) {
smp.min = f smp.min = f
} }
} }
@ -49,7 +51,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getMinValue(br) f := c.getMinValue(br)
if f < smp.min { if f < smp.min || math.IsNaN(smp.min) {
smp.min = f smp.min = f
} }
} }
@ -61,7 +63,7 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
// Find the minimum value across all the fields for the given row // Find the minimum value across all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getFloatValueAtRow(rowIdx) f := c.getFloatValueAtRow(rowIdx)
if f < smp.min { if f < smp.min || math.IsNaN(smp.min) {
smp.min = f smp.min = f
} }
} }
@ -72,7 +74,7 @@ func (smp *statsMinProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getFloatValueAtRow(rowIdx) f := c.getFloatValueAtRow(rowIdx)
if f < smp.min { if f < smp.min || math.IsNaN(smp.min) {
smp.min = f smp.min = f
} }
} }

View file

@ -24,6 +24,7 @@ func (ss *statsSum) neededFields() []string {
func (ss *statsSum) newStatsProcessor() (statsProcessor, int) { func (ss *statsSum) newStatsProcessor() (statsProcessor, int) {
ssp := &statsSumProcessor{ ssp := &statsSumProcessor{
ss: ss, ss: ss,
sum: nan,
} }
return ssp, int(unsafe.Sizeof(*ssp)) return ssp, int(unsafe.Sizeof(*ssp))
} }
@ -38,7 +39,14 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int {
if ssp.ss.containsStar { if ssp.ss.containsStar {
// Sum all the columns // Sum all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
ssp.sum += c.sumValues(br) f, count := c.sumValues(br)
if count > 0 {
if math.IsNaN(ssp.sum) {
ssp.sum = f
} else {
ssp.sum += f
}
}
} }
return 0 return 0
} }
@ -46,7 +54,14 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int {
// Sum the requested columns // Sum the requested columns
for _, field := range ssp.ss.fields { for _, field := range ssp.ss.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
ssp.sum += c.sumValues(br) f, count := c.sumValues(br)
if count > 0 {
if math.IsNaN(ssp.sum) {
ssp.sum = f
} else {
ssp.sum += f
}
}
} }
return 0 return 0
} }
@ -57,9 +72,13 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getFloatValueAtRow(rowIdx) f := c.getFloatValueAtRow(rowIdx)
if !math.IsNaN(f) { if !math.IsNaN(f) {
if math.IsNaN(ssp.sum) {
ssp.sum = f
} else {
ssp.sum += f ssp.sum += f
} }
} }
}
return 0 return 0
} }
@ -68,9 +87,13 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getFloatValueAtRow(rowIdx) f := c.getFloatValueAtRow(rowIdx)
if !math.IsNaN(f) { if !math.IsNaN(f) {
if math.IsNaN(ssp.sum) {
ssp.sum = f
} else {
ssp.sum += f ssp.sum += f
} }
} }
}
return 0 return 0
} }