VictoriaMetrics/lib/logstorage/pipe_stats.go
Aliaksandr Valialkin a4ea3b87d7
lib/logstorage: optimize query immediately after its parsing
This eliminates possible bugs related to forgotten Query.Optimize() calls.

This also allows removing optimize() function from pipe interface.

While at it, drop filterNoop inside filterAnd.

(cherry picked from commit 66b2987f49)
2024-11-08 17:07:56 +01:00

1086 lines
27 KiB
Go

package logstorage
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeStats processes '| stats ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe
type pipeStats struct {
	// byFields contains field names with optional buckets from 'by(...)' clause.
	//
	// It may be empty. In this case all the rows are put into a single group.
	byFields []*byStatsField
	// funcs contains stats functions to execute.
	//
	// It must contain at least a single entry; String() reports a BUG otherwise.
	funcs []pipeStatsFunc
}
// pipeStatsFunc is a single stats function inside the 'stats' pipe
// together with its optional 'if (...)' filter and the output name.
type pipeStatsFunc struct {
	// f is stats function to execute
	f statsFunc
	// iff is an optional filter, which is applied to the input rows before executing f on them.
	//
	// nil iff means that f is executed on all the rows.
	iff *ifFilter
	// resultName is the name of the output column generated by f
	resultName string
}
// statsFunc is a parsed stats function such as 'count()' or 'sum(x)'.
//
// It serves as a factory for statsProcessor instances; one statsProcessor
// is created per every group of rows.
type statsFunc interface {
	// String returns string representation of statsFunc
	String() string
	// updateNeededFields updates neededFields with the fields needed for calculating the given stats
	updateNeededFields(neededFields fieldsSet)
	// newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc
	//
	// It also must return the size in bytes of the returned statsProcessor
	newStatsProcessor() (statsProcessor, int)
}
// statsProcessor must process stats for some statsFunc.
//
// All the statsProcessor methods are called from a single goroutine at a time,
// so there is no need for internal synchronization.
type statsProcessor interface {
	// updateStatsForAllRows must update statsProcessor stats for all the rows in br.
	//
	// It must return the change of internal state size in bytes for the statsProcessor.
	// The returned value is subtracted from the state size budget by the caller.
	updateStatsForAllRows(br *blockResult) int
	// updateStatsForRow must update statsProcessor stats for the row at rowIndex in br.
	//
	// It must return the change of internal state size in bytes for the statsProcessor.
	// The returned value is subtracted from the state size budget by the caller.
	updateStatsForRow(br *blockResult, rowIndex int) int
	// mergeState must merge sfp state into statsProcessor state.
	//
	// It is called when groups with identical keys from distinct shards are merged.
	mergeState(sfp statsProcessor)
	// finalizeStats must return the collected stats result from statsProcessor.
	finalizeStats() string
}
// String returns the LogsQL representation of ps in the form
//
//	stats [by (field1, ..., fieldN)] func1 [if (...)] as name1, ..., funcM [if (...)] as nameM
//
// It reports a BUG via logger.Panicf if ps contains no stats functions.
func (ps *pipeStats) String() string {
	var sb strings.Builder
	sb.WriteString("stats ")

	if len(ps.byFields) > 0 {
		byParts := make([]string, 0, len(ps.byFields))
		for _, bf := range ps.byFields {
			byParts = append(byParts, bf.String())
		}
		sb.WriteString("by (")
		sb.WriteString(strings.Join(byParts, ", "))
		sb.WriteString(") ")
	}

	if len(ps.funcs) == 0 {
		logger.Panicf("BUG: pipeStats must contain at least a single statsFunc")
	}

	funcParts := make([]string, 0, len(ps.funcs))
	for i := range ps.funcs {
		psf := &ps.funcs[i]
		part := psf.f.String()
		if psf.iff != nil {
			part += " " + psf.iff.String()
		}
		part += " as " + quoteTokenIfNeeded(psf.resultName)
		funcParts = append(funcParts, part)
	}
	sb.WriteString(strings.Join(funcParts, ", "))

	return sb.String()
}
// canLiveTail always returns false for the stats pipe.
//
// NOTE(review): presumably this marks the pipe as incompatible with live tailing,
// since stats results are emitted only from flush() - confirm against the pipe interface docs.
func (ps *pipeStats) canLiveTail() bool {
	return false
}
// updateNeededFields replaces neededFields with the set of input fields required by ps
// and resets unneededFields.
//
// The input fields for a stats function are registered only if its result
// is requested by the original neededFields and isn't listed in unneededFields.
func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) {
	requested := neededFields.clone()
	neededFields.reset()

	// byFields are needed unconditionally, since the output number of rows depends on them.
	for i := range ps.byFields {
		neededFields.add(ps.byFields[i].name)
	}

	for i := range ps.funcs {
		psf := &ps.funcs[i]
		if !requested.contains(psf.resultName) || unneededFields.contains(psf.resultName) {
			// Nobody needs the result of this function, so its inputs can be skipped.
			continue
		}
		psf.f.updateNeededFields(neededFields)
		if psf.iff != nil {
			neededFields.addFields(psf.iff.neededFields)
		}
	}

	unneededFields.reset()
}
// hasFilterInWithQuery returns true if the 'if (...)' filter of at least one stats function
// reports hasFilterInWithQuery().
//
// The check is delegated to ifFilter.hasFilterInWithQuery, which is called
// even for functions without 'if (...)' filter (i.e. on nil *ifFilter).
func (ps *pipeStats) hasFilterInWithQuery() bool {
	for i := range ps.funcs {
		if ps.funcs[i].iff.hasFilterInWithQuery() {
			return true
		}
	}
	return false
}
// initFilterInValues returns a copy of ps, where the 'if (...)' filter of every stats function
// is replaced with the result of ifFilter.initFilterInValues(cache, getFieldValuesFunc).
//
// The original ps isn't modified. The first error returned by ifFilter.initFilterInValues
// is returned to the caller as is.
func (ps *pipeStats) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
	funcsNew := make([]pipeStatsFunc, 0, len(ps.funcs))
	for _, psf := range ps.funcs {
		// psf is a copy of the original entry, so it is safe to modify it.
		iffNew, err := psf.iff.initFilterInValues(cache, getFieldValuesFunc)
		if err != nil {
			return nil, err
		}
		psf.iff = iffNew
		funcsNew = append(funcsNew, psf)
	}

	psNew := *ps
	psNew.funcs = funcsNew
	return &psNew, nil
}
// stateSizeBudgetChunk is the number of bytes, which are moved at once
// from the global state size budget to a per-shard budget in pipeStatsProcessor.writeBlock.
//
// It is also used in mergeShardsParallel as the threshold for flushing
// locally accumulated state size changes to the global budget.
const stateSizeBudgetChunk = 1 << 20
// newPipeProcessor returns pipeProcessor, which calculates ps stats
// and passes the results to ppNext on flush().
//
// A separate shard is created per each of workersCount workers.
// The total state size is limited by 30% of memory.Allowed().
// cancel is called when this limit is exceeded.
func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
	maxStateSize := int64(float64(memory.Allowed()) * 0.3)

	psp := &pipeStatsProcessor{
		ps:     ps,
		stopCh: stopCh,
		cancel: cancel,
		ppNext: ppNext,

		shards: make([]pipeStatsProcessorShard, workersCount),

		maxStateSize: maxStateSize,
	}
	// All the remaining shard fields are initialized lazily at shard.init().
	for i := range psp.shards {
		psp.shards[i].ps = ps
	}
	psp.stateSizeBudget.Store(maxStateSize)

	return psp
}
// pipeStatsProcessor calculates stats for the pipeStats
// and writes the results to ppNext on flush().
type pipeStatsProcessor struct {
	// ps is the stats pipe to process
	ps *pipeStats
	// stopCh is checked via needStop() in order to stop long-running loops early
	stopCh <-chan struct{}
	// cancel is called when the state size budget is exhausted
	cancel func()
	// ppNext is the next pipe processor, which receives the calculated stats
	ppNext pipeProcessor
	// shards contains per-worker state; shards[workerID] is accessed by writeBlock(workerID, ...)
	shards []pipeStatsProcessorShard
	// maxStateSize is the initial value for stateSizeBudget in bytes; it is used in error messages
	maxStateSize int64
	// stateSizeBudget is the remaining global budget in bytes for the stats state.
	//
	// Shards take chunks of stateSizeBudgetChunk bytes from it.
	stateSizeBudget atomic.Int64
}
// pipeStatsProcessorShard is a per-worker state of pipeStatsProcessor.
type pipeStatsProcessorShard struct {
	pipeStatsProcessorShardNopad
	// The padding rounds the shard size up to a multiple of 128 bytes.
	// This prevents false sharing between adjacent shards on widespread platforms,
	// where 128 mod (cache line size) = 0.
	_ [128 - unsafe.Sizeof(pipeStatsProcessorShardNopad{})%128]byte
}
// pipeStatsProcessorShardNopad contains the actual per-worker state without the padding.
type pipeStatsProcessorShardNopad struct {
	// ps is the stats pipe to process
	ps *pipeStats
	// m maps the marshaled values of 'by (...)' fields to the corresponding group.
	//
	// It is initialized lazily at init().
	m map[string]*pipeStatsGroup
	// bms and brTmp are used for applying per-func filters.
	//
	// bms contains a bitmap per every stats function at ps.funcs.
	bms   []bitmap
	brTmp blockResult
	// columnValues is a scratch slice for values of 'by (...)' columns; it is reused across writeBlock calls
	columnValues [][]string
	// keyBuf is a scratch buffer for the marshaled group key; it is reused across writeBlock calls
	keyBuf []byte
	// stateSizeBudget is the local budget in bytes for the shard state.
	//
	// It is decreased when the state grows. pipeStatsProcessor.writeBlock refills it
	// from the global budget when it becomes negative.
	stateSizeBudget int
}
// init lazily allocates shard.m and shard.bms.
//
// It is a no-op if shard.m is already allocated.
func (shard *pipeStatsProcessorShard) init() {
	if shard.m == nil {
		shard.m = make(map[string]*pipeStatsGroup)
		// A bitmap is needed per every stats function.
		shard.bms = make([]bitmap, len(shard.ps.funcs))
	}
}
// writeBlock updates the per-group stats at shard with the rows from br.
//
// The group key is built by marshaling the (bucketed) values of 'by (...)' fields.
// Every increase of the state size is subtracted from shard.stateSizeBudget.
//
// The following paths are used depending on the 'by (...)' fields:
//
//   - no 'by (...)' fields - all the rows go to a single group with empty key;
//   - a single 'by (...)' field - with fast paths for const column and for identical values;
//   - multiple 'by (...)' fields - with a fast path for the case when all the columns hold identical values.
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
	shard.init()
	byFields := shard.ps.byFields
	// Update shard.bms by applying per-function filters
	shard.applyPerFunctionFilters(br)
	// Process stats for the defined functions
	if len(byFields) == 0 {
		// Fast path - pass all the rows to a single group with empty key.
		psg := shard.getPipeStatsGroup(nil)
		shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
		return
	}
	if len(byFields) == 1 {
		// Special case for grouping by a single column.
		bf := byFields[0]
		c := br.getColumnByName(bf.name)
		if c.isConst {
			// Fast path for column with constant value.
			v := br.getBucketedValue(c.valuesEncoded[0], bf)
			shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
			psg := shard.getPipeStatsGroup(shard.keyBuf)
			shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
			return
		}
		values := c.getValuesBucketed(br, bf)
		if areConstValues(values) {
			// Fast path for column with constant values.
			shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
			psg := shard.getPipeStatsGroup(shard.keyBuf)
			shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
			return
		}
		// Slower generic path for a column with different values.
		var psg *pipeStatsGroup
		keyBuf := shard.keyBuf[:0]
		for i := 0; i < br.rowsLen; i++ {
			// Look up the group only when the value differs from the value at the previous row.
			// Otherwise the group from the previous row is re-used.
			if i <= 0 || values[i-1] != values[i] {
				keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
				psg = shard.getPipeStatsGroup(keyBuf)
			}
			shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
		}
		// Store the (possibly grown) buffer back, so it is re-used by the next call.
		shard.keyBuf = keyBuf
		return
	}
	// Obtain columns for byFields
	columnValues := shard.columnValues[:0]
	for _, bf := range byFields {
		c := br.getColumnByName(bf.name)
		values := c.getValuesBucketed(br, bf)
		columnValues = append(columnValues, values)
	}
	shard.columnValues = columnValues
	// Verify whether all the 'by (...)' columns are constant.
	areAllConstColumns := true
	for _, values := range columnValues {
		if !areConstValues(values) {
			areAllConstColumns = false
			break
		}
	}
	if areAllConstColumns {
		// Fast path for constant 'by (...)' columns.
		keyBuf := shard.keyBuf[:0]
		for _, values := range columnValues {
			keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
		}
		psg := shard.getPipeStatsGroup(keyBuf)
		shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
		shard.keyBuf = keyBuf
		return
	}
	// The slowest path - group by multiple columns with different values across rows.
	var psg *pipeStatsGroup
	keyBuf := shard.keyBuf[:0]
	for i := 0; i < br.rowsLen; i++ {
		// Verify whether the key for 'by (...)' fields equals the previous key
		sameValue := i > 0
		for _, values := range columnValues {
			if i <= 0 || values[i-1] != values[i] {
				sameValue = false
				break
			}
		}
		if !sameValue {
			// Construct new key for the 'by (...)' fields
			keyBuf = keyBuf[:0]
			for _, values := range columnValues {
				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
			}
			psg = shard.getPipeStatsGroup(keyBuf)
		}
		shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
	}
	// Store the (possibly grown) buffer back, so it is re-used by the next call.
	shard.keyBuf = keyBuf
}
// applyPerFunctionFilters prepares shard.bms for the rows at br.
//
// shard.bms[i] is initialized for br.rowsLen rows with all the bits set.
// Then the 'if (...)' filter for the i-th stats function is applied to it, if the function has such a filter.
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) {
	for i := range shard.ps.funcs {
		bm := &shard.bms[i]
		bm.init(br.rowsLen)
		bm.setBits()

		if iff := shard.ps.funcs[i].iff; iff != nil {
			iff.f.applyToBlockResult(br, bm)
		}
	}
}
// getPipeStatsGroup returns the group for the given key from shard.m.
//
// A new group with fresh stats processors is created and registered if it is missing.
// The size of the created state is subtracted from shard.stateSizeBudget in this case.
//
// The key is copied when it is stored in shard.m, so the caller may re-use the key buffer.
func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup {
	if psg := shard.m[string(key)]; psg != nil {
		return psg
	}

	funcs := shard.ps.funcs
	sfps := make([]statsProcessor, len(funcs))
	for i := range funcs {
		sfp, stateSize := funcs[i].f.newStatsProcessor()
		sfps[i] = sfp
		shard.stateSizeBudget -= stateSize
	}

	psg := &pipeStatsGroup{
		funcs: funcs,
		sfps:  sfps,
	}
	shard.m[string(key)] = psg

	// Account for the map entry: the key contents, the string header, the group pointer
	// and the slice of stats processors.
	entrySize := len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
	shard.stateSizeBudget -= entrySize

	return psg
}
// pipeStatsGroup holds stats processors for a single group of rows
// with identical values of 'by (...)' fields.
type pipeStatsGroup struct {
	// funcs are stats functions from the pipeStats
	funcs []pipeStatsFunc
	// sfps contains stats processors; sfps[i] calculates stats for funcs[i]
	sfps []statsProcessor
}
// updateStatsForAllRows updates all the stats processors at psg with all the rows from br.
//
// Functions with 'if (...)' filter receive only the rows selected by the corresponding bms entry;
// brTmp is used as a scratch block for the selected rows.
//
// It returns the total change of the state size in bytes.
func (psg *pipeStatsGroup) updateStatsForAllRows(bms []bitmap, br, brTmp *blockResult) int {
	stateSizeIncrease := 0
	for i := range psg.sfps {
		src := br
		if psg.funcs[i].iff != nil {
			brTmp.initFromFilterAllColumns(br, &bms[i])
			src = brTmp
		}
		stateSizeIncrease += psg.sfps[i].updateStatsForAllRows(src)
	}
	return stateSizeIncrease
}
// updateStatsForRow updates stats processors at psg with the row at rowIdx from br.
//
// The i-th processor is skipped if the rowIdx bit isn't set at bms[i].
//
// It returns the total change of the state size in bytes.
func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowIdx int) int {
	stateSizeIncrease := 0
	for i := range psg.sfps {
		if !bms[i].isSetBit(rowIdx) {
			// The row is filtered out for the i-th function.
			continue
		}
		stateSizeIncrease += psg.sfps[i].updateStatsForRow(br, rowIdx)
	}
	return stateSizeIncrease
}
// writeBlock passes br to the shard for the given workerID.
//
// The shard budget is refilled from the global budget before processing br if it is negative.
// The block is dropped if the global budget is exhausted.
func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
	if br.rowsLen == 0 {
		return
	}

	shard := &psp.shards[workerID]

	for shard.stateSizeBudget < 0 {
		// steal some budget for the state size from the global budget.
		remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
		if remaining >= 0 {
			shard.stateSizeBudget += stateSizeBudgetChunk
			continue
		}

		// The state size is too big. Stop processing data in order to avoid OOM crash.
		if remaining+stateSizeBudgetChunk >= 0 {
			// The global budget was non-negative before this subtraction.
			// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
			psp.cancel()
		}
		return
	}

	shard.writeBlock(br)
}
// flush merges the per-shard stats and writes the results to psp.ppNext.
//
// It returns an error if the state size budget has been exhausted.
// It returns nil without writing anything if psp.stopCh is closed after the merge.
//
// If 'by (...)' fields are missing and no rows were processed,
// then a single row with the results for an empty group is written.
func (psp *pipeStatsProcessor) flush() error {
	if n := psp.stateSizeBudget.Load(); n <= 0 {
		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
	}

	// Merge states across shards in parallel
	ms, err := psp.mergeShardsParallel()
	if err != nil {
		return err
	}
	if needStop(psp.stopCh) {
		return nil
	}

	if len(psp.ps.byFields) == 0 && len(ms) == 0 {
		// Special case - zero matching rows.
		psp.shards[0].init()
		_ = psp.shards[0].getPipeStatsGroup(nil)
		ms = append(ms, psp.shards[0].m)
	}

	// Write the calculated stats in parallel to the next pipe.
	var wg sync.WaitGroup
	for i, m := range ms {
		wg.Add(1)
		// Pass both loop variables as arguments, so every goroutine works with its own map
		// regardless of the loop variable semantics of the Go version in use.
		go func(workerID uint, m map[string]*pipeStatsGroup) {
			defer wg.Done()
			psp.writeShardData(workerID, m)
		}(uint(i), m)
	}
	wg.Wait()

	return nil
}
// writeShardData converts groups from m into rows and writes them to psp.ppNext on behalf of workerID.
//
// Every row consists of the values for 'by (...)' fields decoded from the group key,
// followed by the finalized results of the stats functions.
//
// The rows are written in blocks. A block is flushed when the total length of the accumulated values
// reaches 1_000_000 bytes. The remaining rows are written in the last block.
//
// The function returns early without writing the pending rows if psp.stopCh is closed.
func (psp *pipeStatsProcessor) writeShardData(workerID uint, m map[string]*pipeStatsGroup) {
	byFields := psp.ps.byFields
	// Result columns: 'by (...)' fields first, then the results of stats functions.
	rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs))
	for _, bf := range byFields {
		rcs = appendResultColumnWithName(rcs, bf.name)
	}
	for _, f := range psp.ps.funcs {
		rcs = appendResultColumnWithName(rcs, f.resultName)
	}
	var br blockResult
	var values []string
	rowsCount := 0
	valuesLen := 0
	for key, psg := range m {
		// m may be quite big, so this loop can take a lot of time and CPU.
		// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
		if needStop(psp.stopCh) {
			return
		}
		// Unmarshal values for byFields from key.
		// NOTE(review): the decoded values are presumably zero-copy views into key - confirm against bytesutil.
		values = values[:0]
		keyBuf := bytesutil.ToUnsafeBytes(key)
		for len(keyBuf) > 0 {
			v, nSize := encoding.UnmarshalBytes(keyBuf)
			if nSize <= 0 {
				logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q", keyBuf)
			}
			keyBuf = keyBuf[nSize:]
			values = append(values, bytesutil.ToUnsafeString(v))
		}
		if len(values) != len(byFields) {
			logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields))
		}
		// calculate values for stats functions
		for _, sfp := range psg.sfps {
			value := sfp.finalizeStats()
			values = append(values, value)
		}
		if len(values) != len(rcs) {
			logger.Panicf("BUG: len(values)=%d must be equal to len(rcs)=%d", len(values), len(rcs))
		}
		for i, v := range values {
			rcs[i].addValue(v)
			valuesLen += len(v)
		}
		rowsCount++
		if valuesLen >= 1_000_000 {
			// Flush the accumulated rows in order to limit the size of a single block.
			br.setResultColumns(rcs, rowsCount)
			rowsCount = 0
			psp.ppNext.writeBlock(workerID, &br)
			br.reset()
			for i := range rcs {
				rcs[i].resetValues()
			}
			valuesLen = 0
		}
	}
	// Write the remaining rows. rowsCount may be zero here.
	br.setResultColumns(rcs, rowsCount)
	psp.ppNext.writeBlock(workerID, &br)
}
// mergeShardsParallel merges groups with identical keys across psp.shards
// and returns non-empty maps with the merged groups. Every key is present in at most one returned map.
//
// The merge is performed in two parallel phases if there are multiple shards:
//
//  1. Every shard splits its groups into len(shards) maps by the hash of the group key,
//     so identical keys from distinct shards end up in maps with identical index.
//  2. For every index, the maps from all the shards are merged into the map of the first shard.
//
// The size of the newly created map entries is accounted for in psp.stateSizeBudget during the merge;
// it is returned back after the source map is dropped.
//
// It returns (nil, nil) if psp.stopCh is closed.
// It returns an error if psp.stateSizeBudget becomes negative.
func (psp *pipeStatsProcessor) mergeShardsParallel() ([]map[string]*pipeStatsGroup, error) {
	shards := psp.shards
	shardsLen := len(shards)
	if shardsLen == 1 {
		// Fast path - there is nothing to merge for a single shard.
		var ms []map[string]*pipeStatsGroup
		shards[0].init()
		if len(shards[0].m) > 0 {
			ms = append(ms, shards[0].m)
		}
		return ms, nil
	}
	var wg sync.WaitGroup
	// perShardMaps[i][j] contains groups from the i-th shard with hash(key) % shardsLen == j.
	perShardMaps := make([][]map[string]*pipeStatsGroup, shardsLen)
	for i := range shards {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			shardMaps := make([]map[string]*pipeStatsGroup, shardsLen)
			for i := range shardMaps {
				shardMaps[i] = make(map[string]*pipeStatsGroup)
			}
			shards[idx].init()
			// n is the state size, which isn't subtracted from the global budget yet.
			// nTotal is the total state size accounted for by this goroutine.
			n := int64(0)
			nTotal := int64(0)
			for k, psg := range shards[idx].m {
				if needStop(psp.stopCh) {
					return
				}
				h := xxhash.Sum64(bytesutil.ToUnsafeBytes(k))
				m := shardMaps[h%uint64(len(shardMaps))]
				n += updatePipeStatsMap(m, k, psg)
				if n > stateSizeBudgetChunk {
					// Flush the accumulated state size to the global budget.
					// Stop if the budget is exhausted; the caller detects this after wg.Wait().
					if nRemaining := psp.stateSizeBudget.Add(-n); nRemaining < 0 {
						return
					}
					nTotal += n
					n = 0
				}
			}
			nTotal += n
			psp.stateSizeBudget.Add(-n)
			perShardMaps[idx] = shardMaps
			// Clean the original map and return its state size budget back.
			shards[idx].m = nil
			psp.stateSizeBudget.Add(nTotal)
		}(i)
	}
	wg.Wait()
	if needStop(psp.stopCh) {
		return nil, nil
	}
	if n := psp.stateSizeBudget.Load(); n < 0 {
		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
	}
	// Merge per-shard entries into perShardMaps[0]
	for i := range perShardMaps {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			// Every goroutine works with maps at its own idx, so the goroutines do not touch the same maps.
			m := perShardMaps[0][idx]
			for i := 1; i < len(perShardMaps); i++ {
				n := int64(0)
				nTotal := int64(0)
				for k, psg := range perShardMaps[i][idx] {
					if needStop(psp.stopCh) {
						return
					}
					n += updatePipeStatsMap(m, k, psg)
					if n > stateSizeBudgetChunk {
						if nRemaining := psp.stateSizeBudget.Add(-n); nRemaining < 0 {
							return
						}
						nTotal += n
						n = 0
					}
				}
				nTotal += n
				psp.stateSizeBudget.Add(-n)
				// Clean the original map and return its state size budget back.
				perShardMaps[i][idx] = nil
				psp.stateSizeBudget.Add(nTotal)
			}
		}(i)
	}
	wg.Wait()
	if needStop(psp.stopCh) {
		return nil, nil
	}
	if n := psp.stateSizeBudget.Load(); n < 0 {
		return nil, fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
	}
	// Filter out maps without entries
	// The result re-uses the backing array of ms; this is safe, since it never grows beyond the read position.
	ms := perShardMaps[0]
	result := ms[:0]
	for _, m := range ms {
		if len(m) > 0 {
			result = append(result, m)
		}
	}
	return result, nil
}
// updatePipeStatsMap adds psgSrc to m under the key k.
//
// If m already contains a group for k, then psgSrc state is merged into it and 0 is returned.
// Otherwise psgSrc is registered in m and the size of the added map entry is returned.
func updatePipeStatsMap(m map[string]*pipeStatsGroup, k string, psgSrc *pipeStatsGroup) int64 {
	psgDst := m[k]
	if psgDst == nil {
		m[k] = psgSrc
		return int64(unsafe.Sizeof(k) + unsafe.Sizeof(psgSrc))
	}

	for i := range psgDst.sfps {
		psgDst.sfps[i].mergeState(psgSrc.sfps[i])
	}
	return 0
}
// parsePipeStats parses the stats pipe from lex:
//
//	[stats] [[by] (field1, ..., fieldN)] func1 [if (...)] [[as] name1], ..., funcM [if (...)] [[as] nameM]
//
// The leading 'stats' keyword is required only if needStatsKeyword is set.
//
// If the result name is missing, then the string representation of the function
// (together with its 'if (...)' filter) is used as the result name.
//
// An error is returned if a result name clashes with a 'by (...)' field
// or with the result name of another stats function.
func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
	if needStatsKeyword {
		if !lex.isKeyword("stats") {
			return nil, fmt.Errorf("expecting 'stats'; got %q", lex.token)
		}
		lex.nextToken()
	}

	ps := &pipeStats{}
	if lex.isKeyword("by", "(") {
		// The 'by' keyword is optional in front of the parenthesized list of fields.
		if lex.isKeyword("by") {
			lex.nextToken()
		}
		byFields, err := parseByStatsFields(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
		}
		ps.byFields = byFields
	}

	byFieldsByName := make(map[string]*byStatsField, len(ps.byFields))
	for _, bf := range ps.byFields {
		byFieldsByName[bf.name] = bf
	}
	funcsByResultName := make(map[string]statsFunc)

	for {
		sf, err := parseStatsFunc(lex)
		if err != nil {
			return nil, err
		}
		psf := pipeStatsFunc{
			f: sf,
		}

		if lex.isKeyword("if") {
			iff, err := parseIfFilter(lex)
			if err != nil {
				return nil, fmt.Errorf("cannot parse 'if' filter for [%s]: %w", sf, err)
			}
			psf.iff = iff
		}

		var resultName string
		if !lex.isKeyword(",", "|", ")", "") {
			// Explicit result name with optional 'as' prefix.
			if lex.isKeyword("as") {
				lex.nextToken()
			}
			fieldName, err := parseFieldName(lex)
			if err != nil {
				return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err)
			}
			resultName = fieldName
		} else {
			// The result name is missing - generate it from the function itself.
			resultName = sf.String()
			if psf.iff != nil {
				resultName += " " + psf.iff.String()
			}
		}

		if bf := byFieldsByName[resultName]; bf != nil {
			return nil, fmt.Errorf("the %q is used as 'by' field [%s], so it cannot be used as result name for [%s]", resultName, bf, sf)
		}
		if sfPrev := funcsByResultName[resultName]; sfPrev != nil {
			return nil, fmt.Errorf("cannot use identical result name %q for [%s] and [%s]", resultName, sfPrev, sf)
		}
		funcsByResultName[resultName] = sf

		psf.resultName = resultName
		ps.funcs = append(ps.funcs, psf)

		if lex.isKeyword("|", ")", "") {
			return ps, nil
		}
		if !lex.isKeyword(",") {
			return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", lex.token, sf)
		}
		lex.nextToken()
	}
}
// parseStatsFunc parses a single stats function at the current lex position.
//
// The function is selected by the keyword at lex; the actual parsing is delegated
// to the corresponding parseStats*() function. Its error is wrapped
// into "cannot parse '<name>' func: ..." error.
//
// An error is returned for unknown function name.
func parseStatsFunc(lex *lexer) (statsFunc, error) {
	var (
		sf   statsFunc
		err  error
		name string
	)

	switch {
	case lex.isKeyword("avg"):
		name = "avg"
		sf, err = parseStatsAvg(lex)
	case lex.isKeyword("count"):
		name = "count"
		sf, err = parseStatsCount(lex)
	case lex.isKeyword("count_empty"):
		name = "count_empty"
		sf, err = parseStatsCountEmpty(lex)
	case lex.isKeyword("count_uniq"):
		name = "count_uniq"
		sf, err = parseStatsCountUniq(lex)
	case lex.isKeyword("max"):
		name = "max"
		sf, err = parseStatsMax(lex)
	case lex.isKeyword("median"):
		name = "median"
		sf, err = parseStatsMedian(lex)
	case lex.isKeyword("min"):
		name = "min"
		sf, err = parseStatsMin(lex)
	case lex.isKeyword("quantile"):
		name = "quantile"
		sf, err = parseStatsQuantile(lex)
	case lex.isKeyword("row_any"):
		name = "row_any"
		sf, err = parseStatsRowAny(lex)
	case lex.isKeyword("row_max"):
		name = "row_max"
		sf, err = parseStatsRowMax(lex)
	case lex.isKeyword("row_min"):
		name = "row_min"
		sf, err = parseStatsRowMin(lex)
	case lex.isKeyword("sum"):
		name = "sum"
		sf, err = parseStatsSum(lex)
	case lex.isKeyword("sum_len"):
		name = "sum_len"
		sf, err = parseStatsSumLen(lex)
	case lex.isKeyword("uniq_values"):
		name = "uniq_values"
		sf, err = parseStatsUniqValues(lex)
	case lex.isKeyword("values"):
		name = "values"
		sf, err = parseStatsValues(lex)
	default:
		return nil, fmt.Errorf("unknown stats func %q", lex.token)
	}

	if err != nil {
		// Return untyped nil instead of sf, since sf may hold a typed nil pointer on error.
		return nil, fmt.Errorf("cannot parse '%s' func: %w", name, err)
	}
	return sf, nil
}
// statsNames contains the names of stats functions in alphabetical order.
//
// The list matches the keywords recognized by parseStatsFunc, so it must be updated
// when a new stats function is added there.
//
// NOTE(review): statsNames isn't referenced in this part of the file - confirm its users before changing it.
var statsNames = []string{
	"avg",
	"count",
	"count_empty",
	"count_uniq",
	"max",
	"median",
	"min",
	"quantile",
	"row_any",
	"row_max",
	"row_min",
	"sum",
	"sum_len",
	"uniq_values",
	"values",
}
// zeroByStatsField is a shared byStatsField with empty name and without bucket settings.
//
// NOTE(review): it isn't referenced in this part of the file; presumably it is passed
// to functions expecting *byStatsField when no bucketing is needed - confirm against its users.
var zeroByStatsField = &byStatsField{}
// byStatsField represents a single field from 'by (...)' part of the pipeStats.
//
// It can have either 'name' representation or 'name:bucket' or 'name:bucket offset off' representation,
// where `bucket` and `off` can contain duration, size or numeric value for creating different buckets
// for 'value/bucket'.
type byStatsField struct {
	// name is the canonical name of the field
	name string
	// bucketSizeStr is string representation of the bucket size
	//
	// It is empty if the bucket isn't set.
	bucketSizeStr string
	// bucketSize is the bucket for grouping the given field values with value/bucketSize calculations
	//
	// It remains zero for 'year' and 'month' buckets; see parseByStatsFields.
	bucketSize float64
	// bucketOffsetStr is string representation of the offset for bucketSize
	//
	// It is empty if the offset isn't set.
	bucketOffsetStr string
	// bucketOffset is the offset for bucketSize
	bucketOffset float64
}
// String returns the LogsQL representation of bf: 'name', 'name:bucket' or 'name:bucket offset off'.
//
// The offset is printed only if the bucket size is set.
func (bf *byStatsField) String() string {
	name := quoteTokenIfNeeded(bf.name)
	if bf.bucketSizeStr == "" {
		return name
	}

	s := name + ":" + bf.bucketSizeStr
	if bf.bucketOffsetStr != "" {
		s += " offset " + bf.bucketOffsetStr
	}
	return s
}
// hasBucketConfig returns true if bf has either the bucket size or the bucket offset set.
func (bf *byStatsField) hasBucketConfig() bool {
	return bf.bucketSizeStr != "" || bf.bucketOffsetStr != ""
}
// parseByStatsFields parses the parenthesized list of 'by' fields from lex:
//
//	(field1[:bucket1 [offset off1]], ..., fieldN[:bucketN [offset offN]])
//
// lex must point to the opening '('. It points to the token after the closing ')' on success.
//
// Empty list and trailing comma are allowed.
func parseByStatsFields(lex *lexer) ([]*byStatsField, error) {
	if !lex.isKeyword("(") {
		return nil, fmt.Errorf("missing `(`")
	}
	var bfs []*byStatsField
	for {
		// Skip the opening '(' on the first iteration and ',' on the subsequent iterations.
		lex.nextToken()
		if lex.isKeyword(")") {
			lex.nextToken()
			return bfs, nil
		}
		fieldName, err := getCompoundPhrase(lex, false)
		if err != nil {
			return nil, fmt.Errorf("cannot parse field name: %w", err)
		}
		fieldName = getCanonicalColumnName(fieldName)
		bf := &byStatsField{
			name: fieldName,
		}
		if lex.isKeyword(":") {
			// Parse bucket size
			lex.nextToken()
			bucketSizeStr := lex.token
			lex.nextToken()
			if bucketSizeStr == "/" {
				// The bucket consists of two tokens: '/' and the token after it (e.g. ipv4 mask such as /24).
				bucketSizeStr += lex.token
				lex.nextToken()
			}
			// 'year' and 'month' buckets are stored only in bucketSizeStr, while bucketSize remains zero.
			if bucketSizeStr != "year" && bucketSizeStr != "month" {
				bucketSize, ok := tryParseBucketSize(bucketSizeStr)
				if !ok {
					return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr)
				}
				bf.bucketSize = bucketSize
			}
			bf.bucketSizeStr = bucketSizeStr
			// Parse bucket offset
			if lex.isKeyword("offset") {
				lex.nextToken()
				bucketOffsetStr := lex.token
				lex.nextToken()
				if bucketOffsetStr == "-" {
					// Negative offset consists of two tokens: '-' and the token after it.
					bucketOffsetStr += lex.token
					lex.nextToken()
				}
				bucketOffset, ok := tryParseBucketOffset(bucketOffsetStr)
				if !ok {
					return nil, fmt.Errorf("cannot parse bucket offset for field %q: %q", fieldName, bucketOffsetStr)
				}
				bf.bucketOffsetStr = bucketOffsetStr
				bf.bucketOffset = bucketOffset
			}
		}
		bfs = append(bfs, bf)
		switch {
		case lex.isKeyword(")"):
			lex.nextToken()
			return bfs, nil
		case lex.isKeyword(","):
			// Continue with the next field; ',' is skipped at the beginning of the next iteration.
		default:
			return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
		}
	}
}
// tryParseBucketOffset tries parsing bucket offset from s.
//
// The following formats are tried in the given order:
//
//   - integer or floating-point number: 12345 or 1.2345
//   - duration: 1.5s - it is converted to nanoseconds
//   - bytes: 1.5KiB
//
// It returns false if s cannot be parsed by any of them.
func tryParseBucketOffset(s string) (float64, bool) {
	f, ok := tryParseFloat64(s)
	if ok {
		return f, true
	}

	nsecs, ok := tryParseDuration(s)
	if ok {
		return float64(nsecs), true
	}

	n, ok := tryParseBytes(s)
	if ok {
		return float64(n), true
	}

	return 0, false
}
// tryParseBucketSize tries parsing bucket size from s.
//
// The following formats are tried in the given order:
//
//   - named time unit: nanosecond, microsecond, millisecond, second, minute, hour, day or week
//   - integer or floating-point number: 12345 or 1.2345
//   - duration: 1.5s - it is converted to nanoseconds
//   - bytes: 1.5KiB
//   - ipv4 mask: /24
//
// It returns false if s cannot be parsed by any of them.
func tryParseBucketSize(s string) (float64, bool) {
	// Named time units are handled first.
	switch s {
	case "nanosecond":
		return 1, true
	case "microsecond":
		return nsecsPerMicrosecond, true
	case "millisecond":
		return nsecsPerMillisecond, true
	case "second":
		return nsecsPerSecond, true
	case "minute":
		return nsecsPerMinute, true
	case "hour":
		return nsecsPerHour, true
	case "day":
		return nsecsPerDay, true
	case "week":
		return nsecsPerWeek, true
	}

	f, ok := tryParseFloat64(s)
	if ok {
		return f, true
	}

	nsecs, ok := tryParseDuration(s)
	if ok {
		return float64(nsecs), true
	}

	n, ok := tryParseBytes(s)
	if ok {
		return float64(n), true
	}

	mask, ok := tryParseIPv4Mask(s)
	if ok {
		return float64(mask), true
	}

	return 0, false
}
// parseFieldNamesInParens parses the parenthesized comma-separated list of field names from lex.
//
// lex must point to the opening '('. It points to the token after the closing ')' on success.
//
// Empty list and trailing comma are allowed, while an empty item (e.g. leading or doubled comma) isn't allowed.
func parseFieldNamesInParens(lex *lexer) ([]string, error) {
	if !lex.isKeyword("(") {
		return nil, fmt.Errorf("missing `(`")
	}

	var fields []string
	for {
		// Skip the opening '(' on the first iteration and ',' on the subsequent iterations.
		lex.nextToken()
		if lex.isKeyword(")") {
			lex.nextToken()
			return fields, nil
		}
		if lex.isKeyword(",") {
			return nil, fmt.Errorf("unexpected `,`")
		}

		field, err := parseFieldName(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse field name: %w", err)
		}
		fields = append(fields, field)

		if lex.isKeyword(")") {
			lex.nextToken()
			return fields, nil
		}
		if !lex.isKeyword(",") {
			return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
		}
	}
}
// parseFieldName reads a compound token from lex and returns it as the canonical column name.
func parseFieldName(lex *lexer) (string, error) {
	token, err := getCompoundToken(lex)
	if err != nil {
		return "", fmt.Errorf("cannot parse field name: %w", err)
	}
	return getCanonicalColumnName(token), nil
}
// fieldNamesString returns comma-separated list of the given fields.
//
// Every field is quoted if needed, except for the "*" field, which is left as is.
func fieldNamesString(fields []string) string {
	quoted := make([]string, 0, len(fields))
	for _, field := range fields {
		if field == "*" {
			quoted = append(quoted, field)
			continue
		}
		quoted = append(quoted, quoteTokenIfNeeded(field))
	}
	return strings.Join(quoted, ", ")
}
// areConstValues returns true if values is non-empty and all its items are identical.
//
// It returns false for empty values.
func areConstValues(values []string) bool {
	if len(values) == 0 {
		return false
	}

	first := values[0]
	for _, v := range values[1:] {
		if v != first {
			return false
		}
	}
	return true
}