This commit is contained in:
Aliaksandr Valialkin 2024-05-22 18:34:08 +02:00
parent 8bb2ffa9b9
commit dbbab6c78e
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 133 additions and 78 deletions

View file

@ -1679,17 +1679,17 @@ See also:
#### Conditional unpack_json #### Conditional unpack_json
If the [`unpack_json` pipe](#unpack_json-pipe) mustn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), If the [`unpack_json` pipe](#unpack_json-pipe) mustn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
then add `if (<filters>)` to the end of `unpack_json ...`. then add `if (<filters>)` after `unpack_json`.
The `<filters>` can contain arbitrary [filters](#filters). For example, the following query unpacks JSON fields only if `ip` field in the current log entry isn't set or empty: The `<filters>` can contain arbitrary [filters](#filters). For example, the following query unpacks JSON fields from `foo` field only if `ip` field in the current log entry isn't set or empty:
```logsql ```logsql
_time:5m | unpack_json if (ip:"") _time:5m | unpack_json if (ip:"") from foo
``` ```
### unpack_logfmt pipe ### unpack_logfmt pipe
`| unpack_logfmt from field_name` pipe unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields `| unpack_logfmt from field_name` pipe unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields
from the given `field_name` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names
with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched. with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
For example, the following query unpacks [logfmt](https://brandur.org/logfmt) fields from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) For example, the following query unpacks [logfmt](https://brandur.org/logfmt) fields from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
@ -1706,12 +1706,11 @@ The following query is equivalent to the previous one:
_time:5m | unpack_logfmt _time:5m | unpack_logfmt
``` ```
If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON, If only some fields must be unpacked from logfmt, then they can be enumerated inside `fields (...)`. For example, the following query extracts only `foo` and `bar` fields
by adding `result_prefix "prefix_name"` to `unpack_logfmt`. For example, the following query adds `foo_` prefix for all the unpacked fields from logfmt stored in the `my_logfmt` field:
from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
```logsql ```logsql
_time:5m | unpack_logfmt result_prefix "foo_" _time:5m | unpack_logfmt from my_logfmt fields (foo, bar)
``` ```
Performance tip: if you need to extract a single field from a long [logfmt](https://brandur.org/logfmt) line, it is faster to use [`extract` pipe](#extract-pipe). Performance tip: if you need to extract a single field from a long [logfmt](https://brandur.org/logfmt) line, it is faster to use [`extract` pipe](#extract-pipe).
@ -1722,6 +1721,14 @@ in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#mes
_time:5m | extract ' ip=<ip>' _time:5m | extract ' ip=<ip>'
``` ```
If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields unpacked from logfmt,
by adding `result_prefix "prefix_name"` to `unpack_logfmt`. For example, the following query adds `foo_` prefix for all the unpacked fields
from `foo` field:
```logsql
_time:5m | unpack_logfmt from foo result_prefix "foo_"
```
See also: See also:
- [Conditional unpack_logfmt](#conditional-unpack_logfmt) - [Conditional unpack_logfmt](#conditional-unpack_logfmt)
@ -1731,11 +1738,12 @@ See also:
#### Conditional unpack_logfmt #### Conditional unpack_logfmt
If the [`unpack_logfmt` pipe](#unpack_logfmt-pipe) mustn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), If the [`unpack_logfmt` pipe](#unpack_logfmt-pipe) mustn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
then add `if (<filters>)` to the end of `unpack_logfmt ...`. then add `if (<filters>)` after `unpack_logfmt`.
The `<filters>` can contain arbitrary [filters](#filters). For example, the following query unpacks logfmt fields only if `ip` field in the current log entry isn't set or empty: The `<filters>` can contain arbitrary [filters](#filters). For example, the following query unpacks logfmt fields from `foo` field
only if `ip` field in the current log entry isn't set or empty:
```logsql ```logsql
_time:5m | unpack_logfmt if (ip:"") _time:5m | unpack_logfmt if (ip:"") from foo
``` ```
## stats pipe functions ## stats pipe functions

View file

@ -71,16 +71,12 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{},
} }
} else { } else {
for _, fieldName := range pu.fields { for _, fieldName := range pu.fields {
found := false
for _, f := range p.Fields { for _, f := range p.Fields {
if f.Name == fieldName { if f.Name == fieldName {
uctx.addField(f.Name, f.Value) uctx.addField(f.Name, f.Value)
found = true break
} }
} }
if !found {
uctx.addField(fieldName, "")
}
} }
} }
} }

View file

@ -64,7 +64,6 @@ func TestPipeUnpackJSON(t *testing.T) {
{ {
{"_msg", `{"foo":"bar","z":"q","a":"b"}`}, {"_msg", `{"foo":"bar","z":"q","a":"b"}`},
{"foo", "bar"}, {"foo", "bar"},
{"b", ""},
}, },
}) })

View file

