VictoriaMetrics/lib/logstorage/pipe_stats.go

945 lines
24 KiB
Go
Raw Normal View History

2024-04-29 01:44:54 +00:00
package logstorage
import (
"fmt"
"strings"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
2024-04-30 23:19:22 +00:00
// pipeStats processes '| stats ...' queries.
//
2024-05-05 10:43:38 +00:00
// See https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe
2024-04-29 01:44:54 +00:00
type pipeStats struct {
2024-05-03 09:15:09 +00:00
// byFields contains field names with optional buckets from 'by(...)' clause.
2024-05-05 10:48:54 +00:00
byFields []*byStatsField
2024-04-30 23:19:22 +00:00
// funcs contains stats functions to execute.
2024-05-15 22:49:11 +00:00
funcs []pipeStatsFunc
}
type pipeStatsFunc struct {
// f is stats function to execute
f statsFunc
2024-05-17 02:11:10 +00:00
// iff is an additional filter, which is applied to results before executing f on them
2024-05-21 10:55:11 +00:00
iff *ifFilter
2024-05-17 02:11:10 +00:00
2024-05-15 22:49:11 +00:00
// resultName is the name of the output generated by f
resultName string
2024-04-29 01:44:54 +00:00
}
type statsFunc interface {
// String returns string representation of statsFunc
String() string
2024-05-17 02:11:10 +00:00
// updateNeededFields update neededFields with the fields needed for calculating the given stats
updateNeededFields(neededFields fieldsSet)
2024-04-29 01:44:54 +00:00
2024-05-17 02:11:10 +00:00
// newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc
2024-04-29 01:44:54 +00:00
//
2024-05-17 02:11:10 +00:00
// It also must return the size in bytes of the returned statsProcessor
2024-04-29 01:44:54 +00:00
newStatsProcessor() (statsProcessor, int)
}
// statsProcessor must process stats for some statsFunc.
//
// All the statsProcessor methods are called from a single goroutine at a time,
// so there is no need in the internal synchronization.
type statsProcessor interface {
2024-04-30 21:03:34 +00:00
// updateStatsForAllRows must update statsProcessor stats for all the rows in br.
2024-04-29 01:44:54 +00:00
//
2024-04-30 21:03:34 +00:00
// It must return the change of internal state size in bytes for the statsProcessor.
updateStatsForAllRows(br *blockResult) int
2024-04-29 01:44:54 +00:00
2024-04-30 21:03:34 +00:00
// updateStatsForRow must update statsProcessor stats for the row at rowIndex in br.
2024-04-29 01:44:54 +00:00
//
2024-04-30 21:03:34 +00:00
// It must return the change of internal state size in bytes for the statsProcessor.
updateStatsForRow(br *blockResult, rowIndex int) int
2024-04-29 01:44:54 +00:00
// mergeState must merge sfp state into statsProcessor state.
mergeState(sfp statsProcessor)
2024-04-30 23:19:22 +00:00
// finalizeStats must return the collected stats result from statsProcessor.
finalizeStats() string
2024-04-29 01:44:54 +00:00
}
func (ps *pipeStats) String() string {
s := "stats "
if len(ps.byFields) > 0 {
2024-05-03 09:15:09 +00:00
a := make([]string, len(ps.byFields))
for i := range ps.byFields {
a[i] = ps.byFields[i].String()
}
s += "by (" + strings.Join(a, ", ") + ") "
2024-04-29 01:44:54 +00:00
}
if len(ps.funcs) == 0 {
logger.Panicf("BUG: pipeStats must contain at least a single statsFunc")
}
a := make([]string, len(ps.funcs))
for i, f := range ps.funcs {
2024-05-17 02:11:10 +00:00
line := f.f.String()
if f.iff != nil {
2024-05-21 10:55:11 +00:00
line += " " + f.iff.String()
2024-05-17 02:11:10 +00:00
}
line += " as " + quoteTokenIfNeeded(f.resultName)
a[i] = line
2024-04-29 01:44:54 +00:00
}
s += strings.Join(a, ", ")
return s
}
2024-05-09 00:52:28 +00:00
func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFieldsOrig := neededFields.clone()
neededFields.reset()
2024-05-04 22:28:01 +00:00
2024-05-15 22:35:49 +00:00
// byFields are needed unconditionally, since the output number of rows depends on them.
for _, bf := range ps.byFields {
neededFields.add(bf.name)
2024-05-11 03:53:11 +00:00
}
2024-05-17 02:11:10 +00:00
for _, f := range ps.funcs {
2024-05-15 22:49:11 +00:00
if neededFieldsOrig.contains(f.resultName) && !unneededFields.contains(f.resultName) {
2024-05-17 02:11:10 +00:00
f.f.updateNeededFields(neededFields)
if f.iff != nil {
2024-05-21 10:55:11 +00:00
neededFields.addFields(f.iff.neededFields)
2024-05-17 02:11:10 +00:00
}
2024-05-09 00:52:28 +00:00
}
2024-05-04 22:28:01 +00:00
}
2024-05-09 00:52:28 +00:00
unneededFields.reset()
2024-05-04 22:28:01 +00:00
}
2024-05-25 12:37:26 +00:00
// optimize calls optimizeFilterIn on the per-func 'if (...)' filters of ps.
//
// NOTE(review): iff is nil for funcs without an 'if (...)' filter, so this relies
// on ifFilter.optimizeFilterIn accepting a nil receiver — confirm in if_filter.go.
func (ps *pipeStats) optimize() {
	for i := range ps.funcs {
		ps.funcs[i].iff.optimizeFilterIn()
	}
}
// hasFilterInWithQuery reports whether at least one per-func 'if (...)' filter
// reports hasFilterInWithQuery() == true.
//
// NOTE(review): iff may be nil here, so this relies on ifFilter.hasFilterInWithQuery
// accepting a nil receiver — confirm in if_filter.go.
func (ps *pipeStats) hasFilterInWithQuery() bool {
	for i := range ps.funcs {
		if ps.funcs[i].iff.hasFilterInWithQuery() {
			return true
		}
	}
	return false
}
func (ps *pipeStats) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
funcsNew := make([]pipeStatsFunc, len(ps.funcs))
2024-05-25 22:21:57 +00:00
for i := range ps.funcs {
f := &ps.funcs[i]
2024-05-25 12:37:26 +00:00
iffNew, err := f.iff.initFilterInValues(cache, getFieldValuesFunc)
if err != nil {
return nil, err
}
2024-05-25 22:21:57 +00:00
fNew := *f
fNew.iff = iffNew
funcsNew[i] = fNew
2024-05-25 12:37:26 +00:00
}
psNew := *ps
2024-05-25 22:21:57 +00:00
psNew.funcs = funcsNew
2024-05-25 12:37:26 +00:00
return &psNew, nil
}
2024-04-29 01:44:54 +00:00
// stateSizeBudgetChunk is the state size budget in bytes, which is moved at once
// from the global pipeStatsProcessor budget to a shard. Every shard also starts with this budget.
const stateSizeBudgetChunk = 1 << 20
2024-05-25 18:13:01 +00:00
func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
2024-04-29 01:44:54 +00:00
maxStateSize := int64(float64(memory.Allowed()) * 0.3)
shards := make([]pipeStatsProcessorShard, workersCount)
for i := range shards {
2024-05-17 02:11:10 +00:00
shards[i] = pipeStatsProcessorShard{
pipeStatsProcessorShardNopad: pipeStatsProcessorShardNopad{
ps: ps,
stateSizeBudget: stateSizeBudgetChunk,
},
}
2024-04-29 01:44:54 +00:00
maxStateSize -= stateSizeBudgetChunk
}
2024-04-30 23:19:22 +00:00
psp := &pipeStatsProcessor{
2024-04-29 01:44:54 +00:00
ps: ps,
stopCh: stopCh,
cancel: cancel,
2024-05-25 18:13:01 +00:00
ppNext: ppNext,
2024-04-29 01:44:54 +00:00
shards: shards,
maxStateSize: maxStateSize,
}
2024-04-30 23:19:22 +00:00
psp.stateSizeBudget.Store(maxStateSize)
2024-04-29 01:44:54 +00:00
2024-04-30 23:19:22 +00:00
return psp
2024-04-29 01:44:54 +00:00
}
type pipeStatsProcessor struct {
ps *pipeStats
stopCh <-chan struct{}
cancel func()
2024-05-25 18:13:01 +00:00
ppNext pipeProcessor
2024-04-29 01:44:54 +00:00
shards []pipeStatsProcessorShard
maxStateSize int64
stateSizeBudget atomic.Int64
}
// pipeStatsProcessorShard holds the state of pipeStatsProcessor for a single workerID.
type pipeStatsProcessorShard struct {
	pipeStatsProcessorShardNopad
	// The padding rounds the struct size up to a multiple of 128 bytes, so adjacent shards in
	// the shards slice do not share cache lines (false sharing) on platforms where 128 is a multiple of the cache line size.
	_ [128 - unsafe.Sizeof(pipeStatsProcessorShardNopad{})%128]byte
}
type pipeStatsProcessorShardNopad struct {
ps *pipeStats
2024-05-17 02:11:10 +00:00
m map[string]*pipeStatsGroup
2024-05-28 23:48:40 +00:00
// bms and brTmp are used for applying per-func filters.
bms []bitmap
brTmp blockResult
2024-04-29 01:44:54 +00:00
columnValues [][]string
keyBuf []byte
stateSizeBudget int
}
2024-05-25 15:29:24 +00:00
// init lazily allocates the group map and the per-func bitmaps.
// It is a no-op when the shard is already initialized.
func (shard *pipeStatsProcessorShard) init() {
	if shard.m == nil {
		shard.m = make(map[string]*pipeStatsGroup)
		shard.bms = make([]bitmap, len(shard.ps.funcs))
	}
}
2024-05-06 16:33:35 +00:00
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
2024-05-25 15:29:24 +00:00
shard.init()
2024-05-06 16:33:35 +00:00
byFields := shard.ps.byFields
2024-04-29 01:44:54 +00:00
2024-05-28 23:48:40 +00:00
// Update shard.bms by applying per-function filters
shard.applyPerFunctionFilters(br)
2024-05-17 02:11:10 +00:00
// Process stats for the defined functions
2024-04-29 01:44:54 +00:00
if len(byFields) == 0 {
// Fast path - pass all the rows to a single group with empty key.
2024-05-10 14:41:57 +00:00
psg := shard.getPipeStatsGroup(nil)
2024-05-28 23:48:40 +00:00
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
2024-04-29 01:44:54 +00:00
return
}
if len(byFields) == 1 {
// Special case for grouping by a single column.
2024-05-03 09:15:09 +00:00
bf := byFields[0]
c := br.getColumnByName(bf.name)
2024-04-30 21:03:34 +00:00
if c.isConst {
2024-04-29 01:44:54 +00:00
// Fast path for column with constant value.
2024-05-15 20:19:21 +00:00
v := br.getBucketedValue(c.valuesEncoded[0], bf)
2024-05-03 09:15:09 +00:00
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
2024-05-10 14:41:57 +00:00
psg := shard.getPipeStatsGroup(shard.keyBuf)
2024-05-28 23:48:40 +00:00
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
2024-04-29 01:44:54 +00:00
return
}
2024-05-15 20:19:21 +00:00
values := c.getValuesBucketed(br, bf)
2024-05-03 09:15:09 +00:00
if areConstValues(values) {
// Fast path for column with constant values.
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
2024-05-10 14:41:57 +00:00
psg := shard.getPipeStatsGroup(shard.keyBuf)
2024-05-28 23:48:40 +00:00
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
2024-05-03 09:15:09 +00:00
return
}
// Slower generic path for a column with different values.
2024-05-10 14:41:57 +00:00
var psg *pipeStatsGroup
2024-04-30 21:03:34 +00:00
keyBuf := shard.keyBuf[:0]
for i := range br.timestamps {
2024-04-29 01:44:54 +00:00
if i <= 0 || values[i-1] != values[i] {
keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
2024-05-10 14:41:57 +00:00
psg = shard.getPipeStatsGroup(keyBuf)
2024-04-29 01:44:54 +00:00
}
2024-05-28 23:48:40 +00:00
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
2024-04-29 01:44:54 +00:00
}
shard.keyBuf = keyBuf
return
}
2024-05-03 09:15:09 +00:00
// Obtain columns for byFields
columnValues := shard.columnValues[:0]
for _, bf := range byFields {
c := br.getColumnByName(bf.name)
2024-05-15 20:19:21 +00:00
values := c.getValuesBucketed(br, bf)
2024-05-03 09:15:09 +00:00
columnValues = append(columnValues, values)
}
shard.columnValues = columnValues
2024-04-30 21:03:34 +00:00
// Verify whether all the 'by (...)' columns are constant.
areAllConstColumns := true
2024-05-03 09:15:09 +00:00
for _, values := range columnValues {
if !areConstValues(values) {
2024-04-30 21:03:34 +00:00
areAllConstColumns = false
break
2024-04-29 01:44:54 +00:00
}
2024-04-30 21:03:34 +00:00
}
if areAllConstColumns {
// Fast path for constant 'by (...)' columns.
2024-05-03 09:15:09 +00:00
keyBuf := shard.keyBuf[:0]
for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
}
2024-05-10 14:41:57 +00:00
psg := shard.getPipeStatsGroup(keyBuf)
2024-05-28 23:48:40 +00:00
shard.stateSizeBudget -= psg.updateStatsForAllRows(shard.bms, br, &shard.brTmp)
2024-05-03 09:15:09 +00:00
shard.keyBuf = keyBuf
2024-04-29 01:44:54 +00:00
return
}
2024-04-30 21:03:34 +00:00
// The slowest path - group by multiple columns with different values across rows.
2024-05-10 14:41:57 +00:00
var psg *pipeStatsGroup
2024-05-03 09:15:09 +00:00
keyBuf := shard.keyBuf[:0]
2024-04-30 21:03:34 +00:00
for i := range br.timestamps {
// Verify whether the key for 'by (...)' fields equals the previous key
2024-05-10 14:41:57 +00:00
sameValue := i > 0
2024-04-29 01:44:54 +00:00
for _, values := range columnValues {
if i <= 0 || values[i-1] != values[i] {
sameValue = false
break
}
}
if !sameValue {
// Construct new key for the 'by (...)' fields
keyBuf = keyBuf[:0]
for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
}
2024-05-10 14:41:57 +00:00
psg = shard.getPipeStatsGroup(keyBuf)
2024-04-29 01:44:54 +00:00
}
2024-05-28 23:48:40 +00:00
shard.stateSizeBudget -= psg.updateStatsForRow(shard.bms, br, i)
2024-04-29 01:44:54 +00:00
}
shard.keyBuf = keyBuf
}
2024-05-28 23:48:40 +00:00
func (shard *pipeStatsProcessorShard) applyPerFunctionFilters(br *blockResult) {
2024-05-17 02:11:10 +00:00
funcs := shard.ps.funcs
for i := range funcs {
bm := &shard.bms[i]
2024-05-28 23:48:40 +00:00
bm.init(len(br.timestamps))
2024-05-17 02:11:10 +00:00
bm.setBits()
2024-05-28 23:48:40 +00:00
iff := funcs[i].iff
if iff != nil {
iff.f.applyToBlockResult(br, bm)
2024-05-18 10:33:34 +00:00
}
2024-05-17 02:11:10 +00:00
}
}
2024-05-10 14:41:57 +00:00
func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup {
psg := shard.m[string(key)]
if psg != nil {
return psg
}
sfps := make([]statsProcessor, len(shard.ps.funcs))
for i, f := range shard.ps.funcs {
2024-05-15 22:49:11 +00:00
sfp, stateSize := f.f.newStatsProcessor()
2024-05-10 14:41:57 +00:00
sfps[i] = sfp
shard.stateSizeBudget -= stateSize
}
psg = &pipeStatsGroup{
2024-05-28 23:48:40 +00:00
funcs: shard.ps.funcs,
sfps: sfps,
2024-05-06 16:33:35 +00:00
}
2024-05-10 14:41:57 +00:00
shard.m[string(key)] = psg
shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
return psg
2024-05-06 16:33:35 +00:00
}
type pipeStatsGroup struct {
2024-05-28 23:48:40 +00:00
funcs []pipeStatsFunc
sfps []statsProcessor
2024-05-06 16:33:35 +00:00
}
2024-05-28 23:48:40 +00:00
func (psg *pipeStatsGroup) updateStatsForAllRows(bms []bitmap, br, brTmp *blockResult) int {
2024-05-10 14:41:57 +00:00
n := 0
2024-05-17 02:11:10 +00:00
for i, sfp := range psg.sfps {
2024-05-28 23:48:40 +00:00
iff := psg.funcs[i].iff
if iff == nil {
n += sfp.updateStatsForAllRows(br)
} else {
brTmp.initFromFilterAllColumns(br, &bms[i])
n += sfp.updateStatsForAllRows(brTmp)
}
2024-05-10 14:41:57 +00:00
}
return n
}
2024-05-28 23:48:40 +00:00
func (psg *pipeStatsGroup) updateStatsForRow(bms []bitmap, br *blockResult, rowIdx int) int {
2024-05-10 14:41:57 +00:00
n := 0
2024-05-17 02:11:10 +00:00
for i, sfp := range psg.sfps {
2024-05-28 23:48:40 +00:00
if bms[i].isSetBit(rowIdx) {
n += sfp.updateStatsForRow(br, rowIdx)
}
2024-05-10 14:41:57 +00:00
}
return n
}
2024-05-06 16:33:35 +00:00
// writeBlock updates the stats for the shard of workerID with the rows from br.
//
// When the shard runs out of its state size budget, it takes more budget from the global one
// in stateSizeBudgetChunk pieces. If the global budget is exhausted, br is dropped.
// cancel is called once, by the caller that drives the global budget below zero.
func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
	if len(br.timestamps) == 0 {
		return
	}

	shard := &psp.shards[workerID]
	for shard.stateSizeBudget < 0 {
		remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
		if remaining >= 0 {
			shard.stateSizeBudget += stateSizeBudgetChunk
			continue
		}

		// The state is too big - stop processing data in order to avoid OOM crash.
		if remaining >= -stateSizeBudgetChunk {
			// The budget has just crossed zero, so notify the workers to stop calling writeBlock().
			psp.cancel()
		}
		return
	}

	shard.writeBlock(br)
}
2024-04-30 23:19:22 +00:00
func (psp *pipeStatsProcessor) flush() error {
if n := psp.stateSizeBudget.Load(); n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
2024-04-29 01:44:54 +00:00
}
// Merge states across shards
2024-04-30 23:19:22 +00:00
shards := psp.shards
2024-05-25 18:45:11 +00:00
shardMain := &shards[0]
shardMain.init()
m := shardMain.m
2024-04-29 01:44:54 +00:00
shards = shards[1:]
for i := range shards {
shard := &shards[i]
2024-05-10 14:41:57 +00:00
for key, psg := range shard.m {
2024-04-29 01:44:54 +00:00
// shard.m may be quite big, so this loop can take a lot of time and CPU.
2024-04-30 21:03:34 +00:00
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
2024-05-15 00:45:43 +00:00
if needStop(psp.stopCh) {
2024-04-29 01:44:54 +00:00
return nil
}
spgBase := m[key]
if spgBase == nil {
2024-05-10 14:41:57 +00:00
m[key] = psg
2024-04-29 01:44:54 +00:00
} else {
for i, sfp := range spgBase.sfps {
2024-05-10 14:41:57 +00:00
sfp.mergeState(psg.sfps[i])
2024-04-29 01:44:54 +00:00
}
}
}
}
2024-05-25 18:13:01 +00:00
// Write per-group states to ppNext
2024-04-30 23:19:22 +00:00
byFields := psp.ps.byFields
2024-04-29 01:44:54 +00:00
if len(byFields) == 0 && len(m) == 0 {
// Special case - zero matching rows.
2024-05-25 18:45:11 +00:00
_ = shardMain.getPipeStatsGroup(nil)
m = shardMain.m
2024-04-29 01:44:54 +00:00
}
2024-05-15 22:49:11 +00:00
rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs))
2024-05-03 09:15:09 +00:00
for _, bf := range byFields {
2024-05-19 19:25:52 +00:00
rcs = appendResultColumnWithName(rcs, bf.name)
2024-04-30 23:19:22 +00:00
}
2024-05-15 22:49:11 +00:00
for _, f := range psp.ps.funcs {
2024-05-19 19:25:52 +00:00
rcs = appendResultColumnWithName(rcs, f.resultName)
2024-04-30 23:19:22 +00:00
}
2024-05-05 23:27:05 +00:00
var br blockResult
2024-04-30 23:19:22 +00:00
2024-05-05 23:27:05 +00:00
var values []string
2024-05-21 08:39:02 +00:00
rowsCount := 0
2024-05-05 23:27:05 +00:00
valuesLen := 0
2024-05-10 14:41:57 +00:00
for key, psg := range m {
2024-04-29 01:44:54 +00:00
// m may be quite big, so this loop can take a lot of time and CPU.
2024-04-30 21:03:34 +00:00
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
2024-05-15 00:45:43 +00:00
if needStop(psp.stopCh) {
2024-04-29 01:44:54 +00:00
return nil
}
// Unmarshal values for byFields from key.
values = values[:0]
keyBuf := bytesutil.ToUnsafeBytes(key)
for len(keyBuf) > 0 {
2024-05-13 21:44:44 +00:00
v, nSize := encoding.UnmarshalBytes(keyBuf)
if nSize <= 0 {
logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q", keyBuf)
2024-04-29 01:44:54 +00:00
}
2024-05-13 21:44:44 +00:00
keyBuf = keyBuf[nSize:]
2024-04-29 01:44:54 +00:00
values = append(values, bytesutil.ToUnsafeString(v))
}
if len(values) != len(byFields) {
logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields))
}
2024-04-30 23:19:22 +00:00
// calculate values for stats functions
2024-05-10 14:41:57 +00:00
for _, sfp := range psg.sfps {
2024-04-30 23:19:22 +00:00
value := sfp.finalizeStats()
values = append(values, value)
2024-04-29 01:44:54 +00:00
}
2024-04-30 21:03:34 +00:00
2024-05-05 23:27:05 +00:00
if len(values) != len(rcs) {
logger.Panicf("BUG: len(values)=%d must be equal to len(rcs)=%d", len(values), len(rcs))
}
for i, v := range values {
rcs[i].addValue(v)
valuesLen += len(v)
}
2024-05-21 08:39:02 +00:00
rowsCount++
2024-05-05 23:27:05 +00:00
if valuesLen >= 1_000_000 {
2024-05-21 08:39:02 +00:00
br.setResultColumns(rcs, rowsCount)
rowsCount = 0
2024-05-25 18:13:01 +00:00
psp.ppNext.writeBlock(0, &br)
2024-05-05 23:27:05 +00:00
br.reset()
for i := range rcs {
2024-05-19 19:25:52 +00:00
rcs[i].resetValues()
2024-05-05 23:27:05 +00:00
}
valuesLen = 0
2024-04-30 23:19:22 +00:00
}
}
2024-05-06 16:33:35 +00:00
2024-05-21 08:39:02 +00:00
br.setResultColumns(rcs, rowsCount)
2024-05-25 18:13:01 +00:00
psp.ppNext.writeBlock(0, &br)
2024-04-29 01:44:54 +00:00
return nil
}
2024-05-27 14:48:34 +00:00
func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
if needStatsKeyword {
if !lex.isKeyword("stats") {
return nil, fmt.Errorf("expecting 'stats'; got %q", lex.token)
}
lex.nextToken()
2024-04-29 01:44:54 +00:00
}
var ps pipeStats
2024-05-15 20:31:21 +00:00
if lex.isKeyword("by", "(") {
if lex.isKeyword("by") {
lex.nextToken()
}
2024-05-05 10:48:54 +00:00
bfs, err := parseByStatsFields(lex)
2024-04-29 01:44:54 +00:00
if err != nil {
2024-05-03 09:15:09 +00:00
return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
2024-04-29 01:44:54 +00:00
}
2024-05-03 09:15:09 +00:00
ps.byFields = bfs
2024-04-29 01:44:54 +00:00
}
2024-05-30 12:26:05 +00:00
seenByFields := make(map[string]*byStatsField, len(ps.byFields))
for _, bf := range ps.byFields {
seenByFields[bf.name] = bf
}
seenResultNames := make(map[string]statsFunc)
2024-05-15 22:49:11 +00:00
var funcs []pipeStatsFunc
2024-04-29 01:44:54 +00:00
for {
2024-05-17 02:11:10 +00:00
var f pipeStatsFunc
2024-05-30 12:26:05 +00:00
2024-05-15 22:41:13 +00:00
sf, err := parseStatsFunc(lex)
2024-04-29 01:44:54 +00:00
if err != nil {
return nil, err
}
2024-05-17 02:11:10 +00:00
f.f = sf
if lex.isKeyword("if") {
iff, err := parseIfFilter(lex)
if err != nil {
2024-05-30 12:26:05 +00:00
return nil, fmt.Errorf("cannot parse 'if' filter for [%s]: %w", sf, err)
2024-05-15 22:49:11 +00:00
}
2024-05-17 02:11:10 +00:00
f.iff = iff
}
2024-05-30 12:26:05 +00:00
resultName := ""
if lex.isKeyword(",", "|", ")", "") {
resultName = sf.String()
} else {
if lex.isKeyword("as") {
lex.nextToken()
}
fieldName, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for [%s]: %w", sf, err)
}
resultName = fieldName
}
if bf := seenByFields[resultName]; bf != nil {
return nil, fmt.Errorf("the %q is used as 'by' field [%s], so it cannot be used as result name for [%s]", resultName, bf, sf)
2024-05-15 22:41:13 +00:00
}
2024-05-30 12:26:05 +00:00
if sfPrev := seenResultNames[resultName]; sfPrev != nil {
return nil, fmt.Errorf("cannot use identical result name %q for [%s] and [%s]", resultName, sfPrev, sf)
}
seenResultNames[resultName] = sf
2024-05-17 02:11:10 +00:00
f.resultName = resultName
2024-05-15 22:41:13 +00:00
2024-05-17 02:11:10 +00:00
funcs = append(funcs, f)
2024-05-15 22:49:11 +00:00
2024-04-29 01:44:54 +00:00
if lex.isKeyword("|", ")", "") {
ps.funcs = funcs
return &ps, nil
}
if !lex.isKeyword(",") {
2024-05-30 12:26:05 +00:00
return nil, fmt.Errorf("unexpected token %q after [%s]; want ',', '|' or ')'", sf, lex.token)
2024-04-29 01:44:54 +00:00
}
lex.nextToken()
}
}
2024-05-15 22:41:13 +00:00
func parseStatsFunc(lex *lexer) (statsFunc, error) {
2024-04-29 01:44:54 +00:00
switch {
2024-05-21 16:56:35 +00:00
case lex.isKeyword("avg"):
sas, err := parseStatsAvg(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'avg' func: %w", err)
}
return sas, nil
2024-04-29 01:44:54 +00:00
case lex.isKeyword("count"):
2024-05-03 10:54:37 +00:00
scs, err := parseStatsCount(lex)
2024-04-29 01:44:54 +00:00
if err != nil {
2024-05-15 22:41:13 +00:00
return nil, fmt.Errorf("cannot parse 'count' func: %w", err)
2024-04-29 01:44:54 +00:00
}
2024-05-15 22:41:13 +00:00
return scs, nil
2024-05-05 01:48:29 +00:00
case lex.isKeyword("count_empty"):
scs, err := parseStatsCountEmpty(lex)
if err != nil {
2024-05-15 22:41:13 +00:00
return nil, fmt.Errorf("cannot parse 'count_empty' func: %w", err)
2024-05-05 01:48:29 +00:00
}
2024-05-15 22:41:13 +00:00
return scs, nil
2024-05-07 21:47:30 +00:00
case lex.isKeyword("count_uniq"):
sus, err := parseStatsCountUniq(lex)
2024-04-29 01:44:54 +00:00
if err != nil {
2024-05-15 22:41:13 +00:00
return nil, fmt.Errorf("cannot parse 'count_uniq' func: %w", err)
2024-04-29 01:44:54 +00:00
}
2024-05-15 22:41:13 +00:00
return sus, nil
2024-04-30 23:58:35 +00:00
case lex.isKeyword("max"):
sms, err := parseStatsMax(lex)
if err != nil {
2024-05-15 22:41:13 +00:00
return nil, fmt.Errorf("cannot parse 'max' func: %w", err)
2024-04-30 23:58:35 +00:00
}
2024-05-15 22:41:13 +00:00
return sms, nil
2024-05-21 16:56:35 +00:00
case lex.isKeyword("median"):
sms, err := parseStatsMedian(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'median' func: %w", err)
}
return sms, nil
2024-05-01 00:08:37 +00:00
case lex.isKeyword("min"):
sms, err := parseStatsMin(lex)
if err != nil {
2024-05-15 22:41:13 +00:00
return nil, fmt.Errorf("cannot parse 'min' func: %w", err)
2024-05-01 00:08:37 +00:00
}
2024-05-15 22:41:13 +00:00
return sms, nil
2024-05-21 16:56:35 +00:00
case lex.isKeyword("quantile"):
sqs, err := parseStatsQuantile(lex)
2024-05-03 10:54:37 +00:00
if err != nil {
2024-05-21 16:56:35 +00:00
return nil, fmt.Errorf("cannot parse 'quantile' func: %w", err)
2024-05-03 10:54:37 +00:00
}
2024-05-21 16:56:35 +00:00
return sqs, nil
2024-05-30 14:03:00 +00:00
case lex.isKeyword("row_any"):
sas, err := parseStatsRowAny(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'row_any' func: %w", err)
}
return sas, nil
case lex.isKeyword("row_max"):
sms, err := parseStatsRowMax(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'row_max' func: %w", err)
}
return sms, nil
case lex.isKeyword("row_min"):
sms, err := parseStatsRowMin(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'row_min' func: %w", err)
}
return sms, nil
2024-05-21 16:56:35 +00:00
case lex.isKeyword("sum"):
sss, err := parseStatsSum(lex)
2024-05-11 03:28:36 +00:00
if err != nil {
2024-05-21 16:56:35 +00:00
return nil, fmt.Errorf("cannot parse 'sum' func: %w", err)
2024-05-11 03:28:36 +00:00
}
2024-05-21 16:56:35 +00:00
return sss, nil
2024-05-14 17:35:08 +00:00
case lex.isKeyword("sum_len"):
sss, err := parseStatsSumLen(lex)
if err != nil {
2024-05-15 22:41:13 +00:00
return nil, fmt.Errorf("cannot parse 'sum_len' func: %w", err)
2024-05-14 17:35:08 +00:00
}
2024-05-15 22:41:13 +00:00
return sss, nil
2024-05-21 16:56:35 +00:00
case lex.isKeyword("uniq_values"):
sus, err := parseStatsUniqValues(lex)
2024-05-14 20:11:51 +00:00
if err != nil {
2024-05-21 16:56:35 +00:00
return nil, fmt.Errorf("cannot parse 'uniq_values' func: %w", err)
2024-05-14 20:11:51 +00:00
}
2024-05-21 16:56:35 +00:00
return sus, nil
case lex.isKeyword("values"):
svs, err := parseStatsValues(lex)
2024-05-14 20:31:21 +00:00
if err != nil {
2024-05-21 16:56:35 +00:00
return nil, fmt.Errorf("cannot parse 'values' func: %w", err)
2024-05-14 20:31:21 +00:00
}
2024-05-21 16:56:35 +00:00
return svs, nil
2024-04-29 01:44:54 +00:00
default:
2024-05-15 22:41:13 +00:00
return nil, fmt.Errorf("unknown stats func %q", lex.token)
2024-04-29 01:44:54 +00:00
}
}
2024-05-11 02:43:42 +00:00
// zeroByStatsField is a byStatsField with an empty name and without bucket config,
// so hasBucketConfig() returns false for it.
var zeroByStatsField = &byStatsField{}
2024-05-05 10:48:54 +00:00
// byStatsField represents a single field from the 'by (...)' part of pipeStats.
//
// It can be written as 'name', 'name:bucket' or 'name:bucket offset off'.
// 'bucket' may be a number, a duration, a byte size, an IPv4 mask such as '/24',
// or a named unit such as 'hour', 'month' or 'year'. 'off' may be a number,
// a duration or a byte size. Field values are grouped into buckets via 'value/bucket'.
type byStatsField struct {
	name string // the field name

	bucketSizeStr string  // string representation of the bucket size; empty if no bucket is set
	bucketSize    float64 // the bucket size for value/bucketSize calculations; zero for 'year' and 'month'

	bucketOffsetStr string  // string representation of the bucket offset; empty if no offset is set
	bucketOffset    float64 // the offset for bucketSize
}
2024-05-05 10:48:54 +00:00
func (bf *byStatsField) String() string {
2024-05-03 09:15:09 +00:00
s := quoteTokenIfNeeded(bf.name)
if bf.bucketSizeStr != "" {
s += ":" + bf.bucketSizeStr
2024-05-03 10:10:45 +00:00
if bf.bucketOffsetStr != "" {
s += " offset " + bf.bucketOffsetStr
}
2024-05-03 09:15:09 +00:00
}
return s
}
2024-05-11 02:43:42 +00:00
// hasBucketConfig reports whether bf has a bucket size or a bucket offset set.
func (bf *byStatsField) hasBucketConfig() bool {
	return bf.bucketSizeStr != "" || bf.bucketOffsetStr != ""
}
2024-05-05 10:48:54 +00:00
func parseByStatsFields(lex *lexer) ([]*byStatsField, error) {
2024-05-03 09:15:09 +00:00
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing `(`")
}
2024-05-05 10:48:54 +00:00
var bfs []*byStatsField
2024-05-03 09:15:09 +00:00
for {
2024-05-04 22:28:01 +00:00
lex.nextToken()
2024-05-03 09:15:09 +00:00
if lex.isKeyword(")") {
lex.nextToken()
return bfs, nil
}
2024-05-18 20:09:52 +00:00
fieldName, err := getCompoundPhrase(lex, false)
2024-05-03 09:15:09 +00:00
if err != nil {
return nil, fmt.Errorf("cannot parse field name: %w", err)
}
2024-05-18 20:09:52 +00:00
fieldName = getCanonicalColumnName(fieldName)
2024-05-05 10:48:54 +00:00
bf := &byStatsField{
2024-05-03 09:15:09 +00:00
name: fieldName,
}
if lex.isKeyword(":") {
2024-05-03 10:10:45 +00:00
// Parse bucket size
2024-05-03 09:15:09 +00:00
lex.nextToken()
bucketSizeStr := lex.token
lex.nextToken()
if bucketSizeStr == "/" {
bucketSizeStr += lex.token
lex.nextToken()
}
2024-05-11 02:43:42 +00:00
if bucketSizeStr != "year" && bucketSizeStr != "month" {
bucketSize, ok := tryParseBucketSize(bucketSizeStr)
if !ok {
return nil, fmt.Errorf("cannot parse bucket size for field %q: %q", fieldName, bucketSizeStr)
}
bf.bucketSize = bucketSize
2024-05-03 09:15:09 +00:00
}
bf.bucketSizeStr = bucketSizeStr
2024-05-03 10:10:45 +00:00
// Parse bucket offset
if lex.isKeyword("offset") {
lex.nextToken()
bucketOffsetStr := lex.token
lex.nextToken()
if bucketOffsetStr == "-" {
bucketOffsetStr += lex.token
lex.nextToken()
}
bucketOffset, ok := tryParseBucketOffset(bucketOffsetStr)
if !ok {
return nil, fmt.Errorf("cannot parse bucket offset for field %q: %q", fieldName, bucketOffsetStr)
}
bf.bucketOffsetStr = bucketOffsetStr
bf.bucketOffset = bucketOffset
}
2024-05-03 09:15:09 +00:00
}
bfs = append(bfs, bf)
switch {
case lex.isKeyword(")"):
lex.nextToken()
return bfs, nil
case lex.isKeyword(","):
default:
return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
}
}
}
2024-05-03 10:10:45 +00:00
// tryParseBucketOffset tries parsing s as a bucket offset.
//
// The following formats are accepted, in this order of precedence:
//
//   - a plain number, e.g. 12345 or 1.2345
//   - a duration, e.g. 1.5s - the result is in nanoseconds
//   - a byte size, e.g. 1.5KiB
//
// The second result is false if s matches none of these formats.
func tryParseBucketOffset(s string) (float64, bool) {
	if f, ok := tryParseFloat64(s); ok {
		return f, true
	}
	if nsecs, ok := tryParseDuration(s); ok {
		return float64(nsecs), true
	}
	n, ok := tryParseBytes(s)
	if !ok {
		return 0, false
	}
	return float64(n), true
}
2024-05-03 09:15:09 +00:00
// tryParseBucketSize tries parsing bucket size, which can have the following formats:
//
// - integer number: 12345
// - floating-point number: 1.2345
2024-05-03 10:10:45 +00:00
// - duration: 1.5s - it is converted to nanoseconds
2024-05-03 09:15:09 +00:00
// - bytes: 1.5KiB
// - ipv4 mask: /24
func tryParseBucketSize(s string) (float64, bool) {
2024-05-11 02:43:42 +00:00
switch s {
case "nanosecond":
return 1, true
case "microsecond":
return nsecsPerMicrosecond, true
case "millisecond":
return nsecsPerMillisecond, true
case "second":
return nsecsPerSecond, true
case "minute":
return nsecsPerMinute, true
case "hour":
return nsecsPerHour, true
case "day":
return nsecsPerDay, true
case "week":
return nsecsPerWeek, true
}
2024-05-03 09:15:09 +00:00
// Try parsing s as floating point number
if f, ok := tryParseFloat64(s); ok {
return f, true
}
// Try parsing s as duration (1s, 5m, etc.)
if nsecs, ok := tryParseDuration(s); ok {
return float64(nsecs), true
}
// Try parsing s as bytes (KiB, MB, etc.)
if n, ok := tryParseBytes(s); ok {
return float64(n), true
}
if n, ok := tryParseIPv4Mask(s); ok {
return float64(n), true
}
return 0, false
}
2024-04-29 01:44:54 +00:00
func parseFieldNamesInParens(lex *lexer) ([]string, error) {
if !lex.isKeyword("(") {
return nil, fmt.Errorf("missing `(`")
}
var fields []string
for {
2024-05-04 22:28:01 +00:00
lex.nextToken()
2024-04-29 01:44:54 +00:00
if lex.isKeyword(")") {
lex.nextToken()
return fields, nil
}
if lex.isKeyword(",") {
return nil, fmt.Errorf("unexpected `,`")
}
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse field name: %w", err)
}
fields = append(fields, field)
switch {
case lex.isKeyword(")"):
lex.nextToken()
return fields, nil
case lex.isKeyword(","):
default:
return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
}
}
}
func parseFieldName(lex *lexer) (string, error) {
2024-05-18 20:09:52 +00:00
fieldName, err := getCompoundToken(lex)
if err != nil {
return "", fmt.Errorf("cannot parse field name: %w", err)
2024-04-29 01:44:54 +00:00
}
2024-05-04 22:28:01 +00:00
fieldName = getCanonicalColumnName(fieldName)
return fieldName, nil
2024-04-29 01:44:54 +00:00
}
// fieldNamesString returns the comma-separated list of fields.
// Every field except '*' is quoted with quoteTokenIfNeeded.
func fieldNamesString(fields []string) string {
	var sb strings.Builder
	for i, f := range fields {
		if i > 0 {
			sb.WriteString(", ")
		}
		if f == "*" {
			sb.WriteString(f)
		} else {
			sb.WriteString(quoteTokenIfNeeded(f))
		}
	}
	return sb.String()
}
2024-05-03 09:15:09 +00:00
// areConstValues reports whether values is non-empty and all its items are equal.
func areConstValues(values []string) bool {
	if len(values) == 0 {
		return false
	}
	first := values[0]
	for _, v := range values[1:] {
		if v != first {
			return false
		}
	}
	return true
}