VictoriaMetrics/lib/logstorage/stats_avg.go

138 lines
2.8 KiB
Go
Raw Normal View History

2024-05-01 08:40:04 +00:00
package logstorage
import (
2024-05-22 09:25:49 +00:00
"fmt"
2024-05-01 08:40:04 +00:00
"slices"
"strconv"
2024-05-22 09:25:49 +00:00
"strings"
2024-05-01 08:40:04 +00:00
"unsafe"
)
// statsAvg describes an `avg(...)` stats function.
type statsAvg struct {
	// fields lists the names of the fields to average.
	//
	// An empty list means "all the fields": String renders it as "*"
	// and the stats processor scans every column of the block.
	fields []string
}
func (sa *statsAvg) String() string {
2024-05-22 09:25:49 +00:00
return "avg(" + statsFuncFieldsToString(sa.fields) + ")"
2024-05-01 08:40:04 +00:00
}
2024-05-17 02:11:10 +00:00
func (sa *statsAvg) updateNeededFields(neededFields fieldsSet) {
2024-05-22 09:25:49 +00:00
updateNeededFieldsForStatsFunc(neededFields, sa.fields)
2024-05-01 08:40:04 +00:00
}
// newStatsProcessor creates an empty processor bound to sa.
//
// The second result is the size of the created statsAvgProcessor in bytes.
func (sa *statsAvg) newStatsProcessor() (statsProcessor, int) {
	p := new(statsAvgProcessor)
	p.sa = sa
	return p, int(unsafe.Sizeof(*p))
}
// statsAvgProcessor accumulates the state needed for calculating `avg(...)`.
type statsAvgProcessor struct {
	// sa is the stats function definition; it supplies the field list.
	sa *statsAvg

	// sum is the running total of the values seen so far.
	sum float64

	// count is the number of values added to sum.
	count uint64
}
func (sap *statsAvgProcessor) updateStatsForAllRows(br *blockResult) int {
2024-05-22 09:25:49 +00:00
fields := sap.sa.fields
if len(fields) == 0 {
2024-05-01 08:40:04 +00:00
// Scan all the columns
for _, c := range br.getColumns() {
f, count := c.sumValues(br)
sap.sum += f
sap.count += uint64(count)
}
2024-05-03 12:03:17 +00:00
} else {
// Scan the requested columns
2024-05-22 09:25:49 +00:00
for _, field := range fields {
2024-05-03 12:03:17 +00:00
c := br.getColumnByName(field)
f, count := c.sumValues(br)
sap.sum += f
sap.count += uint64(count)
}
2024-05-01 08:40:04 +00:00
}
return 0
}
func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
2024-05-22 09:25:49 +00:00
fields := sap.sa.fields
if len(fields) == 0 {
2024-05-01 08:40:04 +00:00
// Scan all the fields for the given row
for _, c := range br.getColumns() {
2024-05-15 20:19:21 +00:00
f, ok := c.getFloatValueAtRow(br, rowIdx)
2024-05-15 11:07:15 +00:00
if ok {
2024-05-01 08:40:04 +00:00
sap.sum += f
sap.count++
}
}
2024-05-03 12:03:17 +00:00
} else {
// Scan only the given fields for the given row
2024-05-22 09:25:49 +00:00
for _, field := range fields {
2024-05-03 12:03:17 +00:00
c := br.getColumnByName(field)
2024-05-15 20:19:21 +00:00
f, ok := c.getFloatValueAtRow(br, rowIdx)
2024-05-15 11:07:15 +00:00
if ok {
2024-05-03 12:03:17 +00:00
sap.sum += f
sap.count++
}
2024-05-01 08:40:04 +00:00
}
}
return 0
}
// mergeState adds the sum and the count accumulated by sfp to sap.
//
// sfp must hold a *statsAvgProcessor; the type assertion panics otherwise.
func (sap *statsAvgProcessor) mergeState(sfp statsProcessor) {
	other := sfp.(*statsAvgProcessor)
	sap.count += other.count
	sap.sum += other.sum
}
// finalizeStats returns sum/count formatted as a decimal string with
// the minimal number of digits needed to represent the value.
//
// The division is performed in float64, so a zero count yields
// "NaN" (or "+Inf" / "-Inf" for a non-zero sum).
func (sap *statsAvgProcessor) finalizeStats() string {
	return strconv.FormatFloat(sap.sum/float64(sap.count), 'f', -1, 64)
}
func parseStatsAvg(lex *lexer) (*statsAvg, error) {
2024-05-22 09:25:49 +00:00
fields, err := parseStatsFuncFields(lex, "avg")
2024-05-01 08:40:04 +00:00
if err != nil {
2024-05-03 09:15:09 +00:00
return nil, err
2024-05-01 08:40:04 +00:00
}
sa := &statsAvg{
2024-05-22 09:25:49 +00:00
fields: fields,
2024-05-01 08:40:04 +00:00
}
return sa, nil
}
2024-05-22 09:25:49 +00:00
// parseStatsFuncFields parses `funcName(field1, ..., fieldN)` from lex
// and returns the listed field names.
//
// The current token must be the funcName keyword. Nil fields are returned
// when the parsed list is empty or contains "*".
func parseStatsFuncFields(lex *lexer, funcName string) ([]string, error) {
	if !lex.isKeyword(funcName) {
		return nil, fmt.Errorf("unexpected func; got %q; want %q", lex.token, funcName)
	}
	lex.nextToken()

	names, parseErr := parseFieldNamesInParens(lex)
	switch {
	case parseErr != nil:
		return nil, fmt.Errorf("cannot parse %q args: %w", funcName, parseErr)
	case len(names) == 0, slices.Contains(names, "*"):
		// No explicit fields or a wildcard - select all the fields.
		return nil, nil
	default:
		return names, nil
	}
}
// statsFuncFieldsToString renders fields as a ", "-delimited list, passing
// every name through quoteTokenIfNeeded.
//
// An empty list is rendered as "*".
func statsFuncFieldsToString(fields []string) string {
	if len(fields) == 0 {
		return "*"
	}
	var sb strings.Builder
	for i, name := range fields {
		if i > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString(quoteTokenIfNeeded(name))
	}
	return sb.String()
}
// updateNeededFieldsForStatsFunc registers the given stats function fields
// in neededFields.
//
// An empty field list means the function reads all the fields,
// so "*" is registered in this case.
func updateNeededFieldsForStatsFunc(neededFields fieldsSet, fields []string) {
	needAll := len(fields) == 0
	if needAll {
		neededFields.add("*")
	}
	neededFields.addFields(fields)
}