@ -2,6 +2,7 @@ package logstorage
import ( import (
"fmt" "fmt"
"slices"
"strings" "strings"
) )
@ -12,6 +13,11 @@ type pipeUnpackLogfmt struct {
// fromField is the field to unpack logfmt fields from // fromField is the field to unpack logfmt fields from
fromField string fromField string
// fields is an optional list of fields to extract from logfmt.
//
// if it is empty, then all the fields are extracted.
fields []string
// resultPrefix is prefix to add to unpacked field names // resultPrefix is prefix to add to unpacked field names
resultPrefix string resultPrefix string
@ -21,15 +27,18 @@ type pipeUnpackLogfmt struct {
func (pu *pipeUnpackLogfmt) String() string { func (pu *pipeUnpackLogfmt) String() string {
s := "unpack_logfmt" s := "unpack_logfmt"
if pu.iff != nil {
s += " " + pu.iff.String()
}
if !isMsgFieldName(pu.fromField) { if !isMsgFieldName(pu.fromField) {
s += " from " + quoteTokenIfNeeded(pu.fromField) s += " from " + quoteTokenIfNeeded(pu.fromField)
} }
if len(pu.fields) > 0 {
s += " fields (" + fieldsToString(pu.fields) + ")"
}
if pu.resultPrefix != "" { if pu.resultPrefix != "" {
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
} }
if pu.iff != nil {
s += " " + pu.iff.String()
}
return s return s
} }
@ -48,46 +57,53 @@ func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fiel
} }
func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff) addField := func(uctx *fieldsUnpackerContext, name, value string) {
} if len(pu.fields) == 0 || slices.Contains(pu.fields, name) {
func unpackLogfmt(uctx *fieldsUnpackerContext, s string) {
for {
// Search for field name
n := strings.IndexByte(s, '=')
if n < 0 {
// field name couldn't be read
return
}
name := strings.TrimSpace(s[:n])
s = s[n+1:]
if len(s) == 0 {
uctx.addField(name, "")
}
// Search for field value
value, nOffset := tryUnquoteString(s)
if nOffset >= 0 {
uctx.addField(name, value) uctx.addField(name, value)
s = s[nOffset:]
if len(s) == 0 {
return
}
if s[0] != ' ' {
return
}
s = s[1:]
} else {
n := strings.IndexByte(s, ' ')
if n < 0 {
uctx.addField(name, s)
return
}
uctx.addField(name, s[:n])
s = s[n+1:]
} }
} }
unpackLogfmt := func(uctx *fieldsUnpackerContext, s string) {
for {
// Search for field name
n := strings.IndexByte(s, '=')
if n < 0 {
// field name couldn't be read
return
}
name := strings.TrimSpace(s[:n])
s = s[n+1:]
if len(s) == 0 {
addField(uctx, name, "")
}
// Search for field value
value, nOffset := tryUnquoteString(s)
if nOffset >= 0 {
addField(uctx, name, value)
s = s[nOffset:]
if len(s) == 0 {
return
}
if s[0] != ' ' {
return
}
s = s[1:]
} else {
n := strings.IndexByte(s, ' ')
if n < 0 {
addField(uctx, name, s)
return
}
addField(uctx, name, s[:n])
s = s[n+1:]
}
}
}
return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.iff)
} }
func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) { func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
@ -96,6 +112,15 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
} }
lex.nextToken() lex.nextToken()
var iff *ifFilter
if lex.isKeyword("if") {
f, err := parseIfFilter(lex)
if err != nil {
return nil, err
}
iff = f
}
fromField := "_msg" fromField := "_msg"
if lex.isKeyword("from") { if lex.isKeyword("from") {
lex.nextToken() lex.nextToken()
@ -106,6 +131,19 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
fromField = f fromField = f
} }
var fields []string
if lex.isKeyword("fields") {
lex.nextToken()
fs, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'fields': %w", err)
}
fields = fs
if slices.Contains(fields, "*") {
fields = nil
}
}
resultPrefix := "" resultPrefix := ""
if lex.isKeyword("result_prefix") { if lex.isKeyword("result_prefix") {
lex.nextToken() lex.nextToken()
@ -118,15 +156,9 @@ func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
pu := &pipeUnpackLogfmt{ pu := &pipeUnpackLogfmt{
fromField: fromField, fromField: fromField,
fields: fields,
resultPrefix: resultPrefix, resultPrefix: resultPrefix,
} iff: iff,
if lex.isKeyword("if") {
iff, err := parseIfFilter(lex)
if err != nil {
return nil, err
}
pu.iff = iff
} }
return pu, nil return pu, nil

View file

@ -11,13 +11,19 @@ func TestParsePipeUnpackLogfmtSuccess(t *testing.T) {
} }
f(`unpack_logfmt`) f(`unpack_logfmt`)
f(`unpack_logfmt fields (a, b)`)
f(`unpack_logfmt if (a:x)`) f(`unpack_logfmt if (a:x)`)
f(`unpack_logfmt if (a:x) fields (a, b)`)
f(`unpack_logfmt from x`) f(`unpack_logfmt from x`)
f(`unpack_logfmt from x if (a:x)`) f(`unpack_logfmt from x fields (a, b)`)
f(`unpack_logfmt if (a:x) from x`)
f(`unpack_logfmt if (a:x) from x fields (a, b)`)
f(`unpack_logfmt from x result_prefix abc`) f(`unpack_logfmt from x result_prefix abc`)
f(`unpack_logfmt from x result_prefix abc if (a:x)`) f(`unpack_logfmt if (a:x) from x result_prefix abc`)
f(`unpack_logfmt if (a:x) from x fields (a, b) result_prefix abc`)
f(`unpack_logfmt result_prefix abc`) f(`unpack_logfmt result_prefix abc`)
f(`unpack_logfmt result_prefix abc if (a:x)`) f(`unpack_logfmt if (a:x) result_prefix abc`)
f(`unpack_logfmt if (a:x) fields (a, b) result_prefix abc`)
} }
func TestParsePipeUnpackLogfmtFailure(t *testing.T) { func TestParsePipeUnpackLogfmtFailure(t *testing.T) {
@ -27,6 +33,7 @@ func TestParsePipeUnpackLogfmtFailure(t *testing.T) {
} }
f(`unpack_logfmt foo`) f(`unpack_logfmt foo`)
f(`unpack_logfmt fields`)
f(`unpack_logfmt if`) f(`unpack_logfmt if`)
f(`unpack_logfmt if (x:y) foobar`) f(`unpack_logfmt if (x:y) foobar`)
f(`unpack_logfmt from`) f(`unpack_logfmt from`)
@ -46,6 +53,19 @@ func TestPipeUnpackLogfmt(t *testing.T) {
expectPipeResults(t, pipeStr, rows, rowsExpected) expectPipeResults(t, pipeStr, rows, rowsExpected)
} }
// unpack a subset of fields
f("unpack_logfmt fields (foo, a, b)", [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"foo", "bar"},
{"a", "b"},
},
})
// single row, unpack from _msg // single row, unpack from _msg
f("unpack_logfmt", [][]Field{ f("unpack_logfmt", [][]Field{
{ {
@ -184,7 +204,7 @@ func TestPipeUnpackLogfmt(t *testing.T) {
}) })
// multiple rows with distinct number of fields, with result_prefix and if condition // multiple rows with distinct number of fields, with result_prefix and if condition
f("unpack_logfmt from x result_prefix qwe_ if (y:abc)", [][]Field{ f("unpack_logfmt if (y:abc) from x result_prefix qwe_", [][]Field{
{ {
{"x", `foo=bar baz=xyz`}, {"x", `foo=bar baz=xyz`},
{"y", `abc`}, {"y", `abc`},
@ -222,25 +242,25 @@ func TestPipeUnpackLogfmtUpdateNeededFields(t *testing.T) {
// all the needed fields // all the needed fields
f("unpack_logfmt from x", "*", "", "*", "") f("unpack_logfmt from x", "*", "", "*", "")
f("unpack_logfmt from x if (y:z)", "*", "", "*", "") f("unpack_logfmt if (y:z) from x", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src // all the needed fields, unneeded fields do not intersect with src
f("unpack_logfmt from x", "*", "f1,f2", "*", "f1,f2") f("unpack_logfmt from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_logfmt from x if (y:z)", "*", "f1,f2", "*", "f1,f2") f("unpack_logfmt if (y:z) from x", "*", "f1,f2", "*", "f1,f2")
f("unpack_logfmt from x if (f1:z)", "*", "f1,f2", "*", "f2") f("unpack_logfmt if (f1:z) from x", "*", "f1,f2", "*", "f2")
// all the needed fields, unneeded fields intersect with src // all the needed fields, unneeded fields intersect with src
f("unpack_logfmt from x", "*", "f2,x", "*", "f2") f("unpack_logfmt from x", "*", "f2,x", "*", "f2")
f("unpack_logfmt from x if (y:z)", "*", "f2,x", "*", "f2") f("unpack_logfmt if (y:z) from x", "*", "f2,x", "*", "f2")
f("unpack_logfmt from x if (f2:z)", "*", "f1,f2,x", "*", "f1") f("unpack_logfmt if (f2:z) from x", "*", "f1,f2,x", "*", "f1")
// needed fields do not intersect with src // needed fields do not intersect with src
f("unpack_logfmt from x", "f1,f2", "", "f1,f2,x", "") f("unpack_logfmt from x", "f1,f2", "", "f1,f2,x", "")
f("unpack_logfmt from x if (y:z)", "f1,f2", "", "f1,f2,x,y", "") f("unpack_logfmt if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "")
f("unpack_logfmt from x if (f1:z)", "f1,f2", "", "f1,f2,x", "") f("unpack_logfmt if (f1:z) from x", "f1,f2", "", "f1,f2,x", "")
// needed fields intersect with src // needed fields intersect with src
f("unpack_logfmt from x", "f2,x", "", "f2,x", "") f("unpack_logfmt from x", "f2,x", "", "f2,x", "")
f("unpack_logfmt from x if (y:z)", "f2,x", "", "f2,x,y", "") f("unpack_logfmt if (y:z) from x", "f2,x", "", "f2,x,y", "")
f("unpack_logfmt from x if (f2:z y:qwe)", "f2,x", "", "f2,x,y", "") f("unpack_logfmt if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "")
} }