This commit is contained in:
Aliaksandr Valialkin 2024-05-18 21:40:02 +02:00
parent 4cfb2fe60c
commit 13534e7bf6
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
9 changed files with 336 additions and 30 deletions

View file

@ -247,7 +247,7 @@ func (q *Query) AddTimeFilter(start, end int64) {
// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
func (q *Query) AddPipeLimit(n uint64) {
q.pipes = append(q.pipes, &pipeLimit{
n: n,
limit: n,
})
}
@ -255,6 +255,7 @@ func (q *Query) AddPipeLimit(n uint64) {
func (q *Query) Optimize() {
q.pipes = optimizeSortOffsetPipes(q.pipes)
q.pipes = optimizeSortLimitPipes(q.pipes)
q.pipes = optimizeUniqLimitPipes(q.pipes)
q.pipes = optimizeFilterPipes(q.pipes)
// Merge `q | filter ...` into q.
@ -266,6 +267,14 @@ func (q *Query) Optimize() {
}
}
// Optimize `q | field_names ...` by marking pipeFieldNames as first pipe
if len(q.pipes) > 0 {
pf, ok := q.pipes[0].(*pipeFieldNames)
if ok {
pf.isFirstPipe = true
}
}
// Optimize 'in(query)' filters
optimizeFilterIn(q.f)
for _, p := range q.pipes {
@ -306,7 +315,7 @@ func optimizeSortOffsetPipes(pipes []pipe) []pipe {
continue
}
if ps.offset == 0 && ps.limit == 0 {
ps.offset = po.n
ps.offset = po.offset
}
pipes = append(pipes[:i], pipes[i+1:]...)
}
@ -327,8 +336,30 @@ func optimizeSortLimitPipes(pipes []pipe) []pipe {
i++
continue
}
if ps.limit == 0 || pl.n < ps.limit {
ps.limit = pl.n
if ps.limit == 0 || pl.limit < ps.limit {
ps.limit = pl.limit
}
pipes = append(pipes[:i], pipes[i+1:]...)
}
return pipes
}
func optimizeUniqLimitPipes(pipes []pipe) []pipe {
// Merge 'uniq ... | limit ...' into 'uniq ... limit ...'
i := 1
for i < len(pipes) {
pl, ok := pipes[i].(*pipeLimit)
if !ok {
i++
continue
}
pu, ok := pipes[i-1].(*pipeUniq)
if !ok {
i++
continue
}
if pu.limit == 0 || pl.limit < pu.limit {
pu.limit = pl.limit
}
pipes = append(pipes[:i], pipes[i+1:]...)
}

View file

@ -811,6 +811,10 @@ func TestParseQuerySuccess(t *testing.T) {
// multiple fields pipes
f(`foo | fields bar | fields baz, abc`, `foo | fields bar | fields baz, abc`)
// field_names pipe
f(`foo | field_names as x`, `foo | field_names as x`)
f(`foo | field_names y`, `foo | field_names as y`)
// copy and cp pipe
f(`* | copy foo as bar`, `* | copy foo as bar`)
f(`* | cp foo bar`, `* | copy foo as bar`)
@ -1210,6 +1214,13 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | fields bar,`)
f(`foo | fields bar,,`)
// invalid field_names
f(`foo | field_names ()`)
f(`foo | field_names (x)`)
f(`foo | field_names (x,y)`)
f(`foo | field_names x y`)
f(`foo | field_names x, y`)
// invalid copy and cp pipe
f(`foo | copy`)
f(`foo | cp`)
@ -1375,8 +1386,9 @@ func TestQueryGetNeededColumns(t *testing.T) {
q, err := ParseQuery(s)
if err != nil {
t.Fatalf("cannot parse query %s: %s", s, err)
t.Fatalf("cannot parse query [%s]: %s", s, err)
}
q.Optimize()
needed, unneeded := q.getNeededColumns()
neededColumns := strings.Join(needed, ",")
@ -1486,6 +1498,27 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | uniq by (f1,f2) | rm f1,f3`, `f1,f2`, ``)
f(`* | uniq by (f1,f2) | fields f3`, `f1,f2`, ``)
f(`* | filter foo f1:bar`, `*`, ``)
f(`* | filter foo f1:bar | fields f2`, `f2`, ``)
f(`* | limit 10 | filter foo f1:bar | fields f2`, `_msg,f1,f2`, ``)
f(`* | filter foo f1:bar | fields f1`, `f1`, ``)
f(`* | filter foo f1:bar | rm f1`, `*`, `f1`)
f(`* | limit 10 | filter foo f1:bar | rm f1`, `*`, ``)
f(`* | filter foo f1:bar | rm f2`, `*`, `f2`)
f(`* | limit 10 | filter foo f1:bar | rm f2`, `*`, `f2`)
f(`* | fields x | filter foo f1:bar | rm f2`, `x`, ``)
f(`* | fields x,f1 | filter foo f1:bar | rm f2`, `f1,x`, ``)
f(`* | rm x,f1 | filter foo f1:bar`, `*`, `f1,x`)
f(`* | field_names as foo`, `*`, `_time`)
f(`* | field_names foo | fields bar`, `*`, `_time`)
f(`* | field_names foo | fields foo`, `*`, `_time`)
f(`* | field_names foo | rm foo`, `*`, `_time`)
f(`* | field_names foo | rm bar`, `*`, `_time`)
f(`* | field_names foo | rm _time`, `*`, `_time`)
f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
f(`* | rm f1, f2`, `*`, `f1,f2`)
f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)

View file

@ -83,6 +83,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
}
pipes = append(pipes, pd)
case lex.isKeyword("field_names"):
pf, err := parsePipeFieldNames(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
}
pipes = append(pipes, pf)
case lex.isKeyword("fields"):
pf, err := parsePipeFields(lex)
if err != nil {

View file

@ -0,0 +1,161 @@
package logstorage
import (
"fmt"
"strings"
"unsafe"
)
// pipeFieldNames processes '| field_names' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#field-names-pipe
type pipeFieldNames struct {
// resultName is the name of the column to write results to.
resultName string
// isFirstPipe is set to true if '| field_names' pipe is the first in the query.
//
// This allows skipping loading of _time column.
isFirstPipe bool
}
func (pf *pipeFieldNames) String() string {
return "field_names as " + quoteTokenIfNeeded(pf.resultName)
}
func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.add("*")
unneededFields.reset()
if pf.isFirstPipe {
unneededFields.add("_time")
}
}
func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeFieldNamesProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeFieldNamesProcessorShard{
pipeFieldNamesProcessorShardNopad: pipeFieldNamesProcessorShardNopad{
m: make(map[string]struct{}),
},
}
}
pfp := &pipeFieldNamesProcessor{
pf: pf,
stopCh: stopCh,
ppBase: ppBase,
shards: shards,
}
return pfp
}
type pipeFieldNamesProcessor struct {
pf *pipeFieldNames
stopCh <-chan struct{}
ppBase pipeProcessor
shards []pipeFieldNamesProcessorShard
}
type pipeFieldNamesProcessorShard struct {
pipeFieldNamesProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeFieldNamesProcessorShardNopad{})%128]byte
}
type pipeFieldNamesProcessorShardNopad struct {
// m holds unique field names.
m map[string]struct{}
}
func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pfp.shards[workerID]
cs := br.getColumns()
for _, c := range cs {
if _, ok := shard.m[c.name]; !ok {
nameCopy := strings.Clone(c.name)
shard.m[nameCopy] = struct{}{}
}
}
}
func (pfp *pipeFieldNamesProcessor) flush() error {
if needStop(pfp.stopCh) {
return nil
}
// merge state across shards
shards := pfp.shards
m := shards[0].m
shards = shards[1:]
for i := range shards {
for k := range shards[i].m {
m[k] = struct{}{}
}
}
// write result
wctx := &pipeFieldNamesWriteContext{
pfp: pfp,
}
wctx.rcs[0].name = pfp.pf.resultName
for k := range m {
wctx.writeRow(k)
}
wctx.flush()
return nil
}
type pipeFieldNamesWriteContext struct {
pfp *pipeFieldNamesProcessor
rcs [1]resultColumn
br blockResult
valuesLen int
}
func (wctx *pipeFieldNamesWriteContext) writeRow(v string) {
wctx.rcs[0].addValue(v)
wctx.valuesLen += len(v)
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeFieldNamesWriteContext) flush() {
br := &wctx.br
wctx.valuesLen = 0
// Flush rcs to ppBase
br.setResultColumns(wctx.rcs[:1])
wctx.pfp.ppBase.writeBlock(0, br)
br.reset()
wctx.rcs[0].resetKeepName()
}
func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) {
if !lex.isKeyword("field_names") {
return nil, fmt.Errorf("expecting 'field_names'; got %q", lex.token)
}
lex.nextToken()
if lex.isKeyword("as") {
lex.nextToken()
}
resultName := getCompoundPhrase(lex, false)
pf := &pipeFieldNames{
resultName: resultName,
}
return pf, nil
}

