mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
1c9b834b1e
commit
ad0cc2d55e
6 changed files with 204 additions and 5 deletions
|
@ -23,6 +23,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||||
|
|
||||||
Released at 2024-05-26
|
Released at 2024-05-26
|
||||||
|
|
||||||
|
* FEATURE: add support for comments in multi-line LogsQL queries. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#comments).
|
||||||
|
|
||||||
* BUGFIX: properly apply [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) inside `if (...)` conditions at various [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes). This bug has been introduced in [v0.12.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.0-victorialogs).
|
* BUGFIX: properly apply [`in(...)` filter](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) inside `if (...)` conditions at various [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes). This bug has been introduced in [v0.12.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.0-victorialogs).
|
||||||
|
|
||||||
## [v0.12.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.0-victorialogs)
|
## [v0.12.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.0-victorialogs)
|
||||||
|
|
|
@ -2501,6 +2501,18 @@ LogsQL provides the following [pipes](#pipes) for limiting the number of returne
|
||||||
|
|
||||||
Specific log fields can be queried via [`fields` pipe](#fields-pipe).
|
Specific log fields can be queried via [`fields` pipe](#fields-pipe).
|
||||||
|
|
||||||
|
## Comments
|
||||||
|
|
||||||
|
LogsQL query may contain comments at any place. The comment starts with `#` and continues until the end of the current line.
|
||||||
|
Example query with comments:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
error # find logs with `error` word
|
||||||
|
| stats by (_stream) logs # then count the number of logs per `_stream` label
|
||||||
|
| sort by (logs) desc # then sort by the found logs in descending order
|
||||||
|
| limit 5 # and show top 5 streams with the biggest number of logs
|
||||||
|
```
|
||||||
|
|
||||||
## Numeric values
|
## Numeric values
|
||||||
|
|
||||||
LogsQL accepts numeric values in the following formats:
|
LogsQL accepts numeric values in the following formats:
|
||||||
|
|
|
@ -123,9 +123,12 @@ func (lex *lexer) nextToken() {
|
||||||
lex.token = ""
|
lex.token = ""
|
||||||
lex.rawToken = ""
|
lex.rawToken = ""
|
||||||
lex.isSkippedSpace = false
|
lex.isSkippedSpace = false
|
||||||
|
|
||||||
if len(s) == 0 {
|
if len(s) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
again:
|
||||||
r, size := utf8.DecodeRuneInString(s)
|
r, size := utf8.DecodeRuneInString(s)
|
||||||
if r == utf8.RuneError {
|
if r == utf8.RuneError {
|
||||||
lex.nextCharToken(s, size)
|
lex.nextCharToken(s, size)
|
||||||
|
@ -139,6 +142,17 @@ func (lex *lexer) nextToken() {
|
||||||
r, size = utf8.DecodeRuneInString(s)
|
r, size = utf8.DecodeRuneInString(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if r == '#' {
|
||||||
|
// skip comment till \n
|
||||||
|
n := strings.IndexByte(s, '\n')
|
||||||
|
if n < 0 {
|
||||||
|
s = ""
|
||||||
|
} else {
|
||||||
|
s = s[n+1:]
|
||||||
|
}
|
||||||
|
goto again
|
||||||
|
}
|
||||||
|
|
||||||
// Try decoding simple token
|
// Try decoding simple token
|
||||||
tokenLen := 0
|
tokenLen := 0
|
||||||
for isTokenRune(r) || r == '.' {
|
for isTokenRune(r) || r == '.' {
|
||||||
|
|
|
@ -1059,6 +1059,12 @@ func TestParseQuerySuccess(t *testing.T) {
|
||||||
// multiple different pipes
|
// multiple different pipes
|
||||||
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
|
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
|
||||||
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
|
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
|
||||||
|
|
||||||
|
// comments
|
||||||
|
f(`* # some comment | foo bar`, `*`)
|
||||||
|
f(`foo | # some comment | foo bar
|
||||||
|
fields x # another comment
|
||||||
|
|filter "foo#this#isn't a comment"#this is comment`, `foo | fields x | filter "foo#this#isn't a comment"`)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseQueryFailure(t *testing.T) {
|
func TestParseQueryFailure(t *testing.T) {
|
||||||
|
|
|
@ -128,15 +128,21 @@ func (pp *testPipeProcessor) flush() error {
|
||||||
func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) {
|
func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
if len(pp.resultRows) != len(expectedRows) {
|
assertRowsEqual(t, pp.resultRows, expectedRows)
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertRowsEqual(t *testing.T, resultRows, expectedRows [][]Field) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
if len(resultRows) != len(expectedRows) {
|
||||||
t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s",
|
t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s",
|
||||||
len(pp.resultRows), len(expectedRows), rowsToString(pp.resultRows), rowsToString(expectedRows))
|
len(resultRows), len(expectedRows), rowsToString(resultRows), rowsToString(expectedRows))
|
||||||
}
|
}
|
||||||
|
|
||||||
sortTestRows(pp.resultRows)
|
sortTestRows(resultRows)
|
||||||
sortTestRows(expectedRows)
|
sortTestRows(expectedRows)
|
||||||
|
|
||||||
for i, resultRow := range pp.resultRows {
|
for i, resultRow := range resultRows {
|
||||||
expectedRow := expectedRows[i]
|
expectedRow := expectedRows[i]
|
||||||
if len(resultRow) != len(expectedRow) {
|
if len(resultRow) != len(expectedRow) {
|
||||||
t.Fatalf("unexpected number of fields at row #%d; got %d; want %d\nrow got\n%s\nrow expected\n%s",
|
t.Fatalf("unexpected number of fields at row #%d; got %d; want %d\nrow got\n%s\nrow expected\n%s",
|
||||||
|
|
|
@ -3,6 +3,8 @@ package logstorage
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -310,6 +312,163 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
mustRunQuery(tenantIDs, q, writeBlock)
|
mustRunQuery(tenantIDs, q, writeBlock)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Run more complex tests
|
||||||
|
f := func(t *testing.T, query string, rowsExpected [][]Field) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
q := mustParseQuery(query)
|
||||||
|
var resultRowsLock sync.Mutex
|
||||||
|
var resultRows [][]Field
|
||||||
|
writeBlock := func(_ uint, _ []int64, bcs []BlockColumn) {
|
||||||
|
if len(bcs) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < len(bcs[0].Values); i++ {
|
||||||
|
row := make([]Field, len(bcs))
|
||||||
|
for j, bc := range bcs {
|
||||||
|
row[j] = Field{
|
||||||
|
Name: strings.Clone(bc.Name),
|
||||||
|
Value: strings.Clone(bc.Values[i]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
resultRowsLock.Lock()
|
||||||
|
resultRows = append(resultRows, row)
|
||||||
|
resultRowsLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mustRunQuery(allTenantIDs, q, writeBlock)
|
||||||
|
|
||||||
|
assertRowsEqual(t, resultRows, rowsExpected)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Run("stats-count-total", func(t *testing.T) {
|
||||||
|
f(t, `* | stats count() rows`, [][]Field{
|
||||||
|
{
|
||||||
|
{"rows", "1155"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("in-filter-with-subquery-match", func(t *testing.T) {
|
||||||
|
f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{
|
||||||
|
{
|
||||||
|
{"rows", "105"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("in-filter-with-subquery-mismatch", func(t *testing.T) {
|
||||||
|
f(t, `tenant.id:in(tenant.id:23243 | fields tenant.id) | stats count() rows`, [][]Field{
|
||||||
|
{
|
||||||
|
{"rows", "0"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("conditional-stats", func(t *testing.T) {
|
||||||
|
f(t, `* | stats
|
||||||
|
count() rows_total,
|
||||||
|
count() if (stream-id:0) stream_0_rows,
|
||||||
|
count() if (stream-id:1123) stream_x_rows
|
||||||
|
`, [][]Field{
|
||||||
|
{
|
||||||
|
{"rows_total", "1155"},
|
||||||
|
{"stream_0_rows", "385"},
|
||||||
|
{"stream_x_rows", "0"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("in-filter-with-subquery-in-conditional-stats-mismatch", func(t *testing.T) {
|
||||||
|
f(t, `* | stats
|
||||||
|
count() rows_total,
|
||||||
|
count() if (tenant.id:in(tenant.id:3 | fields tenant.id)) rows_nonzero,
|
||||||
|
count() if (tenant.id:in(tenant.id:23243 | fields tenant.id)) rows_zero
|
||||||
|
`, [][]Field{
|
||||||
|
{
|
||||||
|
{"rows_total", "1155"},
|
||||||
|
{"rows_nonzero", "105"},
|
||||||
|
{"rows_zero", "0"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("pipe-extract", func(*testing.T) {
|
||||||
|
f(t, `* | extract "host-<host>:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{
|
||||||
|
{
|
||||||
|
{"host", "0"},
|
||||||
|
{"hits", "385"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"host", "1"},
|
||||||
|
{"hits", "385"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"host", "2"},
|
||||||
|
{"hits", "385"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("pipe-extract-if-filter-with-subquery", func(*testing.T) {
|
||||||
|
f(t, `* | extract
|
||||||
|
if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id))
|
||||||
|
"host-<host>:" from instance
|
||||||
|
| filter host:~"1|2"
|
||||||
|
| uniq (tenant.id, host) with hits
|
||||||
|
| sort by (tenant.id, host)`, [][]Field{
|
||||||
|
{
|
||||||
|
{"tenant.id", "{accountID=3,projectID=31}"},
|
||||||
|
{"host", "1"},
|
||||||
|
{"hits", "35"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"tenant.id", "{accountID=3,projectID=31}"},
|
||||||
|
{"host", "2"},
|
||||||
|
{"hits", "35"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"tenant.id", "{accountID=4,projectID=41}"},
|
||||||
|
{"host", "1"},
|
||||||
|
{"hits", "35"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"tenant.id", "{accountID=4,projectID=41}"},
|
||||||
|
{"host", "2"},
|
||||||
|
{"hits", "35"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(*testing.T) {
|
||||||
|
f(t, `* | extract
|
||||||
|
if (tenant.id:in(tenant.id:3 | fields tenant.id))
|
||||||
|
"host-<host>:" from instance
|
||||||
|
| filter host:*
|
||||||
|
| uniq (host) with hits
|
||||||
|
| sort by (host)`, [][]Field{
|
||||||
|
{
|
||||||
|
{"host", "0"},
|
||||||
|
{"hits", "35"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"host", "1"},
|
||||||
|
{"hits", "35"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"host", "2"},
|
||||||
|
{"hits", "35"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(*testing.T) {
|
||||||
|
f(t, `* | extract
|
||||||
|
if (tenant.id:in(tenant.id:3 | fields tenant.id))
|
||||||
|
"host-<host>:" from instance
|
||||||
|
| filter host:""
|
||||||
|
| uniq (host) with hits
|
||||||
|
| sort by (host)`, [][]Field{
|
||||||
|
{
|
||||||
|
{"host", ""},
|
||||||
|
{"hits", "1050"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
// Close the storage and delete its data
|
// Close the storage and delete its data
|
||||||
s.MustClose()
|
s.MustClose()
|
||||||
fs.MustRemoveAll(path)
|
fs.MustRemoveAll(path)
|
||||||
|
@ -318,7 +477,7 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
func mustParseQuery(query string) *Query {
|
func mustParseQuery(query string) *Query {
|
||||||
q, err := ParseQuery(query)
|
q, err := ParseQuery(query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("BUG: cannot parse %s: %w", query, err))
|
panic(fmt.Errorf("BUG: cannot parse [%s]: %w", query, err))
|
||||||
}
|
}
|
||||||
return q
|
return q
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue