From 5d791d617bf56a229b708ead8d5611aaa1b0c484 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Apr 2024 03:27:46 +0200 Subject: [PATCH] wip --- lib/logstorage/parser.go | 2 +- lib/logstorage/pipes.go | 40 ++++++++++++++++++++-------------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index bac0288a32..87c28c8a8e 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -204,7 +204,7 @@ func (q *Query) String() string { func (q *Query) getResultColumnNames() []string { for _, p := range q.pipes { switch t := p.(type) { - case *fieldsPipe: + case *pipeFields: return t.fields case *statsPipe: return t.neededFields() diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 212a398460..f85d22ff46 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -77,11 +77,11 @@ func parsePipes(lex *lexer) ([]pipe, error) { } switch { case lex.isKeyword("fields"): - fp, err := parseFieldsPipe(lex) + pf, err := parsePipeFields(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err) } - pipes = append(pipes, fp) + pipes = append(pipes, pf) case lex.isKeyword("stats"): sp, err := parseStatsPipe(lex) if err != nil { @@ -107,7 +107,7 @@ func parsePipes(lex *lexer) ([]pipe, error) { return pipes, nil } -type fieldsPipe struct { +type pipeFields struct { // fields contains list of fields to fetch fields []string @@ -115,36 +115,36 @@ type fieldsPipe struct { containsStar bool } -func (fp *fieldsPipe) String() string { - if len(fp.fields) == 0 { - logger.Panicf("BUG: fieldsPipe must contain at least a single field") +func (pf *pipeFields) String() string { + if len(pf.fields) == 0 { + logger.Panicf("BUG: pipeFields must contain at least a single field") } - return "fields " + fieldNamesString(fp.fields) + return "fields " + fieldNamesString(pf.fields) } -func (fp *fieldsPipe) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { - return &fieldsPipeProcessor{ - fp: fp, +func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + return &pipeFieldsProcessor{ + pf: pf, ppBase: ppBase, } } -type fieldsPipeProcessor struct { - fp *fieldsPipe +type pipeFieldsProcessor struct { + pf *pipeFields ppBase pipeProcessor } -func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { - if fpp.fp.containsStar || areSameBlockColumns(columns, fpp.fp.fields) { +func (fpp *pipeFieldsProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { + if fpp.pf.containsStar || areSameBlockColumns(columns, fpp.pf.fields) { // Fast path - there is no need in additional transformations before writing the block to ppBase. fpp.ppBase.writeBlock(workerID, timestamps, columns) return } - // Slow path - construct columns for fpp.fp.fields before writing them to ppBase. + // Slow path - construct columns for fpp.pf.fields before writing them to ppBase. brs := getBlockRows() cs := brs.cs - for _, f := range fpp.fp.fields { + for _, f := range fpp.pf.fields { values := getBlockColumnValues(columns, f, len(timestamps)) cs = append(cs, BlockColumn{ Name: f, @@ -156,11 +156,11 @@ func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, co putBlockRows(brs) } -func (fpp *fieldsPipeProcessor) flush() error { +func (fpp *pipeFieldsProcessor) flush() error { return nil } -func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) { +func parsePipeFields(lex *lexer) (*pipeFields, error) { var fields []string for { if !lex.mustNextToken() { @@ -176,11 +176,11 @@ func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) { fields = append(fields, field) switch { case lex.isKeyword("|", ")", ""): - fp := &fieldsPipe{ + pf := &pipeFields{ fields: fields, containsStar: slices.Contains(fields, "*"), } - return fp, nil + return pf, nil case lex.isKeyword(","): default: return nil, fmt.Errorf("unexpected token: %q; expecting ',', '|' or ')'", lex.token)