lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-05-26 01:55:21 +02:00
parent 7ac529c235
commit 1e203f35f7
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
8 changed files with 218 additions and 11 deletions

View file

@ -19,6 +19,14 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
## [v0.12.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.1-victorialogs)
Released at 2024-05-26
* FEATURE: add support for comments in multi-line LogsQL queries. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#comments).
* BUGFIX: properly apply [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) inside `if (...)` conditions at various [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes). This bug has been introduced in [v0.12.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.0-victorialogs).
## [v0.12.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.0-victorialogs)
Released at 2024-05-26
@ -31,7 +39,7 @@ Released at 2024-05-26
* BUGFIX: prevent from panic in [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when VictoriaLogs runs on a system with one CPU core.
* BUGFIX: do not return referenced fields if they weren't present in the original logs. For example, `_time:5m | format if (non_existing_field:"") "abc"` could return empty `non_exiting_field`, while it shuldn't be returned because it is missing in the original logs.
* BUGFIX: properly initialize values for [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-filter) inside [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the `in(...)` contains other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters). For example, `_time:5m | filter ip:in(user_type:admin | fields ip)` now works correctly.
* BUGFIX: properly initialize values for [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) inside [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the `in(...)` contains other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters). For example, `_time:5m | filter ip:in(user_type:admin | fields ip)` now works correctly.
## [v0.11.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.11.0-victorialogs)

View file

@ -2501,6 +2501,18 @@ LogsQL provides the following [pipes](#pipes) for limiting the number of returne
Specific log fields can be queried via [`fields` pipe](#fields-pipe).
## Comments
LogsQL query may contain comments at any place. The comment starts with `#` and continues until the end of the current line.
Example query with comments:
```logsql
error # find logs with `error` word
| stats by (_stream) logs # then count the number of logs per `_stream` label
| sort by (logs) desc # then sort by the found logs in descending order
| limit 5 # and show top 5 streams with the biggest number of logs
```
## Numeric values
LogsQL accepts numeric values in the following formats:

View file

@ -123,9 +123,12 @@ func (lex *lexer) nextToken() {
lex.token = ""
lex.rawToken = ""
lex.isSkippedSpace = false
if len(s) == 0 {
return
}
again:
r, size := utf8.DecodeRuneInString(s)
if r == utf8.RuneError {
lex.nextCharToken(s, size)
@ -139,6 +142,17 @@ func (lex *lexer) nextToken() {
r, size = utf8.DecodeRuneInString(s)
}
if r == '#' {
// skip comment till \n
n := strings.IndexByte(s, '\n')
if n < 0 {
s = ""
} else {
s = s[n+1:]
}
goto again
}
// Try decoding simple token
tokenLen := 0
for isTokenRune(r) || r == '.' {

View file

@ -1059,6 +1059,12 @@ func TestParseQuerySuccess(t *testing.T) {
// multiple different pipes
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
// comments
f(`* # some comment | foo bar`, `*`)
f(`foo | # some comment | foo bar
fields x # another comment
|filter "foo#this#isn't a comment"#this is comment`, `foo | fields x | filter "foo#this#isn't a comment"`)
}
func TestParseQueryFailure(t *testing.T) {

View file

@ -43,7 +43,7 @@ func (pf *pipeFilter) initFilterInValues(cache map[string][]string, getFieldValu
return nil, err
}
pfNew := *pf
pf.f = fNew
pfNew.f = fNew
return &pfNew, nil
}

View file

@ -133,16 +133,18 @@ func (ps *pipeStats) hasFilterInWithQuery() bool {
func (ps *pipeStats) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
funcsNew := make([]pipeStatsFunc, len(ps.funcs))
for i, f := range ps.funcs {
for i := range ps.funcs {
f := &ps.funcs[i]
iffNew, err := f.iff.initFilterInValues(cache, getFieldValuesFunc)
if err != nil {
return nil, err
}
f.iff = iffNew
funcsNew[i] = f
fNew := *f
fNew.iff = iffNew
funcsNew[i] = fNew
}
psNew := *ps
ps.funcs = funcsNew
psNew.funcs = funcsNew
return &psNew, nil
}

View file

@ -128,15 +128,21 @@ func (pp *testPipeProcessor) flush() error {
func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) {
t.Helper()
if len(pp.resultRows) != len(expectedRows) {
assertRowsEqual(t, pp.resultRows, expectedRows)
}
func assertRowsEqual(t *testing.T, resultRows, expectedRows [][]Field) {
t.Helper()
if len(resultRows) != len(expectedRows) {
t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s",
len(pp.resultRows), len(expectedRows), rowsToString(pp.resultRows), rowsToString(expectedRows))
len(resultRows), len(expectedRows), rowsToString(resultRows), rowsToString(expectedRows))
}
sortTestRows(pp.resultRows)
sortTestRows(resultRows)
sortTestRows(expectedRows)
for i, resultRow := range pp.resultRows {
for i, resultRow := range resultRows {
expectedRow := expectedRows[i]
if len(resultRow) != len(expectedRow) {
t.Fatalf("unexpected number of fields at row #%d; got %d; want %d\nrow got\n%s\nrow expected\n%s",

View file

@ -3,6 +3,8 @@ package logstorage
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
@ -310,6 +312,163 @@ func TestStorageRunQuery(t *testing.T) {
mustRunQuery(tenantIDs, q, writeBlock)
})
// Run more complex tests
f := func(t *testing.T, query string, rowsExpected [][]Field) {
t.Helper()
q := mustParseQuery(query)
var resultRowsLock sync.Mutex
var resultRows [][]Field
writeBlock := func(_ uint, _ []int64, bcs []BlockColumn) {
if len(bcs) == 0 {
return
}
for i := 0; i < len(bcs[0].Values); i++ {
row := make([]Field, len(bcs))
for j, bc := range bcs {
row[j] = Field{
Name: strings.Clone(bc.Name),
Value: strings.Clone(bc.Values[i]),
}
}
resultRowsLock.Lock()
resultRows = append(resultRows, row)
resultRowsLock.Unlock()
}
}
mustRunQuery(allTenantIDs, q, writeBlock)
assertRowsEqual(t, resultRows, rowsExpected)
}
t.Run("stats-count-total", func(t *testing.T) {
f(t, `* | stats count() rows`, [][]Field{
{
{"rows", "1155"},
},
})
})
t.Run("in-filter-with-subquery-match", func(t *testing.T) {
f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{
{
{"rows", "105"},
},
})
})
t.Run("in-filter-with-subquery-mismatch", func(t *testing.T) {
f(t, `tenant.id:in(tenant.id:23243 | fields tenant.id) | stats count() rows`, [][]Field{
{
{"rows", "0"},
},
})
})
t.Run("conditional-stats", func(t *testing.T) {
f(t, `* | stats
count() rows_total,
count() if (stream-id:0) stream_0_rows,
count() if (stream-id:1123) stream_x_rows
`, [][]Field{
{
{"rows_total", "1155"},
{"stream_0_rows", "385"},
{"stream_x_rows", "0"},
},
})
})
t.Run("in-filter-with-subquery-in-conditional-stats-mismatch", func(t *testing.T) {
f(t, `* | stats
count() rows_total,
count() if (tenant.id:in(tenant.id:3 | fields tenant.id)) rows_nonzero,
count() if (tenant.id:in(tenant.id:23243 | fields tenant.id)) rows_zero
`, [][]Field{
{
{"rows_total", "1155"},
{"rows_nonzero", "105"},
{"rows_zero", "0"},
},
})
})
t.Run("pipe-extract", func(*testing.T) {
f(t, `* | extract "host-<host>:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{
{
{"host", "0"},
{"hits", "385"},
},
{
{"host", "1"},
{"hits", "385"},
},
{
{"host", "2"},
{"hits", "385"},
},
})
})
t.Run("pipe-extract-if-filter-with-subquery", func(*testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id))
"host-<host>:" from instance
| filter host:~"1|2"
| uniq (tenant.id, host) with hits
| sort by (tenant.id, host)`, [][]Field{
{
{"tenant.id", "{accountID=3,projectID=31}"},
{"host", "1"},
{"hits", "35"},
},
{
{"tenant.id", "{accountID=3,projectID=31}"},
{"host", "2"},
{"hits", "35"},
},
{
{"tenant.id", "{accountID=4,projectID=41}"},
{"host", "1"},
{"hits", "35"},
},
{
{"tenant.id", "{accountID=4,projectID=41}"},
{"host", "2"},
{"hits", "35"},
},
})
})
t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(*testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:3 | fields tenant.id))
"host-<host>:" from instance
| filter host:*
| uniq (host) with hits
| sort by (host)`, [][]Field{
{
{"host", "0"},
{"hits", "35"},
},
{
{"host", "1"},
{"hits", "35"},
},
{
{"host", "2"},
{"hits", "35"},
},
})
})
t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(*testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:3 | fields tenant.id))
"host-<host>:" from instance
| filter host:""
| uniq (host) with hits
| sort by (host)`, [][]Field{
{
{"host", ""},
{"hits", "1050"},
},
})
})
// Close the storage and delete its data
s.MustClose()
fs.MustRemoveAll(path)
@ -318,7 +477,7 @@ func TestStorageRunQuery(t *testing.T) {
func mustParseQuery(query string) *Query {
q, err := ParseQuery(query)
if err != nil {
panic(fmt.Errorf("BUG: cannot parse %s: %w", query, err))
panic(fmt.Errorf("BUG: cannot parse [%s]: %w", query, err))
}
return q
}