diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index 55825dae7..210aae97b 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -247,7 +247,7 @@ func (q *Query) AddTimeFilter(start, end int64) {
 // See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
 func (q *Query) AddPipeLimit(n uint64) {
 	q.pipes = append(q.pipes, &pipeLimit{
-		n: n,
+		limit: n,
 	})
 }
 
@@ -255,6 +255,7 @@ func (q *Query) AddPipeLimit(n uint64) {
 func (q *Query) Optimize() {
 	q.pipes = optimizeSortOffsetPipes(q.pipes)
 	q.pipes = optimizeSortLimitPipes(q.pipes)
+	q.pipes = optimizeUniqLimitPipes(q.pipes)
 	q.pipes = optimizeFilterPipes(q.pipes)
 
 	// Merge `q | filter ...` into q.
@@ -266,6 +267,14 @@ func (q *Query) Optimize() {
 		}
 	}
 
+	// Optimize `q | field_names ...` by marking pipeFieldNames as first pipe
+	if len(q.pipes) > 0 {
+		pf, ok := q.pipes[0].(*pipeFieldNames)
+		if ok {
+			pf.isFirstPipe = true
+		}
+	}
+
 	// Optimize 'in(query)' filters
 	optimizeFilterIn(q.f)
 	for _, p := range q.pipes {
@@ -306,7 +315,7 @@ func optimizeSortOffsetPipes(pipes []pipe) []pipe {
 			continue
 		}
 		if ps.offset == 0 && ps.limit == 0 {
-			ps.offset = po.n
+			ps.offset = po.offset
 		}
 		pipes = append(pipes[:i], pipes[i+1:]...)
 	}
@@ -327,8 +336,30 @@ func optimizeSortLimitPipes(pipes []pipe) []pipe {
 			i++
 			continue
 		}
-		if ps.limit == 0 || pl.n < ps.limit {
-			ps.limit = pl.n
+		if ps.limit == 0 || pl.limit < ps.limit {
+			ps.limit = pl.limit
+		}
+		pipes = append(pipes[:i], pipes[i+1:]...)
+	}
+	return pipes
+}
+
+func optimizeUniqLimitPipes(pipes []pipe) []pipe {
+	// Merge 'uniq ... | limit ...' into 'uniq ... limit ...'
+	i := 1
+	for i < len(pipes) {
+		pl, ok := pipes[i].(*pipeLimit)
+		if !ok {
+			i++
+			continue
+		}
+		pu, ok := pipes[i-1].(*pipeUniq)
+		if !ok {
+			i++
+			continue
+		}
+		if pu.limit == 0 || pl.limit < pu.limit {
+			pu.limit = pl.limit
 		}
 		pipes = append(pipes[:i], pipes[i+1:]...)
 	}
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 17f93c843..4d7fbeaf1 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -811,6 +811,10 @@ func TestParseQuerySuccess(t *testing.T) {
 	// multiple fields pipes
 	f(`foo | fields bar | fields baz, abc`, `foo | fields bar | fields baz, abc`)
 
+	// field_names pipe
+	f(`foo | field_names as x`, `foo | field_names as x`)
+	f(`foo | field_names y`, `foo | field_names as y`)
+
 	// copy and cp pipe
 	f(`* | copy foo as bar`, `* | copy foo as bar`)
 	f(`* | cp foo bar`, `* | copy foo as bar`)
@@ -1210,6 +1214,13 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | fields bar,`)
 	f(`foo | fields bar,,`)
 
+	// invalid field_names
+	f(`foo | field_names ()`)
+	f(`foo | field_names (x)`)
+	f(`foo | field_names (x,y)`)
+	f(`foo | field_names x y`)
+	f(`foo | field_names x, y`)
+
 	// invalid copy and cp pipe
 	f(`foo | copy`)
 	f(`foo | cp`)
@@ -1375,8 +1386,9 @@ func TestQueryGetNeededColumns(t *testing.T) {
 
 		q, err := ParseQuery(s)
 		if err != nil {
-			t.Fatalf("cannot parse query %s: %s", s, err)
+			t.Fatalf("cannot parse query [%s]: %s", s, err)
 		}
+		q.Optimize()
 
 		needed, unneeded := q.getNeededColumns()
 		neededColumns := strings.Join(needed, ",")
@@ -1486,6 +1498,27 @@ func TestQueryGetNeededColumns(t *testing.T) {
 	f(`* | uniq by (f1,f2) | rm f1,f3`, `f1,f2`, ``)
 	f(`* | uniq by (f1,f2) | fields f3`, `f1,f2`, ``)
 
+	f(`* | filter foo f1:bar`, `*`, ``)
+	f(`* | filter foo f1:bar | fields f2`, `f2`, ``)
+	f(`* | limit 10 | filter foo f1:bar | fields f2`, `_msg,f1,f2`, ``)
+	f(`* | filter foo f1:bar | fields f1`, `f1`, ``)
+	f(`* | filter foo f1:bar | rm f1`, `*`, `f1`)
+	f(`* | limit 10 | filter foo f1:bar | rm f1`, `*`, ``)
+	f(`* | filter foo f1:bar | rm f2`, `*`, `f2`)
+	f(`* | limit 10 | filter foo f1:bar | rm f2`, `*`, `f2`)
+	f(`* | fields x | filter foo f1:bar | rm f2`, `x`, ``)
+	f(`* | fields x,f1 | filter foo f1:bar | rm f2`, `f1,x`, ``)
+	f(`* | rm x,f1 | filter foo f1:bar`, `*`, `f1,x`)
+
+	f(`* | field_names as foo`, `*`, `_time`)
+	f(`* | field_names foo | fields bar`, `*`, `_time`)
+	f(`* | field_names foo | fields foo`, `*`, `_time`)
+	f(`* | field_names foo | rm foo`, `*`, `_time`)
+	f(`* | field_names foo | rm bar`, `*`, `_time`)
+	f(`* | field_names foo | rm _time`, `*`, `_time`)
+	f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
+	f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
+
 	f(`* | rm f1, f2`, `*`, `f1,f2`)
 	f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
 	f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index 200f48d36..36adb7b1e 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -83,6 +83,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
 				return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
 			}
 			pipes = append(pipes, pd)
+		case lex.isKeyword("field_names"):
+			pf, err := parsePipeFieldNames(lex)
+			if err != nil {
+				return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
+			}
+			pipes = append(pipes, pf)
 		case lex.isKeyword("fields"):
 			pf, err := parsePipeFields(lex)
 			if err != nil {
diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go
new file mode 100644
index 000000000..38a1e265a
--- /dev/null
+++ b/lib/logstorage/pipe_field_names.go
@@ -0,0 +1,161 @@
+package logstorage
+
+import (
+	"fmt"
+	"strings"
+	"unsafe"
+)
+
+// pipeFieldNames processes '| field_names' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#field-names-pipe
+type pipeFieldNames struct {
+	// resultName is the name of the column to write results to.
+	resultName string
+
+	// isFirstPipe is set to true if '| field_names' pipe is the first in the query.
+	//
+	// This allows skipping loading of _time column.
+	isFirstPipe bool
+}
+
+func (pf *pipeFieldNames) String() string {
+	return "field_names as " + quoteTokenIfNeeded(pf.resultName)
+}
+
+func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fieldsSet) {
+	neededFields.add("*")
+	unneededFields.reset()
+
+	if pf.isFirstPipe {
+		unneededFields.add("_time")
+	}
+}
+
+func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+	shards := make([]pipeFieldNamesProcessorShard, workersCount)
+	for i := range shards {
+		shards[i] = pipeFieldNamesProcessorShard{
+			pipeFieldNamesProcessorShardNopad: pipeFieldNamesProcessorShardNopad{
+				m: make(map[string]struct{}),
+			},
+		}
+	}
+
+	pfp := &pipeFieldNamesProcessor{
+		pf:     pf,
+		stopCh: stopCh,
+		ppBase: ppBase,
+
+		shards: shards,
+	}
+	return pfp
+}
+
+type pipeFieldNamesProcessor struct {
+	pf     *pipeFieldNames
+	stopCh <-chan struct{}
+	ppBase pipeProcessor
+
+	shards []pipeFieldNamesProcessorShard
+}
+
+type pipeFieldNamesProcessorShard struct {
+	pipeFieldNamesProcessorShardNopad
+
+	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(pipeFieldNamesProcessorShardNopad{})%128]byte
+}
+
+type pipeFieldNamesProcessorShardNopad struct {
+	// m holds unique field names.
+	m map[string]struct{}
+}
+
+func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
+	shard := &pfp.shards[workerID]
+	cs := br.getColumns()
+	for _, c := range cs {
+		if _, ok := shard.m[c.name]; !ok {
+			nameCopy := strings.Clone(c.name)
+			shard.m[nameCopy] = struct{}{}
+		}
+	}
+}
+
+func (pfp *pipeFieldNamesProcessor) flush() error {
+	if needStop(pfp.stopCh) {
+		return nil
+	}
+
+	// merge state across shards
+	shards := pfp.shards
+	m := shards[0].m
+	shards = shards[1:]
+	for i := range shards {
+		for k := range shards[i].m {
+			m[k] = struct{}{}
+		}
+	}
+
+	// write result
+	wctx := &pipeFieldNamesWriteContext{
+		pfp: pfp,
+	}
+	wctx.rcs[0].name = pfp.pf.resultName
+	for k := range m {
+		wctx.writeRow(k)
+	}
+	wctx.flush()
+
+	return nil
+}
+
+type pipeFieldNamesWriteContext struct {
+	pfp *pipeFieldNamesProcessor
+	rcs [1]resultColumn
+	br  blockResult
+
+	valuesLen int
+}
+
+func (wctx *pipeFieldNamesWriteContext) writeRow(v string) {
+	wctx.rcs[0].addValue(v)
+	wctx.valuesLen += len(v)
+	if wctx.valuesLen >= 1_000_000 {
+		wctx.flush()
+	}
+}
+
+func (wctx *pipeFieldNamesWriteContext) flush() {
+	br := &wctx.br
+
+	wctx.valuesLen = 0
+
+	// Flush rcs to ppBase
+	br.setResultColumns(wctx.rcs[:1])
+	wctx.pfp.ppBase.writeBlock(0, br)
+	br.reset()
+	wctx.rcs[0].resetKeepName()
+}
+
+func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) {
+	if !lex.isKeyword("field_names") {
+		return nil, fmt.Errorf("expecting 'field_names'; got %q", lex.token)
+	}
+	lex.nextToken()
+
+	if lex.isKeyword("as") {
+		lex.nextToken()
+	}
+	resultName := getCompoundPhrase(lex, false)
+
+	pf := &pipeFieldNames{
+		resultName: resultName,
+	}
+	return pf, nil
+}
diff --git a/lib/logstorage/pipe_field_names_test.go b/lib/logstorage/pipe_field_names_test.go
new file mode 100644
index 000000000..df8d68c48
--- /dev/null
+++ b/lib/logstorage/pipe_field_names_test.go
@@ -0,0 +1,38 @@
+package logstorage
+
+import (
+	"testing"
+)
+
+func TestPipeFieldNamesUpdateNeededFields(t *testing.T) {
+	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+
+		nfs := newTestFieldsSet(neededFields)
+		unfs := newTestFieldsSet(unneededFields)
+
+		lex := newLexer(s)
+		p, err := parsePipeFieldNames(lex)
+		if err != nil {
+			t.Fatalf("cannot parse %s: %s", s, err)
+		}
+		p.updateNeededFields(nfs, unfs)
+
+		assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("field_names as f1", "*", "", "*", "")
+
+	// all the needed fields, unneeded fields do not intersect with src
+	f("field_names as f3", "*", "f1,f2", "*", "")
+
+	// all the needed fields, unneeded fields intersect with src
+	f("field_names as f1", "*", "s1,f1,f2", "*", "")
+
+	// needed fields do not intersect with src
+	f("field_names as f3", "f1,f2", "", "*", "")
+
+	// needed fields intersect with src
+	f("field_names as f1", "s1,f1,f2", "", "*", "")
+}
diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go
index a3070754d..a8f8787b6 100644
--- a/lib/logstorage/pipe_filter.go
+++ b/lib/logstorage/pipe_filter.go
@@ -17,19 +17,20 @@ func (pf *pipeFilter) String() string {
 	return "filter " + pf.f.String()
 }
 
-func (pf *pipeFilter) updateNeededFields(neededFields, _ fieldsSet) {
-	pf.f.updateNeededFields(neededFields)
+func (pf *pipeFilter) updateNeededFields(neededFields, unneededFields fieldsSet) {
+	if neededFields.contains("*") {
+		fs := newFieldsSet()
+		pf.f.updateNeededFields(fs)
+		for f := range fs {
+			unneededFields.remove(f)
+		}
+	} else {
+		pf.f.updateNeededFields(neededFields)
+	}
 }
 
 func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
 	shards := make([]pipeFilterProcessorShard, workersCount)
-	for i := range shards {
-		shards[i] = pipeFilterProcessorShard{
-			pipeFilterProcessorShardNopad: pipeFilterProcessorShardNopad{
-				pf: pf,
-			},
-		}
-	}
 
 	pfp := &pipeFilterProcessor{
 		pf:     pf,
@@ -55,8 +56,6 @@ type pipeFilterProcessorShard struct {
 }
 
 type pipeFilterProcessorShardNopad struct {
-	pf *pipeFilter
-
 	br blockResult
 	bm bitmap
 }
@@ -71,7 +70,7 @@ func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) {
 	bm := &shard.bm
 	bm.init(len(br.timestamps))
 	bm.setBits()
-	shard.pf.f.applyToBlockResult(br, bm)
+	pfp.pf.f.applyToBlockResult(br, bm)
 	if bm.areAllBitsSet() {
 		// Fast path - the filter didn't filter out anything - send br to the base pipe as is.
 		pfp.ppBase.writeBlock(workerID, br)
diff --git a/lib/logstorage/pipe_filter_test.go b/lib/logstorage/pipe_filter_test.go
new file mode 100644
index 000000000..d4d5e3ff9
--- /dev/null
+++ b/lib/logstorage/pipe_filter_test.go
@@ -0,0 +1,38 @@
+package logstorage
+
+import (
+	"testing"
+)
+
+func TestPipeFilterUpdateNeededFields(t *testing.T) {
+	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+
+		nfs := newTestFieldsSet(neededFields)
+		unfs := newTestFieldsSet(unneededFields)
+
+		lex := newLexer(s)
+		p, err := parsePipeFilter(lex)
+		if err != nil {
+			t.Fatalf("cannot parse %s: %s", s, err)
+		}
+		p.updateNeededFields(nfs, unfs)
+
+		assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("filter foo f1:bar", "*", "", "*", "")
+
+	// all the needed fields, unneeded fields do not intersect with src
+	f("filter foo f3:bar", "*", "f1,f2", "*", "f1,f2")
+
+	// all the needed fields, unneeded fields intersect with src
+	f("filter foo f1:bar", "*", "s1,f1,f2", "*", "s1,f2")
+
+	// needed fields do not intersect with src
+	f("filter foo f3:bar", "f1,f2", "", "_msg,f1,f2,f3", "")
+
+	// needed fields intersect with src
+	f("filter foo f1:bar", "s1,f1,f2", "", "_msg,f1,f2,s1", "")
+}
diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go
index 91793b9fb..15e0f4f26 100644
--- a/lib/logstorage/pipe_limit.go
+++ b/lib/logstorage/pipe_limit.go
@@ -9,18 +9,18 @@ import (
 //
 // See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
 type pipeLimit struct {
-	n uint64
+	limit uint64
 }
 
 func (pl *pipeLimit) String() string {
-	return fmt.Sprintf("limit %d", pl.n)
+	return fmt.Sprintf("limit %d", pl.limit)
 }
 
 func (pl *pipeLimit) updateNeededFields(_, _ fieldsSet) {
 }
 
 func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
-	if pl.n == 0 {
+	if pl.limit == 0 {
 		// Special case - notify the caller to stop writing data to the returned pipeLimitProcessor
 		cancel()
 	}
@@ -45,7 +45,7 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
 	}
 
 	rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps)))
-	if rowsProcessed <= plp.pl.n {
+	if rowsProcessed <= plp.pl.limit {
 		// Fast path - write all the rows to ppBase.
 		plp.ppBase.writeBlock(workerID, br)
 		return
@@ -53,13 +53,13 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
 
 	// Slow path - overflow. Write the remaining rows if needed.
 	rowsProcessed -= uint64(len(br.timestamps))
-	if rowsProcessed >= plp.pl.n {
+	if rowsProcessed >= plp.pl.limit {
 		// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
 		return
 	}
 
 	// Write remaining rows.
-	keepRows := plp.pl.n - rowsProcessed
+	keepRows := plp.pl.limit - rowsProcessed
 	br.truncateRows(int(keepRows))
 	plp.ppBase.writeBlock(workerID, br)
 
@@ -83,7 +83,7 @@ func parsePipeLimit(lex *lexer) (*pipeLimit, error) {
 	}
 	lex.nextToken()
 	pl := &pipeLimit{
-		n: n,
+		limit: n,
 	}
 	return pl, nil
 }
diff --git a/lib/logstorage/pipe_offset.go b/lib/logstorage/pipe_offset.go
index 61219989a..5518d80f8 100644
--- a/lib/logstorage/pipe_offset.go
+++ b/lib/logstorage/pipe_offset.go
@@ -9,11 +9,11 @@ import (
 //
 // See https://docs.victoriametrics.com/victorialogs/logsql/#offset-pipe
 type pipeOffset struct {
-	n uint64
+	offset uint64
 }
 
 func (po *pipeOffset) String() string {
-	return fmt.Sprintf("offset %d", po.n)
+	return fmt.Sprintf("offset %d", po.offset)
 }
 
 func (po *pipeOffset) updateNeededFields(_, _ fieldsSet) {
@@ -39,17 +39,17 @@ func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) {
 	}
 
 	rowsProcessed := pop.rowsProcessed.Add(uint64(len(br.timestamps)))
-	if rowsProcessed <= pop.po.n {
+	if rowsProcessed <= pop.po.offset {
 		return
 	}
 
 	rowsProcessed -= uint64(len(br.timestamps))
-	if rowsProcessed >= pop.po.n {
+	if rowsProcessed >= pop.po.offset {
 		pop.ppBase.writeBlock(workerID, br)
 		return
 	}
 
-	rowsSkip := pop.po.n - rowsProcessed
+	rowsSkip := pop.po.offset - rowsProcessed
 	br.skipRows(int(rowsSkip))
 	pop.ppBase.writeBlock(workerID, br)
 }
@@ -70,7 +70,7 @@ func parsePipeOffset(lex *lexer) (*pipeOffset, error) {
 	}
 	lex.nextToken()
 	po := &pipeOffset{
-		n: n,
+		offset: n,
 	}
 	return po, nil
 }
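
A minimal usage sketch of how the optimizations above surface through the exported parser API; it assumes the lib/logstorage package is imported from the VictoriaMetrics module and that ParseQuery, Optimize and String behave as shown in the tests above, while the exact optimized output in the trailing comment is an assumption for illustration:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

func main() {
	// After Optimize(), `uniq ... | limit N` is merged into a single
	// `uniq ... limit N` pipe (see optimizeUniqLimitPipes in the diff), and a
	// leading `field_names` pipe gets isFirstPipe=true so _time isn't loaded.
	q, err := logstorage.ParseQuery("error | uniq by (ip) | limit 5")
	if err != nil {
		panic(err)
	}
	q.Optimize()
	fmt.Println(q.String()) // expected (assumption): error | uniq by (ip) limit 5
}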