lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-06-10 18:42:19 +02:00
parent bf2d299420
commit 0521e58a09
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
15 changed files with 99 additions and 30 deletions

View file

@ -19,6 +19,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: treat unexpected syslog message as [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) containing only the `message` field when using [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe).
* FEATURE: allow using `where` prefix instead of `filter` prefix in [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe).
* FEATURE: disallow unescaped `!` char in [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) queries, since it permits writing incorrect query, which may look like correct one. For example, `foo!:bar` instead of `foo:!bar`.
* FEATURE: [web UI](https://docs.victoriametrics.com/VictoriaLogs/querying/#web-ui): add markdown support to the `Group` view. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6292).
## [v0.18.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.18.0-victorialogs)

View file

@ -1608,6 +1608,12 @@ if the number of log messages with the `error` [word](#word) for them over the l
_time:1h error | stats by (host) count() logs_count | filter logs_count:> 1_000
```
It is allowed to use `where` prefix instead of `filter` prefix for convenience. For example, the following query is equivalent to the previous one:
```logsql
_time:1h error | stats by (host) count() logs_count | where logs_count:> 1_000
```
It is allowed to omit `filter` prefix if the used filters do not clash with [pipe names](#pipes).
So the following query is equivalent to the previous one:
@ -1736,6 +1742,8 @@ Where `exprX` is one of the supported math expressions mentioned below, while `r
The `as` keyword is optional. The result name can be omitted. In this case the result is stored to a field with the name equal to string represenation
of the corresponding math expression.
`exprX` may reference `resultNameY` calculated before the given `exprX`.
For example, the following query divides `duration_msecs` field value by 1000, then rounds it to integer and stores the result in the `duration_secs` field:
```logsql

View file

@ -98,13 +98,6 @@ func (bm *bitmap) areAllBitsSet() bool {
return true
}
func (bm *bitmap) isSetBit(i int) bool {
wordIdx := uint(i) / 64
wordOffset := uint(i) % 64
word := bm.a[wordIdx]
return (word & (1 << wordOffset)) != 0
}
func (bm *bitmap) andNot(x *bitmap) {
if bm.bitsLen != x.bitsLen {
logger.Panicf("BUG: cannot merge bitmaps with distinct lengths; %d vs %d", bm.bitsLen, x.bitsLen)
@ -127,6 +120,13 @@ func (bm *bitmap) or(x *bitmap) {
}
}
func (bm *bitmap) isSetBit(i int) bool {
wordIdx := uint(i) / 64
wordOffset := uint(i) % 64
word := bm.a[wordIdx]
return (word & (1 << wordOffset)) != 0
}
// forEachSetBit calls f for each set bit and clears that bit if f returns false
func (bm *bitmap) forEachSetBit(f func(idx int) bool) {
a := bm.a
@ -143,7 +143,7 @@ func (bm *bitmap) forEachSetBit(f func(idx int) bool) {
}
idx := i*64 + j
if idx >= bitsLen {
break
return
}
if !f(idx) {
wordNew &= ^mask
@ -178,7 +178,7 @@ func (bm *bitmap) forEachSetBitReadonly(f func(idx int)) {
}
idx := i*64 + j
if idx >= bitsLen {
break
return
}
f(idx)
}

View file

@ -6,6 +6,8 @@ import (
func TestBitmap(t *testing.T) {
for i := 0; i < 100; i++ {
bitsLen := i
bm := getBitmap(i)
if bm.bitsLen != i {
t.Fatalf("unexpected bits length: %d; want %d", bm.bitsLen, i)
@ -41,6 +43,9 @@ func TestBitmap(t *testing.T) {
}
nextIdx++
})
if nextIdx != bitsLen {
t.Fatalf("unexpected number of bits set; got %d; want %d", nextIdx, bitsLen)
}
if !bm.areAllBitsSet() {
t.Fatalf("all bits must be set for bitmap with %d bits", i)
@ -71,6 +76,9 @@ func TestBitmap(t *testing.T) {
}
nextIdx += 2
})
if nextIdx < bitsLen {
t.Fatalf("unexpected number of bits visited; got %d; want %d", nextIdx, bitsLen)
}
// Clear all the bits
bm.forEachSetBit(func(_ int) bool {

View file

@ -4,6 +4,35 @@ import (
"testing"
)
func BenchmarkBitmapIsSetBit(b *testing.B) {
const bitsLen = 64 * 1024
b.Run("no-zero-bits", func(b *testing.B) {
bm := getBitmap(bitsLen)
bm.setBits()
benchmarkBitmapIsSetBit(b, bm)
putBitmap(bm)
})
b.Run("half-zero-bits", func(b *testing.B) {
bm := getBitmap(bitsLen)
bm.setBits()
bm.forEachSetBit(func(idx int) bool {
return idx%2 == 0
})
benchmarkBitmapIsSetBit(b, bm)
putBitmap(bm)
})
b.Run("one-set-bit", func(b *testing.B) {
bm := getBitmap(bitsLen)
bm.setBits()
bm.forEachSetBit(func(idx int) bool {
return idx == bitsLen/2
})
benchmarkBitmapIsSetBit(b, bm)
putBitmap(bm)
})
}
func BenchmarkBitmapForEachSetBitReadonly(b *testing.B) {
const bitsLen = 64 * 1024
@ -86,19 +115,33 @@ func BenchmarkBitmapForEachSetBit(b *testing.B) {
})
}
func benchmarkBitmapIsSetBit(b *testing.B, bm *bitmap) {
bitsLen := bm.bitsLen
b.SetBytes(int64(bitsLen))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
n := 0
for pb.Next() {
for i := 0; i < bitsLen; i++ {
if bm.isSetBit(i) {
n++
}
}
}
GlobalSink.Add(uint64(n))
})
}
func benchmarkBitmapForEachSetBitReadonly(b *testing.B, bm *bitmap) {
b.SetBytes(int64(bm.bitsLen))
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
bmLocal := getBitmap(bm.bitsLen)
n := 0
for pb.Next() {
bmLocal.copyFrom(bm)
bmLocal.forEachSetBitReadonly(func(_ int) {
bm.forEachSetBitReadonly(func(_ int) {
n++
})
}
putBitmap(bmLocal)
GlobalSink.Add(uint64(n))
})
}

View file

@ -32,11 +32,8 @@ type bloomFilter struct {
}
func (bf *bloomFilter) reset() {
bits := bf.bits
for i := range bits {
bits[i] = 0
}
bf.bits = bits[:0]
clear(bf.bits)
bf.bits = bf.bits[:0]
}
// marshal appends marshaled bf to dst and returns the result.

View file

@ -671,7 +671,7 @@ func parseGenericFilter(lex *lexer, fieldName string) (filter, error) {
}
func getCompoundPhrase(lex *lexer, allowColon bool) (string, error) {
stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", ""}
stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", "!", ""}
if lex.isKeyword(stopTokens...) {
return "", fmt.Errorf("compound phrase cannot start with '%s'", lex.token)
}
@ -688,7 +688,7 @@ func getCompoundPhrase(lex *lexer, allowColon bool) (string, error) {
func getCompoundSuffix(lex *lexer, allowColon bool) string {
s := ""
stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", ""}
stopTokens := []string{"*", ",", "(", ")", "[", "]", "|", "!", ""}
if !allowColon {
stopTokens = append(stopTokens, ":")
}
@ -700,7 +700,7 @@ func getCompoundSuffix(lex *lexer, allowColon bool) string {
}
func getCompoundToken(lex *lexer) (string, error) {
stopTokens := []string{",", "(", ")", "[", "]", "|", ""}
stopTokens := []string{",", "(", ")", "[", "]", "|", "!", ""}
if lex.isKeyword(stopTokens...) {
return "", fmt.Errorf("compound token cannot start with '%s'", lex.token)
}

View file

@ -926,8 +926,8 @@ func TestParseQuerySuccess(t *testing.T) {
f("foo-bar+baz*", `"foo-bar+baz"*`)
f("foo- bar", `foo- bar`)
f("foo -bar", `foo -bar`)
f("foo!bar", `"foo!bar"`)
f("foo:aa!bb:cc", `foo:"aa!bb:cc"`)
f("foo!bar", `foo !bar`)
f("foo:aa!bb:cc", `foo:aa !bb:cc`)
f(`foo:bar:baz`, `foo:"bar:baz"`)
f(`foo:(bar baz:xxx)`, `foo:bar foo:"baz:xxx"`)
f(`foo:(_time:abc or not z)`, `foo:"_time:abc" or !foo:z`)

View file

@ -142,7 +142,7 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
}
return pf, nil
case lex.isKeyword("filter"):
case lex.isKeyword("filter", "where"):
pf, err := parsePipeFilter(lex, true)
if err != nil {
return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)

View file

@ -110,8 +110,8 @@ func (pfp *pipeFilterProcessor) flush() error {
func parsePipeFilter(lex *lexer, needFilterKeyword bool) (*pipeFilter, error) {
if needFilterKeyword {
if !lex.isKeyword("filter") {
return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token)
if !lex.isKeyword("filter", "where") {
return nil, fmt.Errorf("expecting 'filter' or 'where'; got %q", lex.token)
}
lex.nextToken()
}

View file

@ -75,7 +75,7 @@ func TestPipeFilter(t *testing.T) {
})
// multiple rows
f("filter x:foo y:bar", [][]Field{
f("where x:foo y:bar", [][]Field{
{
{"a", "f1"},
{"x", "foo"},

View file

@ -703,7 +703,7 @@ func parseMathExprFieldName(lex *lexer) (*mathExpr, error) {
}
func getCompoundMathToken(lex *lexer) (string, error) {
stopTokens := []string{"=", "+", "-", "*", "/", "%", "^", ",", ")", "|", ""}
stopTokens := []string{"=", "+", "-", "*", "/", "%", "^", ",", ")", "|", "!", ""}
if lex.isKeyword(stopTokens...) {
return "", fmt.Errorf("compound token cannot start with '%s'", lex.token)
}

View file

@ -169,6 +169,8 @@ func TestPipeUnpackSyslog(t *testing.T) {
}, [][]Field{
{
{"x", `foobar`},
{"format", "rfc3164"},
{"message", "foobar"},
},
})

View file

@ -239,17 +239,19 @@ func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) {
func (p *syslogParser) parseRFC3164(s string) {
// See https://datatracker.ietf.org/doc/html/rfc3164
p.addField("format", "rfc3164")
// Parse timestamp
n := len(time.Stamp)
if len(s) < n {
p.addField("message", s)
return
}
p.addField("format", "rfc3164")
t, err := time.Parse(time.Stamp, s[:n])
if err != nil {
// TODO: fall back to parsing ISO8601 timestamp?
p.addField("message", s)
return
}
s = s[n:]
@ -267,6 +269,9 @@ func (p *syslogParser) parseRFC3164(s string) {
if len(s) == 0 || s[0] != ' ' {
// Missing space after the time field
if len(s) > 0 {
p.addField("message", s)
}
return
}
s = s[1:]

View file

@ -44,10 +44,13 @@ func TestSyslogParser(t *testing.T) {
// Incomplete RFC 3164
f("", `{}`)
f("Jun 3 12:08:33", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z"}`)
f("Foo 3 12:08:33", `{"format":"rfc3164","message":"Foo 3 12:08:33"}`)
f("Foo 3 12:08:33bar", `{"format":"rfc3164","message":"Foo 3 12:08:33bar"}`)
f("Jun 3 12:08:33 abcd", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`)
f("Jun 3 12:08:33 abcd sudo", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`)
f("Jun 3 12:08:33 abcd sudo[123]", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`)
f("Jun 3 12:08:33 abcd sudo foobar", `{"format":"rfc3164","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`)
f(`foo bar baz`, `{"format":"rfc3164","message":"foo bar baz"}`)
// Incomplete RFC 5424
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`,