From 0521e58a0982fe5dd6ddf64da031b2d3ad1501fd Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 10 Jun 2024 18:42:19 +0200 Subject: [PATCH] lib/logstorage: work-in-progress --- docs/VictoriaLogs/CHANGELOG.md | 3 ++ docs/VictoriaLogs/LogsQL.md | 8 ++++ lib/logstorage/bitmap.go | 18 ++++---- lib/logstorage/bitmap_test.go | 8 ++++ lib/logstorage/bitmap_timing_test.go | 51 +++++++++++++++++++++-- lib/logstorage/bloomfilter.go | 7 +--- lib/logstorage/parser.go | 6 +-- lib/logstorage/parser_test.go | 4 +- lib/logstorage/pipe.go | 2 +- lib/logstorage/pipe_filter.go | 4 +- lib/logstorage/pipe_filter_test.go | 2 +- lib/logstorage/pipe_math.go | 2 +- lib/logstorage/pipe_unpack_syslog_test.go | 2 + lib/logstorage/syslog_parser.go | 9 +++- lib/logstorage/syslog_parser_test.go | 3 ++ 15 files changed, 99 insertions(+), 30 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index acc752f8e..649468cf7 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: treat unexpected syslog message as [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) containing only the `message` field when using [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe). +* FEATURE: allow using `where` prefix instead of `filter` prefix in [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe). +* FEATURE: disallow unescaped `!` char in [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) queries, since it permits writing incorrect query, which may look like correct one. For example, `foo!:bar` instead of `foo:!bar`. * FEATURE: [web UI](https://docs.victoriametrics.com/VictoriaLogs/querying/#web-ui): add markdown support to the `Group` view. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6292). ## [v0.18.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.18.0-victorialogs) diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 2e063bb00..7ba1cc861 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1608,6 +1608,12 @@ if the number of log messages with the `error` [word](#word) for them over the l _time:1h error | stats by (host) count() logs_count | filter logs_count:> 1_000 ``` +It is allowed to use `where` prefix instead of `filter` prefix for convenience. For example, the following query is equivalent to the previous one: + +```logsql +_time:1h error | stats by (host) count() logs_count | where logs_count:> 1_000 +``` + It is allowed to omit `filter` prefix if the used filters do not clash with [pipe names](#pipes). So the following query is equivalent to the previous one: @@ -1736,6 +1742,8 @@ Where `exprX` is one of the supported math expressions mentioned below, while `r The `as` keyword is optional. The result name can be omitted. In this case the result is stored to a field with the name equal to string represenation of the corresponding math expression. +`exprX` may reference `resultNameY` calculated before the given `exprX`. + For example, the following query divides `duration_msecs` field value by 1000, then rounds it to integer and stores the result in the `duration_secs` field: ```logsql diff --git a/lib/logstorage/bitmap.go b/lib/logstorage/bitmap.go index ac1c10695..350ea0cc8 100644 --- a/lib/logstorage/bitmap.go +++ b/lib/logstorage/bitmap.go @@ -98,13 +98,6 @@ func (bm *bitmap) areAllBitsSet() bool { return true } -func (bm *bitmap) isSetBit(i int) bool { - wordIdx := uint(i) / 64 - wordOffset := uint(i) % 64 - word := bm.a[wordIdx] - return (word & (1 << wordOffset)) != 0 -} - func (bm *bitmap) andNot(x *bitmap) { if bm.bitsLen != x.bitsLen { logger.Panicf("BUG: cannot merge bitmaps with distinct lengths; %d vs %d", bm.bitsLen, x.bitsLen) @@ -127,6 +120,13 @@ func (bm *bitmap) or(x *bitmap) { } } +func (bm *bitmap) isSetBit(i int) bool { + wordIdx := uint(i) / 64 + wordOffset := uint(i) % 64 + word := bm.a[wordIdx] + return (word & (1 << wordOffset)) != 0 +} + // forEachSetBit calls f for each set bit and clears that bit if f returns false func (bm *bitmap) forEachSetBit(f func(idx int) bool) { a := bm.a @@ -143,7 +143,7 @@ func (bm *bitmap) forEachSetBit(f func(idx int) bool) { } idx := i*64 + j if idx >= bitsLen { - break + return } if !f(idx) { wordNew &= ^mask @@ -178,7 +178,7 @@ func (bm *bitmap) forEachSetBitReadonly(f func(idx int)) { } idx := i*64 + j if idx >= bitsLen { - break + return } f(idx) } diff --git a/lib/logstorage/bitmap_test.go b/lib/logstorage/bitmap_test.go index 1b5f2ba8b..41cce6974 100644 --- a/lib/logstorage/bitmap_test.go +++ b/lib/logstorage/bitmap_test.go @@ -6,6 +6,8 @@ import ( func TestBitmap(t *testing.T) { for i := 0; i < 100; i++ { + bitsLen := i + bm := getBitmap(i) if bm.bitsLen != i { t.Fatalf("unexpected bits length: %d; want %d", bm.bitsLen, i) @@ -41,6 +43,9 @@ func TestBitmap(t *testing.T) { } nextIdx++ }) + if nextIdx != bitsLen { + t.Fatalf("unexpected number of bits set; got %d; want %d", nextIdx, bitsLen) + } if !bm.areAllBitsSet() { t.Fatalf("all bits must be set for bitmap with %d bits", i) @@ -71,6 +76,9 @@ func TestBitmap(t *testing.T) { } nextIdx += 2 }) + if nextIdx < bitsLen { + t.Fatalf("unexpected number of bits visited; got %d; want %d", nextIdx, bitsLen) + } // Clear all the bits bm.forEachSetBit(func(_ int) bool { diff --git a/lib/logstorage/bitmap_timing_test.go b/lib/logstorage/bitmap_timing_test.go index 9f364290e..5e0a36f6a 100644 --- a/lib/logstorage/bitmap_timing_test.go +++ b/lib/logstorage/bitmap_timing_test.go @@ -4,6 +4,35 @@ import ( "testing" ) +func BenchmarkBitmapIsSetBit(b *testing.B) { + const bitsLen = 64 * 1024 + + b.Run("no-zero-bits", func(b *testing.B) { + bm := getBitmap(bitsLen) + bm.setBits() + benchmarkBitmapIsSetBit(b, bm) + putBitmap(bm) + }) + b.Run("half-zero-bits", func(b *testing.B) { + bm := getBitmap(bitsLen) + bm.setBits() + bm.forEachSetBit(func(idx int) bool { + return idx%2 == 0 + }) + benchmarkBitmapIsSetBit(b, bm) + putBitmap(bm) + }) + b.Run("one-set-bit", func(b *testing.B) { + bm := getBitmap(bitsLen) + bm.setBits() + bm.forEachSetBit(func(idx int) bool { + return idx == bitsLen/2 + }) + benchmarkBitmapIsSetBit(b, bm) + putBitmap(bm) + }) +} + func BenchmarkBitmapForEachSetBitReadonly(b *testing.B) { const bitsLen = 64 * 1024 @@ -86,19 +115,33 @@ func BenchmarkBitmapForEachSetBit(b *testing.B) { }) } +func benchmarkBitmapIsSetBit(b *testing.B, bm *bitmap) { + bitsLen := bm.bitsLen + b.SetBytes(int64(bitsLen)) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + n := 0 + for pb.Next() { + for i := 0; i < bitsLen; i++ { + if bm.isSetBit(i) { + n++ + } + } + } + GlobalSink.Add(uint64(n)) + }) +} + func benchmarkBitmapForEachSetBitReadonly(b *testing.B, bm *bitmap) { b.SetBytes(int64(bm.bitsLen)) b.ReportAllocs() b.RunParallel(func(pb *testing.PB) { - bmLocal := getBitmap(bm.bitsLen) n := 0 for pb.Next() { - bmLocal.copyFrom(bm) - bmLocal.forEachSetBitReadonly(func(_ int) { + bm.forEachSetBitReadonly(func(_ int) { n++ }) } - putBitmap(bmLocal) GlobalSink.Add(uint64(n)) }) } diff --git a/lib/logstorage/bloomfilter.go b/lib/logstorage/bloomfilter.go index 9c0a5dd6a..f1ef86918 100644 --- a/lib/logstorage/bloomfilter.go +++ b/lib/logstorage/bloomfilter.go @@ -32,11 +32,8 @@ type bloomFilter struct { } func (bf *bloomFilter) reset() { - bits := bf.bits - for i := range bits { - bits[i] = 0 - } - bf.bits = bits[:0] + clear(bf.bits) + bf.bits = bf.bits[:0] } // marshal appends marshaled bf to dst and returns the result. diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 7e56385bb..d3122f5d8 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -671,7 +671,7 @@ func parseGenericFilter(lex *lexer, fieldName string) (filter, error) { } func getCompoundPhrase(lex *lexer, allowColon bool) (string, error) { - stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", ""} + stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", "!", ""} if lex.isKeyword(stopTokens...) { return "", fmt.Errorf("compound phrase cannot start with '%s'", lex.token) } @@ -688,7 +688,7 @@ func getCompoundPhrase(lex *lexer, allowColon bool) (string, error) { func getCompoundSuffix(lex *lexer, allowColon bool) string { s := "" - stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", ""} + stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", "!", ""} if !allowColon { stopTokens = append(stopTokens, ":") } @@ -700,7 +700,7 @@ func getCompoundSuffix(lex *lexer, allowColon bool) string { } func getCompoundToken(lex *lexer) (string, error) { - stopTokens := []string{",", "(", ")", "[", "]", "|", ""} + stopTokens := []string{",", "(", ")", "[", "]", "|", "!", ""} if lex.isKeyword(stopTokens...) { return "", fmt.Errorf("compound token cannot start with '%s'", lex.token) } diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 9cc5e4c55..815c3ab12 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -926,8 +926,8 @@ func TestParseQuerySuccess(t *testing.T) { f("foo-bar+baz*", `"foo-bar+baz"*`) f("foo- bar", `foo- bar`) f("foo -bar", `foo -bar`) - f("foo!bar", `"foo!bar"`) - f("foo:aa!bb:cc", `foo:"aa!bb:cc"`) + f("foo!bar", `foo !bar`) + f("foo:aa!bb:cc", `foo:aa !bb:cc`) f(`foo:bar:baz`, `foo:"bar:baz"`) f(`foo:(bar baz:xxx)`, `foo:bar foo:"baz:xxx"`) f(`foo:(_time:abc or not z)`, `foo:"_time:abc" or !foo:z`) diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 27aa21d1d..abc9cd80c 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -142,7 +142,7 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err) } return pf, nil - case lex.isKeyword("filter"): + case lex.isKeyword("filter", "where"): pf, err := parsePipeFilter(lex, true) if err != nil { return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err) diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go index c1f418f60..c88c6b407 100644 --- a/lib/logstorage/pipe_filter.go +++ b/lib/logstorage/pipe_filter.go @@ -110,8 +110,8 @@ func (pfp *pipeFilterProcessor) flush() error { func parsePipeFilter(lex *lexer, needFilterKeyword bool) (*pipeFilter, error) { if needFilterKeyword { - if !lex.isKeyword("filter") { - return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token) + if !lex.isKeyword("filter", "where") { + return nil, fmt.Errorf("expecting 'filter' or 'where'; got %q", lex.token) } lex.nextToken() } diff --git a/lib/logstorage/pipe_filter_test.go b/lib/logstorage/pipe_filter_test.go index dc244ffb0..31edbc6d9 100644 --- a/lib/logstorage/pipe_filter_test.go +++ b/lib/logstorage/pipe_filter_test.go @@ -75,7 +75,7 @@ func TestPipeFilter(t *testing.T) { }) // multiple rows - f("filter x:foo y:bar", [][]Field{ + f("where x:foo y:bar", [][]Field{ { {"a", "f1"}, {"x", "foo"}, diff --git a/lib/logstorage/pipe_math.go b/lib/logstorage/pipe_math.go index b0f165b22..350c578ea 100644 --- a/lib/logstorage/pipe_math.go +++ b/lib/logstorage/pipe_math.go @@ -703,7 +703,7 @@ func parseMathExprFieldName(lex *lexer) (*mathExpr, error) { } func getCompoundMathToken(lex *lexer) (string, error) { - stopTokens := []string{"=", "+", "-", "*", "/", "%", "^", ",", ")", "|", ""} + stopTokens := []string{"=", "+", "-", "*", "/", "%", "^", ",", ")", "|", "!", ""} if lex.isKeyword(stopTokens...) { return "", fmt.Errorf("compound token cannot start with '%s'", lex.token) } diff --git a/lib/logstorage/pipe_unpack_syslog_test.go b/lib/logstorage/pipe_unpack_syslog_test.go index 6b7ddd948..8a670dbf7 100644 --- a/lib/logstorage/pipe_unpack_syslog_test.go +++ b/lib/logstorage/pipe_unpack_syslog_test.go @@ -169,6 +169,8 @@ func TestPipeUnpackSyslog(t *testing.T) { }, [][]Field{ { {"x", `foobar`}, + {"format", "rfc3164"}, + {"message", "foobar"}, }, }) diff --git a/lib/logstorage/syslog_parser.go b/lib/logstorage/syslog_parser.go index 5e4f67fa7..51e5f5ccd 100644 --- a/lib/logstorage/syslog_parser.go +++ b/lib/logstorage/syslog_parser.go @@ -239,17 +239,19 @@ func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) { func (p *syslogParser) parseRFC3164(s string) { // See https://datatracker.ietf.org/doc/html/rfc3164 + p.addField("format", "rfc3164") + // Parse timestamp n := len(time.Stamp) if len(s) < n { + p.addField("message", s) return } - p.addField("format", "rfc3164") - t, err := time.Parse(time.Stamp, s[:n]) if err != nil { // TODO: fall back to parsing ISO8601 timestamp? + p.addField("message", s) return } s = s[n:] @@ -267,6 +269,9 @@ func (p *syslogParser) parseRFC3164(s string) { if len(s) == 0 || s[0] != ' ' { // Missing space after the time field + if len(s) > 0 { + p.addField("message", s) + } return } s = s[1:] diff --git a/lib/logstorage/syslog_parser_test.go b/lib/logstorage/syslog_parser_test.go index a82154103..7935f472f 100644 --- a/lib/logstorage/syslog_parser_test.go +++ b/lib/logstorage/syslog_parser_test.go @@ -44,10 +44,13 @@ func TestSyslogParser(t *testing.T) { // Incomplete RFC 3164 f("", `{}`) f("Jun 3 12:08:33", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`) + f("Foo 3 12:08:33", `{"format":"rfc3164","message":"Foo 3 12:08:33"}`) + f("Foo 3 12:08:33bar", `{"format":"rfc3164","message":"Foo 3 12:08:33bar"}`) f("Jun 3 12:08:33 abcd", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) f("Jun 3 12:08:33 abcd sudo", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) f("Jun 3 12:08:33 abcd sudo[123]", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) f("Jun 3 12:08:33 abcd sudo foobar", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) + f(`foo bar baz`, `{"format":"rfc3164","message":"foo bar baz"}`) // Incomplete RFC 5424 f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`,