mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/logstorage: work-in-progress
This commit is contained in:
parent
6427b3c3c0
commit
afa597ce2a
8 changed files with 218 additions and 11 deletions
|
@ -19,6 +19,14 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
|
||||
## tip
|
||||
|
||||
## [v0.12.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.1-victorialogs)
|
||||
|
||||
Released at 2024-05-26
|
||||
|
||||
* FEATURE: add support for comments in multi-line LogsQL queries. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#comments).
|
||||
|
||||
* BUGFIX: properly apply [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) inside `if (...)` conditions at various [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes). This bug was introduced in [v0.12.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.0-victorialogs).
|
||||
|
||||
## [v0.12.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.0-victorialogs)
|
||||
|
||||
Released at 2024-05-26
|
||||
|
@ -31,7 +39,7 @@ Released at 2024-05-26
|
|||
|
||||
* BUGFIX: prevent a panic in [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when VictoriaLogs runs on a system with one CPU core.
|
||||
* BUGFIX: do not return referenced fields if they weren't present in the original logs. For example, `_time:5m | format if (non_existing_field:"") "abc"` could return empty `non_existing_field`, while it shouldn't be returned because it is missing in the original logs.
|
||||
* BUGFIX: properly initialize values for [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-filter) inside [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the `in(...)` contains other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters). For example, `_time:5m | filter ip:in(user_type:admin | fields ip)` now works correctly.
|
||||
* BUGFIX: properly initialize values for [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) inside [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the `in(...)` contains other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters). For example, `_time:5m | filter ip:in(user_type:admin | fields ip)` now works correctly.
|
||||
|
||||
## [v0.11.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.11.0-victorialogs)
|
||||
|
||||
|
|
|
@ -2501,6 +2501,18 @@ LogsQL provides the following [pipes](#pipes) for limiting the number of returne
|
|||
|
||||
Specific log fields can be queried via [`fields` pipe](#fields-pipe).
|
||||
|
||||
## Comments
|
||||
|
||||
A LogsQL query may contain comments anywhere. A comment starts with `#` and continues until the end of the current line.
|
||||
Example query with comments:
|
||||
|
||||
```logsql
|
||||
error # find logs with `error` word
|
||||
| stats by (_stream) logs # then count the number of logs per `_stream` label
|
||||
| sort by (logs) desc # then sort by the found logs in descending order
|
||||
| limit 5 # and show top 5 streams with the biggest number of logs
|
||||
```
|
||||
|
||||
## Numeric values
|
||||
|
||||
LogsQL accepts numeric values in the following formats:
|
||||
|
|
|
@ -123,9 +123,12 @@ func (lex *lexer) nextToken() {
|
|||
lex.token = ""
|
||||
lex.rawToken = ""
|
||||
lex.isSkippedSpace = false
|
||||
|
||||
if len(s) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
again:
|
||||
r, size := utf8.DecodeRuneInString(s)
|
||||
if r == utf8.RuneError {
|
||||
lex.nextCharToken(s, size)
|
||||
|
@ -139,6 +142,17 @@ func (lex *lexer) nextToken() {
|
|||
r, size = utf8.DecodeRuneInString(s)
|
||||
}
|
||||
|
||||
if r == '#' {
|
||||
// skip comment till \n
|
||||
n := strings.IndexByte(s, '\n')
|
||||
if n < 0 {
|
||||
s = ""
|
||||
} else {
|
||||
s = s[n+1:]
|
||||
}
|
||||
goto again
|
||||
}
|
||||
|
||||
// Try decoding simple token
|
||||
tokenLen := 0
|
||||
for isTokenRune(r) || r == '.' {
|
||||
|
|
|
@ -1059,6 +1059,12 @@ func TestParseQuerySuccess(t *testing.T) {
|
|||
// multiple different pipes
|
||||
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
|
||||
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
|
||||
|
||||
// comments
|
||||
f(`* # some comment | foo bar`, `*`)
|
||||
f(`foo | # some comment | foo bar
|
||||
fields x # another comment
|
||||
|filter "foo#this#isn't a comment"#this is comment`, `foo | fields x | filter "foo#this#isn't a comment"`)
|
||||
}
|
||||
|
||||
func TestParseQueryFailure(t *testing.T) {
|
||||
|
|
|
@ -43,7 +43,7 @@ func (pf *pipeFilter) initFilterInValues(cache map[string][]string, getFieldValu
|
|||
return nil, err
|
||||
}
|
||||
pfNew := *pf
|
||||
pf.f = fNew
|
||||
pfNew.f = fNew
|
||||
return &pfNew, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -133,16 +133,18 @@ func (ps *pipeStats) hasFilterInWithQuery() bool {
|
|||
|
||||
func (ps *pipeStats) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
|
||||
funcsNew := make([]pipeStatsFunc, len(ps.funcs))
|
||||
for i, f := range ps.funcs {
|
||||
for i := range ps.funcs {
|
||||
f := &ps.funcs[i]
|
||||
iffNew, err := f.iff.initFilterInValues(cache, getFieldValuesFunc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
f.iff = iffNew
|
||||
funcsNew[i] = f
|
||||
fNew := *f
|
||||
fNew.iff = iffNew
|
||||
funcsNew[i] = fNew
|
||||
}
|
||||
psNew := *ps
|
||||
ps.funcs = funcsNew
|
||||
psNew.funcs = funcsNew
|
||||
return &psNew, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -128,15 +128,21 @@ func (pp *testPipeProcessor) flush() error {
|
|||
func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) {
|
||||
t.Helper()
|
||||
|
||||
if len(pp.resultRows) != len(expectedRows) {
|
||||
assertRowsEqual(t, pp.resultRows, expectedRows)
|
||||
}
|
||||
|
||||
func assertRowsEqual(t *testing.T, resultRows, expectedRows [][]Field) {
|
||||
t.Helper()
|
||||
|
||||
if len(resultRows) != len(expectedRows) {
|
||||
t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s",
|
||||
len(pp.resultRows), len(expectedRows), rowsToString(pp.resultRows), rowsToString(expectedRows))
|
||||
len(resultRows), len(expectedRows), rowsToString(resultRows), rowsToString(expectedRows))
|
||||
}
|
||||
|
||||
sortTestRows(pp.resultRows)
|
||||
sortTestRows(resultRows)
|
||||
sortTestRows(expectedRows)
|
||||
|
||||
for i, resultRow := range pp.resultRows {
|
||||
for i, resultRow := range resultRows {
|
||||
expectedRow := expectedRows[i]
|
||||
if len(resultRow) != len(expectedRow) {
|
||||
t.Fatalf("unexpected number of fields at row #%d; got %d; want %d\nrow got\n%s\nrow expected\n%s",
|
||||
|
|
|
@ -3,6 +3,8 @@ package logstorage
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -310,6 +312,163 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
mustRunQuery(tenantIDs, q, writeBlock)
|
||||
})
|
||||
|
||||
// Run more complex tests
|
||||
f := func(t *testing.T, query string, rowsExpected [][]Field) {
|
||||
t.Helper()
|
||||
|
||||
q := mustParseQuery(query)
|
||||
var resultRowsLock sync.Mutex
|
||||
var resultRows [][]Field
|
||||
writeBlock := func(_ uint, _ []int64, bcs []BlockColumn) {
|
||||
if len(bcs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < len(bcs[0].Values); i++ {
|
||||
row := make([]Field, len(bcs))
|
||||
for j, bc := range bcs {
|
||||
row[j] = Field{
|
||||
Name: strings.Clone(bc.Name),
|
||||
Value: strings.Clone(bc.Values[i]),
|
||||
}
|
||||
}
|
||||
resultRowsLock.Lock()
|
||||
resultRows = append(resultRows, row)
|
||||
resultRowsLock.Unlock()
|
||||
}
|
||||
}
|
||||
mustRunQuery(allTenantIDs, q, writeBlock)
|
||||
|
||||
assertRowsEqual(t, resultRows, rowsExpected)
|
||||
}
|
||||
|
||||
t.Run("stats-count-total", func(t *testing.T) {
|
||||
f(t, `* | stats count() rows`, [][]Field{
|
||||
{
|
||||
{"rows", "1155"},
|
||||
},
|
||||
})
|
||||
})
|
||||
t.Run("in-filter-with-subquery-match", func(t *testing.T) {
|
||||
f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{
|
||||
{
|
||||
{"rows", "105"},
|
||||
},
|
||||
})
|
||||
})
|
||||
t.Run("in-filter-with-subquery-mismatch", func(t *testing.T) {
|
||||
f(t, `tenant.id:in(tenant.id:23243 | fields tenant.id) | stats count() rows`, [][]Field{
|
||||
{
|
||||
{"rows", "0"},
|
||||
},
|
||||
})
|
||||
})
|
||||
t.Run("conditional-stats", func(t *testing.T) {
|
||||
f(t, `* | stats
|
||||
count() rows_total,
|
||||
count() if (stream-id:0) stream_0_rows,
|
||||
count() if (stream-id:1123) stream_x_rows
|
||||
`, [][]Field{
|
||||
{
|
||||
{"rows_total", "1155"},
|
||||
{"stream_0_rows", "385"},
|
||||
{"stream_x_rows", "0"},
|
||||
},
|
||||
})
|
||||
})
|
||||
t.Run("in-filter-with-subquery-in-conditional-stats-mismatch", func(t *testing.T) {
|
||||
f(t, `* | stats
|
||||
count() rows_total,
|
||||
count() if (tenant.id:in(tenant.id:3 | fields tenant.id)) rows_nonzero,
|
||||
count() if (tenant.id:in(tenant.id:23243 | fields tenant.id)) rows_zero
|
||||
`, [][]Field{
|
||||
{
|
||||
{"rows_total", "1155"},
|
||||
{"rows_nonzero", "105"},
|
||||
{"rows_zero", "0"},
|
||||
},
|
||||
})
|
||||
})
|
||||
// Subtest: extract the <host> placeholder from the `instance` field and
// count the hits per extracted host.
//
// The callback binds its own t instead of capturing the parent's t: f reports
// mismatches through t.Fatalf, which must be called on the *testing.T that
// belongs to the goroutine running this subtest.
t.Run("pipe-extract", func(t *testing.T) {
	f(t, `* | extract "host-<host>:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{
		{
			{"host", "0"},
			{"hits", "385"},
		},
		{
			{"host", "1"},
			{"hits", "385"},
		},
		{
			{"host", "2"},
			{"hits", "385"},
		},
	})
})
|
||||
// Subtest: `extract` guarded by an `if (...)` condition whose `in(...)` filter
// contains a subquery; only tenants 3 and 4 are expected to get a `host` value.
//
// The callback binds its own t instead of capturing the parent's t: f reports
// mismatches through t.Fatalf, which must be called on the *testing.T that
// belongs to the goroutine running this subtest.
t.Run("pipe-extract-if-filter-with-subquery", func(t *testing.T) {
	f(t, `* | extract
			if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id))
			"host-<host>:" from instance
		| filter host:~"1|2"
		| uniq (tenant.id, host) with hits
		| sort by (tenant.id, host)`, [][]Field{
		{
			{"tenant.id", "{accountID=3,projectID=31}"},
			{"host", "1"},
			{"hits", "35"},
		},
		{
			{"tenant.id", "{accountID=3,projectID=31}"},
			{"host", "2"},
			{"hits", "35"},
		},
		{
			{"tenant.id", "{accountID=4,projectID=41}"},
			{"host", "1"},
			{"hits", "35"},
		},
		{
			{"tenant.id", "{accountID=4,projectID=41}"},
			{"host", "2"},
			{"hits", "35"},
		},
	})
})
|
||||
// Subtest: conditional `extract` with an `in(...)` subquery, keeping only the
// rows where `host` was actually extracted (`host:*`).
//
// The callback binds its own t instead of capturing the parent's t: f reports
// mismatches through t.Fatalf, which must be called on the *testing.T that
// belongs to the goroutine running this subtest.
t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(t *testing.T) {
	f(t, `* | extract
			if (tenant.id:in(tenant.id:3 | fields tenant.id))
			"host-<host>:" from instance
		| filter host:*
		| uniq (host) with hits
		| sort by (host)`, [][]Field{
		{
			{"host", "0"},
			{"hits", "35"},
		},
		{
			{"host", "1"},
			{"hits", "35"},
		},
		{
			{"host", "2"},
			{"hits", "35"},
		},
	})
})
|
||||
// Subtest: conditional `extract` with an `in(...)` subquery, keeping only the
// rows where `host` stayed empty (`host:""`), i.e. the rows skipped by the
// `if (...)` condition.
//
// The callback binds its own t instead of capturing the parent's t: f reports
// mismatches through t.Fatalf, which must be called on the *testing.T that
// belongs to the goroutine running this subtest.
t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(t *testing.T) {
	f(t, `* | extract
			if (tenant.id:in(tenant.id:3 | fields tenant.id))
			"host-<host>:" from instance
		| filter host:""
		| uniq (host) with hits
		| sort by (host)`, [][]Field{
		{
			{"host", ""},
			{"hits", "1050"},
		},
	})
})
|
||||
|
||||
// Close the storage and delete its data
|
||||
s.MustClose()
|
||||
fs.MustRemoveAll(path)
|
||||
|
@ -318,7 +477,7 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
func mustParseQuery(query string) *Query {
|
||||
q, err := ParseQuery(query)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("BUG: cannot parse %s: %w", query, err))
|
||||
panic(fmt.Errorf("BUG: cannot parse [%s]: %w", query, err))
|
||||
}
|
||||
return q
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue