diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 87c28c8a8..5f5f1a143 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -206,7 +206,7 @@ func (q *Query) getResultColumnNames() []string { switch t := p.(type) { case *pipeFields: return t.fields - case *statsPipe: + case *pipeStats: return t.neededFields() } } diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index f85d22ff4..91d9c4486 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -83,11 +83,11 @@ func parsePipes(lex *lexer) ([]pipe, error) { } pipes = append(pipes, pf) case lex.isKeyword("stats"): - sp, err := parseStatsPipe(lex) + ps, err := parseStatsPipe(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) } - pipes = append(pipes, sp) + pipes = append(pipes, ps) case lex.isKeyword("head"): hp, err := parseHeadPipe(lex) if err != nil { @@ -95,11 +95,11 @@ func parsePipes(lex *lexer) ([]pipe, error) { } pipes = append(pipes, hp) case lex.isKeyword("skip"): - sp, err := parseSkipPipe(lex) + ps, err := parseSkipPipe(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err) } - pipes = append(pipes, sp) + pipes = append(pipes, ps) default: return nil, fmt.Errorf("unexpected pipe %q", lex.token) } @@ -188,7 +188,7 @@ func parsePipeFields(lex *lexer) (*pipeFields, error) { } } -type statsPipe struct { +type pipeStats struct { byFields []string funcs []statsFunc } @@ -228,17 +228,17 @@ type statsProcessor interface { finalizeStats() (name, value string) } -func (sp *statsPipe) String() string { +func (ps *pipeStats) String() string { s := "stats " - if len(sp.byFields) > 0 { - s += "by (" + fieldNamesString(sp.byFields) + ") " + if len(ps.byFields) > 0 { + s += "by (" + fieldNamesString(ps.byFields) + ") " } - if len(sp.funcs) == 0 { - logger.Panicf("BUG: statsPipe must contain at least a single statsFunc") + if len(ps.funcs) == 0 { + logger.Panicf("BUG: pipeStats must contain at least a single statsFunc") } - a := make([]string, len(sp.funcs)) - for i, f := range sp.funcs { + a := make([]string, len(ps.funcs)) + for i, f := range ps.funcs { a[i] = f.String() } s += strings.Join(a, ", ") @@ -247,20 +247,20 @@ func (sp *statsPipe) String() string { const stateSizeBudgetChunk = 1 << 20 -func (sp *statsPipe) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { +func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.3) - shards := make([]statsPipeProcessorShard, workersCount) + shards := make([]pipeStatsProcessorShard, workersCount) for i := range shards { shard := &shards[i] - shard.sp = sp - shard.m = make(map[string]*statsPipeGroup) + shard.ps = ps + shard.m = make(map[string]*pipeStatsGroup) shard.stateSizeBudget = stateSizeBudgetChunk maxStateSize -= stateSizeBudgetChunk } - spp := &statsPipeProcessor{ - sp: sp, + spp := &pipeStatsProcessor{ + ps: ps, stopCh: stopCh, cancel: cancel, ppBase: ppBase, @@ -274,28 +274,28 @@ func (sp *statsPipe) newPipeProcessor(workersCount int, stopCh <-chan struct{}, return spp } -type statsPipeProcessor struct { - sp *statsPipe +type pipeStatsProcessor struct { + ps *pipeStats stopCh <-chan struct{} cancel func() ppBase pipeProcessor - shards []statsPipeProcessorShard + shards []pipeStatsProcessorShard maxStateSize int64 stateSizeBudget atomic.Int64 } -type statsPipeProcessorShard struct { - statsPipeProcessorShardNopad +type pipeStatsProcessorShard struct { + pipeStatsProcessorShardNopad // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . - _ [128 - unsafe.Sizeof(statsPipeProcessorShardNopad{})%128]byte + _ [128 - unsafe.Sizeof(pipeStatsProcessorShardNopad{})%128]byte } -type statsPipeProcessorShardNopad struct { - sp *statsPipe - m map[string]*statsPipeGroup +type pipeStatsProcessorShardNopad struct { + ps *pipeStats + m map[string]*pipeStatsGroup columnValues [][]string keyBuf []byte @@ -303,16 +303,16 @@ type statsPipeProcessorShardNopad struct { stateSizeBudget int } -func (shard *statsPipeProcessorShard) getStatsProcessors(key []byte) []statsProcessor { +func (shard *pipeStatsProcessorShard) getStatsProcessors(key []byte) []statsProcessor { spg := shard.m[string(key)] if spg == nil { - sfps := make([]statsProcessor, len(shard.sp.funcs)) - for i, f := range shard.sp.funcs { + sfps := make([]statsProcessor, len(shard.ps.funcs)) + for i, f := range shard.ps.funcs { sfp, stateSize := f.newStatsProcessor() sfps[i] = sfp shard.stateSizeBudget -= stateSize } - spg = &statsPipeGroup{ + spg = &pipeStatsGroup{ sfps: sfps, } shard.m[string(key)] = spg @@ -321,11 +321,11 @@ func (shard *statsPipeProcessorShard) getStatsProcessors(key []byte) []statsProc return spg.sfps } -type statsPipeGroup struct { +type pipeStatsGroup struct { sfps []statsProcessor } -func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { +func (spp *pipeStatsProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { shard := &spp.shards[workerID] for shard.stateSizeBudget < 0 { @@ -342,7 +342,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col shard.stateSizeBudget += stateSizeBudgetChunk } - byFields := spp.sp.byFields + byFields := spp.ps.byFields if len(byFields) == 0 { // Fast path - pass all the rows to a single group with empty key. for _, sfp := range shard.getStatsProcessors(nil) { @@ -379,7 +379,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col } // Pre-calculate column values for byFields in order to speed up building group key in the loop below. - shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields, len(timestamps)) + shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, byFields, len(timestamps)) columnValues := shard.columnValues if areConstValues(columnValues) { @@ -445,9 +445,9 @@ func isConstValue(values []string) bool { return true } -func (spp *statsPipeProcessor) flush() error { +func (spp *pipeStatsProcessor) flush() error { if n := spp.stateSizeBudget.Load(); n <= 0 { - return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.sp.String(), spp.maxStateSize/(1<<20)) + return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.ps.String(), spp.maxStateSize/(1<<20)) } // Merge states across shards @@ -477,7 +477,7 @@ func (spp *statsPipeProcessor) flush() error { } // Write per-group states to ppBase - byFields := spp.sp.byFields + byFields := spp.ps.byFields if len(byFields) == 0 && len(m) == 0 { // Special case - zero matching rows. _ = shards[0].getStatsProcessors(nil) @@ -533,7 +533,7 @@ func (spp *statsPipeProcessor) flush() error { return nil } -func (sp *statsPipe) neededFields() []string { +func (ps *pipeStats) neededFields() []string { var neededFields []string m := make(map[string]struct{}) updateNeededFields := func(fields []string) { @@ -545,9 +545,9 @@ func (sp *statsPipe) neededFields() []string { } } - updateNeededFields(sp.byFields) + updateNeededFields(ps.byFields) - for _, f := range sp.funcs { + for _, f := range ps.funcs { fields := f.neededFields() updateNeededFields(fields) } @@ -555,19 +555,19 @@ func (sp *statsPipe) neededFields() []string { return neededFields } -func parseStatsPipe(lex *lexer) (*statsPipe, error) { +func parseStatsPipe(lex *lexer) (*pipeStats, error) { if !lex.mustNextToken() { return nil, fmt.Errorf("missing stats config") } - var sp statsPipe + var ps pipeStats if lex.isKeyword("by") { lex.nextToken() fields, err := parseFieldNamesInParens(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'by': %w", err) } - sp.byFields = fields + ps.byFields = fields } var funcs []statsFunc @@ -578,8 +578,8 @@ func parseStatsPipe(lex *lexer) (*statsPipe, error) { } funcs = append(funcs, sf) if lex.isKeyword("|", ")", "") { - sp.funcs = funcs - return &sp, nil + ps.funcs = funcs + return &ps, nil } if !lex.isKeyword(",") { return nil, fmt.Errorf("unexpected token %q; want ',', '|' or ')'", lex.token) @@ -707,19 +707,19 @@ type skipPipe struct { n uint64 } -func (sp *skipPipe) String() string { - return fmt.Sprintf("skip %d", sp.n) +func (ps *skipPipe) String() string { + return fmt.Sprintf("skip %d", ps.n) } -func (sp *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (ps *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { return &skipPipeProcessor{ - sp: sp, + ps: ps, ppBase: ppBase, } } type skipPipeProcessor struct { - sp *skipPipe + ps *skipPipe ppBase pipeProcessor rowsProcessed atomic.Uint64 @@ -727,17 +727,17 @@ type skipPipeProcessor struct { func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { rowsProcessed := spp.rowsProcessed.Add(uint64(len(timestamps))) - if rowsProcessed <= spp.sp.n { + if rowsProcessed <= spp.ps.n { return } rowsProcessed -= uint64(len(timestamps)) - if rowsProcessed >= spp.sp.n { + if rowsProcessed >= spp.ps.n { spp.ppBase.writeBlock(workerID, timestamps, columns) return } - rowsRemaining := spp.sp.n - rowsProcessed + rowsRemaining := spp.ps.n - rowsProcessed cs := make([]BlockColumn, len(columns)) for i, c := range columns { cDst := &cs[i] @@ -761,10 +761,10 @@ func parseSkipPipe(lex *lexer) (*skipPipe, error) { return nil, fmt.Errorf("cannot parse the number of rows to skip %q: %w", lex.token, err) } lex.nextToken() - sp := &skipPipe{ + ps := &skipPipe{ n: n, } - return sp, nil + return ps, nil } func parseFieldNamesInParens(lex *lexer) ([]string, error) {