View file

@ -0,0 +1,38 @@
package logstorage
import (
"testing"
)
func TestPipeFieldNamesUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
nfs := newTestFieldsSet(neededFields)
unfs := newTestFieldsSet(unneededFields)
lex := newLexer(s)
p, err := parsePipeFieldNames(lex)
if err != nil {
t.Fatalf("cannot parse %s: %s", s, err)
}
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("field_names as f1", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src
f("field_names as f3", "*", "f1,f2", "*", "")
// all the needed fields, unneeded fields intersect with src
f("field_names as f1", "*", "s1,f1,f2", "*", "")
// needed fields do not intersect with src
f("field_names as f3", "f1,f2", "", "*", "")
// needed fields intersect with src
f("field_names as f1", "s1,f1,f2", "", "*", "")
}

View file

@ -17,19 +17,20 @@ func (pf *pipeFilter) String() string {
return "filter " + pf.f.String()
}
func (pf *pipeFilter) updateNeededFields(neededFields, _ fieldsSet) {
func (pf *pipeFilter) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
fs := newFieldsSet()
pf.f.updateNeededFields(fs)
for f := range fs {
unneededFields.remove(f)
}
} else {
pf.f.updateNeededFields(neededFields)
}
}
func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeFilterProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeFilterProcessorShard{
pipeFilterProcessorShardNopad: pipeFilterProcessorShardNopad{
pf: pf,
},
}
}
pfp := &pipeFilterProcessor{
pf: pf,
@ -55,8 +56,6 @@ type pipeFilterProcessorShard struct {
}
type pipeFilterProcessorShardNopad struct {
pf *pipeFilter
br blockResult
bm bitmap
}
@ -71,7 +70,7 @@ func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) {
bm := &shard.bm
bm.init(len(br.timestamps))
bm.setBits()
shard.pf.f.applyToBlockResult(br, bm)
pfp.pf.f.applyToBlockResult(br, bm)
if bm.areAllBitsSet() {
// Fast path - the filter didn't filter out anything - send br to the base pipe as is.
pfp.ppBase.writeBlock(workerID, br)

View file

@ -0,0 +1,38 @@
package logstorage
import (
"testing"
)
func TestPipeFilterUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
nfs := newTestFieldsSet(neededFields)
unfs := newTestFieldsSet(unneededFields)
lex := newLexer(s)
p, err := parsePipeFilter(lex)
if err != nil {
t.Fatalf("cannot parse %s: %s", s, err)
}
p.updateNeededFields(nfs, unfs)
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("filter foo f1:bar", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src
f("filter foo f3:bar", "*", "f1,f2", "*", "f1,f2")
// all the needed fields, unneeded fields intersect with src
f("filter foo f1:bar", "*", "s1,f1,f2", "*", "s1,f2")
// needed fields do not intersect with src
f("filter foo f3:bar", "f1,f2", "", "_msg,f1,f2,f3", "")
// needed fields intersect with src
f("filter foo f1:bar", "s1,f1,f2", "", "_msg,f1,f2,s1", "")
}

View file

@ -9,18 +9,18 @@ import (
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
type pipeLimit struct {
n uint64
limit uint64
}
func (pl *pipeLimit) String() string {
return fmt.Sprintf("limit %d", pl.n)
return fmt.Sprintf("limit %d", pl.limit)
}
func (pl *pipeLimit) updateNeededFields(_, _ fieldsSet) {
}
func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
if pl.n == 0 {
if pl.limit == 0 {
// Special case - notify the caller to stop writing data to the returned pipeLimitProcessor
cancel()
}
@ -45,7 +45,7 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
}
rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps)))
if rowsProcessed <= plp.pl.n {
if rowsProcessed <= plp.pl.limit {
// Fast path - write all the rows to ppBase.
plp.ppBase.writeBlock(workerID, br)
return
@ -53,13 +53,13 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
// Slow path - overflow. Write the remaining rows if needed.
rowsProcessed -= uint64(len(br.timestamps))
if rowsProcessed >= plp.pl.n {
if rowsProcessed >= plp.pl.limit {
// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
return
}
// Write remaining rows.
keepRows := plp.pl.n - rowsProcessed
keepRows := plp.pl.limit - rowsProcessed
br.truncateRows(int(keepRows))
plp.ppBase.writeBlock(workerID, br)
@ -83,7 +83,7 @@ func parsePipeLimit(lex *lexer) (*pipeLimit, error) {
}
lex.nextToken()
pl := &pipeLimit{
n: n,
limit: n,
}
return pl, nil
}

View file

@ -9,11 +9,11 @@ import (
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#offset-pipe
type pipeOffset struct {
n uint64
offset uint64
}
func (po *pipeOffset) String() string {
return fmt.Sprintf("offset %d", po.n)
return fmt.Sprintf("offset %d", po.offset)
}
func (po *pipeOffset) updateNeededFields(_, _ fieldsSet) {
@ -39,17 +39,17 @@ func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) {
}
rowsProcessed := pop.rowsProcessed.Add(uint64(len(br.timestamps)))
if rowsProcessed <= pop.po.n {
if rowsProcessed <= pop.po.offset {
return
}
rowsProcessed -= uint64(len(br.timestamps))
if rowsProcessed >= pop.po.n {
if rowsProcessed >= pop.po.offset {
pop.ppBase.writeBlock(workerID, br)
return
}
rowsSkip := pop.po.n - rowsProcessed
rowsSkip := pop.po.offset - rowsProcessed
br.skipRows(int(rowsSkip))
pop.ppBase.writeBlock(workerID, br)
}
@ -70,7 +70,7 @@ func parsePipeOffset(lex *lexer) (*pipeOffset, error) {
}
lex.nextToken()
po := &pipeOffset{
n: n,
offset: n,
}
return po, nil
}