mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
5efe4eeadd
commit
efe5ec623c
3 changed files with 124 additions and 0 deletions
|
@ -850,6 +850,10 @@ func TestParseQuerySuccess(t *testing.T) {
|
||||||
f(`* | stats Min(foo) bar`, `* | stats min(foo) as bar`)
|
f(`* | stats Min(foo) bar`, `* | stats min(foo) as bar`)
|
||||||
f(`* | stats BY(x, y, ) MIN(foo,bar,) bar`, `* | stats by (x, y) min(foo, bar) as bar`)
|
f(`* | stats BY(x, y, ) MIN(foo,bar,) bar`, `* | stats by (x, y) min(foo, bar) as bar`)
|
||||||
|
|
||||||
|
// stats pipe avg
|
||||||
|
f(`* | stats Avg(foo) bar`, `* | stats avg(foo) as bar`)
|
||||||
|
f(`* | stats BY(x, y, ) AVG(foo,bar,) bar`, `* | stats by (x, y) avg(foo, bar) as bar`)
|
||||||
|
|
||||||
// stats pipe uniq
|
// stats pipe uniq
|
||||||
f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`)
|
f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`)
|
||||||
f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`)
|
f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`)
|
||||||
|
@ -1117,6 +1121,11 @@ func TestParseQueryFailure(t *testing.T) {
|
||||||
f(`foo | stats min()`)
|
f(`foo | stats min()`)
|
||||||
f(`foo | stats min() as abc`)
|
f(`foo | stats min() as abc`)
|
||||||
|
|
||||||
|
// invalid stats avg
|
||||||
|
f(`foo | stats avg`)
|
||||||
|
f(`foo | stats avg()`)
|
||||||
|
f(`foo | stats avg() as abc`)
|
||||||
|
|
||||||
// invalid stats uniq
|
// invalid stats uniq
|
||||||
f(`foo | stats uniq`)
|
f(`foo | stats uniq`)
|
||||||
f(`foo | stats uniq()`)
|
f(`foo | stats uniq()`)
|
||||||
|
|
|
@ -446,6 +446,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
|
||||||
return nil, "", fmt.Errorf("cannot parse 'min' func: %w", err)
|
return nil, "", fmt.Errorf("cannot parse 'min' func: %w", err)
|
||||||
}
|
}
|
||||||
sf = sms
|
sf = sms
|
||||||
|
case lex.isKeyword("avg"):
|
||||||
|
sas, err := parseStatsAvg(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", fmt.Errorf("cannot parse 'avg' func: %w", err)
|
||||||
|
}
|
||||||
|
sf = sas
|
||||||
default:
|
default:
|
||||||
return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
|
return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
|
||||||
}
|
}
|
||||||
|
|
109
lib/logstorage/stats_avg.go
Normal file
109
lib/logstorage/stats_avg.go
Normal file
|
@ -0,0 +1,109 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"slices"
|
||||||
|
"strconv"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
type statsAvg struct {
|
||||||
|
fields []string
|
||||||
|
containsStar bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sa *statsAvg) String() string {
|
||||||
|
return "avg(" + fieldNamesString(sa.fields) + ")"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sa *statsAvg) neededFields() []string {
|
||||||
|
return sa.fields
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sa *statsAvg) newStatsProcessor() (statsProcessor, int) {
|
||||||
|
sap := &statsAvgProcessor{
|
||||||
|
sa: sa,
|
||||||
|
}
|
||||||
|
return sap, int(unsafe.Sizeof(*sap))
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsAvgProcessor struct {
|
||||||
|
sa *statsAvg
|
||||||
|
|
||||||
|
sum float64
|
||||||
|
count uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int {
|
||||||
|
if sap.sa.containsStar {
|
||||||
|
// Scan all the columns
|
||||||
|
for _, c := range br.getColumns() {
|
||||||
|
f, count := c.sumValues(br)
|
||||||
|
sap.sum += f
|
||||||
|
sap.count += uint64(count)
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scan the requested columns
|
||||||
|
for _, field := range sap.sa.fields {
|
||||||
|
c := br.getColumnByName(field)
|
||||||
|
f, count := c.sumValues(br)
|
||||||
|
sap.sum += f
|
||||||
|
sap.count += uint64(count)
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
|
||||||
|
if sap.sa.containsStar {
|
||||||
|
// Scan all the fields for the given row
|
||||||
|
for _, c := range br.getColumns() {
|
||||||
|
f := c.getFloatValueAtRow(rowIdx)
|
||||||
|
if !math.IsNaN(f) {
|
||||||
|
sap.sum += f
|
||||||
|
sap.count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scan only the given fields for the given row
|
||||||
|
for _, field := range sap.sa.fields {
|
||||||
|
c := br.getColumnByName(field)
|
||||||
|
f := c.getFloatValueAtRow(rowIdx)
|
||||||
|
if !math.IsNaN(f) {
|
||||||
|
sap.sum += f
|
||||||
|
sap.count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sap *statsAvgProcessor) mergeState(sfp statsProcessor) {
|
||||||
|
src := sfp.(*statsAvgProcessor)
|
||||||
|
sap.sum += src.sum
|
||||||
|
sap.count += src.count
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sap *statsAvgProcessor) finalizeStats() string {
|
||||||
|
avg := sap.sum / float64(sap.count)
|
||||||
|
return strconv.FormatFloat(avg, 'f', -1, 64)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseStatsAvg(lex *lexer) (*statsAvg, error) {
|
||||||
|
lex.nextToken()
|
||||||
|
fields, err := parseFieldNamesInParens(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'avg' args: %w", err)
|
||||||
|
}
|
||||||
|
if len(fields) == 0 {
|
||||||
|
return nil, fmt.Errorf("'avg' must contain at least one arg")
|
||||||
|
}
|
||||||
|
sa := &statsAvg{
|
||||||
|
fields: fields,
|
||||||
|
containsStar: slices.Contains(fields, "*"),
|
||||||
|
}
|
||||||
|
return sa, nil
|
||||||
|
}
|
Loading…
Reference in a new issue