diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index d931d1125..3366acce1 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`pack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_logfmt-pipe) for formatting [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) messages. * FEATURE: allow using IPv4 addresses in [range comparison filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filter). For example, `ip:>'12.34.56.78'` is valid filter now. * FEATURE: add `ceil()` and `floor()` functions to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). * FEATURE: add support for bitwise `and`, `or` and `xor` operations at [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index ddd494a69..9532bb94d 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1167,6 +1167,7 @@ LogsQL supports the following pipes: - [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`offset`](#offset-pipe) skips the given number of selected logs. - [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object. +- [`pack_logfmt`](#pack_logfmt-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) message. - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions. @@ -1740,9 +1741,48 @@ _time:5m | pack_json as foo | fields foo See also: +- [`pack_logfmt` pipe](#pack_logfmt-pipe) - [`unpack_json` pipe](#unpack_json-pipe) +### pack_logfmt pipe + +`| pack_logfmt as field_name` [pipe](#pipe) packs all [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) message +and stores its as a string in the given `field_name`. + +For example, the following query packs all the fields into [logfmt](https://brandur.org/logfmt) message and stores it +into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) for logs over the last 5 minutes: + +```logsql +_time:5m | pack_logfmt as _msg +``` + +The `as _msg` part can be omitted if packed message is stored into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field). +The following query is equivalent to the previous one: + +```logsql +_time:5m | pack_logfmt +``` + +If only a subset of labels must be packed into [logfmt](https://brandur.org/logfmt), then it must be listed inside `fields (...)` after `pack_logfmt`. +For example, the following query builds [logfmt](https://brandur.org/logfmt) message with `foo` and `bar` fields only and stores the result in `baz` field: + +```logsql +_time:5m | pack_logfmt fields (foo, bar) as baz +``` + +The `pack_logfmt` doesn't modify or delete other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_logfmt` pipe. For example, the following query +leaves only the `foo` label with the original log fields packed into [logfmt](https://brandur.org/logfmt): + +```logsql +_time:5m | pack_logfmt as foo | fields foo +``` + +See also: + +- [`pack_json` pipe](#pack_json-pipe) +- [`unpack_logfmt` pipe](#unpack_logfmt-pipe) + ### rename pipe If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be renamed, then `| rename src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used. @@ -2219,6 +2259,7 @@ See also: - [`extract` pipe](#extract-pipe) - [`unroll` pipe](#unroll-pipe) - [`pack_json` pipe](#pack_json-pipe) +- [`pack_logfmt` pipe](#pack_logfmt-pipe) #### Conditional unpack_json diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 73a3967fd..090294507 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -74,11 +74,6 @@ func (lex *lexer) isQuotedToken() bool { return lex.token != lex.rawToken } -func (lex *lexer) isNumber() bool { - s := lex.rawToken + lex.s - return isNumberPrefix(s) -} - func (lex *lexer) isPrevToken(tokens ...string) bool { for _, token := range tokens { if token == lex.prevToken { diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index ff2178025..2c9063f4b 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -178,6 +178,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'pack_json' pipe: %w", err) } return pp, nil + case lex.isKeyword("pack_logfmt"): + pp, err := parsePackLogfmt(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'pack_logfmt' pipe: %w", err) + } + return pp, nil case lex.isKeyword("rename", "mv"): pr, err := parsePipeRename(lex) if err != nil { diff --git a/lib/logstorage/pipe_pack.go b/lib/logstorage/pipe_pack.go new file mode 100644 index 000000000..e911121a2 --- /dev/null +++ b/lib/logstorage/pipe_pack.go @@ -0,0 +1,114 @@ +package logstorage + +import ( + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +func updateNeededFieldsForPipePack(neededFields, unneededFields fieldsSet, resultField string, fields []string) { + if neededFields.contains("*") { + if !unneededFields.contains(resultField) { + if len(fields) > 0 { + unneededFields.removeFields(fields) + } else { + unneededFields.reset() + } + } + } else { + if neededFields.contains(resultField) { + neededFields.remove(resultField) + if len(fields) > 0 { + neededFields.addFields(fields) + } else { + neededFields.add("*") + } + } + } +} + +func newPipePackProcessor(workersCount int, ppNext pipeProcessor, resultField string, fields []string, marshalFields func(dst []byte, fields []Field) []byte) pipeProcessor { + return &pipePackProcessor{ + ppNext: ppNext, + resultField: resultField, + fields: fields, + marshalFields: marshalFields, + + shards: make([]pipePackProcessorShard, workersCount), + } +} + +type pipePackProcessor struct { + ppNext pipeProcessor + resultField string + fields []string + marshalFields func(dst []byte, fields []Field) []byte + + shards []pipePackProcessorShard +} + +type pipePackProcessorShard struct { + pipePackProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipePackProcessorShardNopad{})%128]byte +} + +type pipePackProcessorShardNopad struct { + rc resultColumn + + buf []byte + fields []Field + + cs []*blockResultColumn +} + +func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &ppp.shards[workerID] + + shard.rc.name = ppp.resultField + + cs := shard.cs[:0] + if len(ppp.fields) == 0 { + csAll := br.getColumns() + cs = append(cs, csAll...) + } else { + for _, f := range ppp.fields { + c := br.getColumnByName(f) + cs = append(cs, c) + } + } + shard.cs = cs + + buf := shard.buf[:0] + fields := shard.fields + for rowIdx := range br.timestamps { + fields = fields[:0] + for _, c := range cs { + v := c.getValueAtRow(br, rowIdx) + fields = append(fields, Field{ + Name: c.name, + Value: v, + }) + } + + bufLen := len(buf) + buf = ppp.marshalFields(buf, fields) + v := bytesutil.ToUnsafeString(buf[bufLen:]) + shard.rc.addValue(v) + } + shard.fields = fields + + br.addResultColumn(&shard.rc) + ppp.ppNext.writeBlock(workerID, br) + + shard.rc.reset() +} + +func (ppp *pipePackProcessor) flush() error { + return nil +} diff --git a/lib/logstorage/pipe_pack_json.go b/lib/logstorage/pipe_pack_json.go index 0a1686f04..502e51cb8 100644 --- a/lib/logstorage/pipe_pack_json.go +++ b/lib/logstorage/pipe_pack_json.go @@ -3,9 +3,6 @@ package logstorage import ( "fmt" "slices" - "unsafe" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipePackJSON processes '| pack_json ...' pipe. @@ -29,23 +26,7 @@ func (pp *pipePackJSON) String() string { } func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { - if neededFields.contains("*") { - if !unneededFields.contains(pp.resultField) { - if len(pp.fields) > 0 { - unneededFields.removeFields(pp.fields) - } else { - unneededFields.reset() - } - } - } else { - if neededFields.contains(pp.resultField) { - if len(pp.fields) > 0 { - neededFields.addFields(pp.fields) - } else { - neededFields.add("*") - } - } - } + updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields) } func (pp *pipePackJSON) optimize() { @@ -61,85 +42,7 @@ func (pp *pipePackJSON) initFilterInValues(_ map[string][]string, _ getFieldValu } func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { - return &pipePackJSONProcessor{ - pp: pp, - ppNext: ppNext, - - shards: make([]pipePackJSONProcessorShard, workersCount), - } -} - -type pipePackJSONProcessor struct { - pp *pipePackJSON - ppNext pipeProcessor - - shards []pipePackJSONProcessorShard -} - -type pipePackJSONProcessorShard struct { - pipePackJSONProcessorShardNopad - - // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . - _ [128 - unsafe.Sizeof(pipePackJSONProcessorShardNopad{})%128]byte -} - -type pipePackJSONProcessorShardNopad struct { - rc resultColumn - - buf []byte - fields []Field - - cs []*blockResultColumn -} - -func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { - if len(br.timestamps) == 0 { - return - } - - shard := &ppp.shards[workerID] - - shard.rc.name = ppp.pp.resultField - - cs := shard.cs[:0] - if len(ppp.pp.fields) == 0 { - csAll := br.getColumns() - cs = append(cs, csAll...) - } else { - for _, f := range ppp.pp.fields { - c := br.getColumnByName(f) - cs = append(cs, c) - } - } - shard.cs = cs - - buf := shard.buf[:0] - fields := shard.fields - for rowIdx := range br.timestamps { - fields = fields[:0] - for _, c := range cs { - v := c.getValueAtRow(br, rowIdx) - fields = append(fields, Field{ - Name: c.name, - Value: v, - }) - } - - bufLen := len(buf) - buf = MarshalFieldsToJSON(buf, fields) - v := bytesutil.ToUnsafeString(buf[bufLen:]) - shard.rc.addValue(v) - } - shard.fields = fields - - br.addResultColumn(&shard.rc) - ppp.ppNext.writeBlock(workerID, br) - - shard.rc.reset() -} - -func (ppp *pipePackJSONProcessor) flush() error { - return nil + return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToJSON) } func parsePackJSON(lex *lexer) (*pipePackJSON, error) { diff --git a/lib/logstorage/pipe_pack_logfmt.go b/lib/logstorage/pipe_pack_logfmt.go new file mode 100644 index 000000000..082ead30d --- /dev/null +++ b/lib/logstorage/pipe_pack_logfmt.go @@ -0,0 +1,86 @@ +package logstorage + +import ( + "fmt" + "slices" +) + +// pipePackLogfmt processes '| pack_logfmt ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#pack_logfmt-pipe +type pipePackLogfmt struct { + resultField string + + fields []string +} + +func (pp *pipePackLogfmt) String() string { + s := "pack_logfmt" + if len(pp.fields) > 0 { + s += " fields (" + fieldsToString(pp.fields) + ")" + } + if !isMsgFieldName(pp.resultField) { + s += " as " + quoteTokenIfNeeded(pp.resultField) + } + return s +} + +func (pp *pipePackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) { + updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields) +} + +func (pp *pipePackLogfmt) optimize() { + // nothing to do +} + +func (pp *pipePackLogfmt) hasFilterInWithQuery() bool { + return false +} + +func (pp *pipePackLogfmt) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pp, nil +} + +func (pp *pipePackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToLogfmt) +} + +func parsePackLogfmt(lex *lexer) (*pipePackLogfmt, error) { + if !lex.isKeyword("pack_logfmt") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "pack_logfmt") + } + lex.nextToken() + + var fields []string + if lex.isKeyword("fields") { + lex.nextToken() + fs, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse fields: %w", err) + } + if slices.Contains(fs, "*") { + fs = nil + } + fields = fs + } + + // parse optional 'as ...` part + resultField := "_msg" + if lex.isKeyword("as") { + lex.nextToken() + } + if !lex.isKeyword("|", ")", "") { + field, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result field for 'pack_logfmt': %w", err) + } + resultField = field + } + + pp := &pipePackLogfmt{ + resultField: resultField, + fields: fields, + } + + return pp, nil +} diff --git a/lib/logstorage/pipe_pack_logfmt_test.go b/lib/logstorage/pipe_pack_logfmt_test.go new file mode 100644 index 000000000..57c9503c8 --- /dev/null +++ b/lib/logstorage/pipe_pack_logfmt_test.go @@ -0,0 +1,133 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipePackLogfmtSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`pack_logfmt`) + f(`pack_logfmt as x`) + f(`pack_logfmt fields (a, b)`) + f(`pack_logfmt fields (a, b) as x`) +} + +func TestParsePipePackLogfmtFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`pack_logfmt foo bar`) + f(`pack_logfmt fields`) +} + +func TestPipePackLogfmt(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // pack into _msg + f(`pack_logfmt`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde=ab`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `_msg=x foo=abc bar="cde=ab"`}, + {"foo", `abc`}, + {"bar", `cde=ab`}, + }, + { + {"_msg", `a=b c=d`}, + {"a", "b"}, + {"c", "d"}, + }, + }) + + // pack into other field + f(`pack_logfmt as a`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `x`}, + {"foo", `abc`}, + {"bar", `cde`}, + {"a", `_msg=x foo=abc bar=cde`}, + }, + { + {"a", `a=b c=d`}, + {"c", "d"}, + }, + }) + + // pack only the needed fields + f(`pack_logfmt fields (foo, baz) a`, [][]Field{ + { + {"_msg", "x"}, + {"foo", `abc`}, + {"bar", `cde`}, + }, + { + {"a", "b"}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"_msg", `x`}, + {"foo", `abc`}, + {"bar", `cde`}, + {"a", `foo=abc baz=`}, + }, + { + {"a", `foo= baz=`}, + {"c", "d"}, + }, + }) +} + +func TestPipePackLogfmtUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`pack_logfmt as x`, "*", "", "*", "") + f(`pack_logfmt fields (a,b) as x`, "*", "", "*", "") + + // unneeded fields do not intersect with output + f(`pack_logfmt as x`, "*", "f1,f2", "*", "") + f(`pack_logfmt fields(f1,f3) as x`, "*", "f1,f2", "*", "f2") + + // unneeded fields intersect with output + f(`pack_logfmt as f1`, "*", "f1,f2", "*", "f1,f2") + f(`pack_logfmt fields (f2,f3) as f1`, "*", "f1,f2", "*", "f1,f2") + + // needed fields do not intersect with output + f(`pack_logfmt f1`, "x,y", "", "x,y", "") + f(`pack_logfmt fields (x,z) f1`, "x,y", "", "x,y", "") + + // needed fields intersect with output + f(`pack_logfmt as f2`, "f2,y", "", "*", "") + f(`pack_logfmt fields (x,y) as f2`, "f2,y", "", "x,y", "") +} diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index b9e8df98e..21906df49 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -64,7 +64,27 @@ func (f *Field) marshalToJSON(dst []byte) []byte { return dst } -// MarshalFieldsToJSON appends JSON-marshaled fields to dt and returns the result. +func (f *Field) marshalToLogfmt(dst []byte) []byte { + dst = append(dst, f.Name...) + dst = append(dst, '=') + if needLogfmtQuoting(f.Value) { + dst = strconv.AppendQuote(dst, f.Value) + } else { + dst = append(dst, f.Value...) + } + return dst +} + +func needLogfmtQuoting(s string) bool { + for _, c := range s { + if !isTokenRune(c) { + return true + } + } + return false +} + +// MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result. func MarshalFieldsToJSON(dst []byte, fields []Field) []byte { dst = append(dst, '{') if len(fields) > 0 { @@ -79,6 +99,20 @@ func MarshalFieldsToJSON(dst []byte, fields []Field) []byte { return dst } +// MarshalFieldsToLogfmt appends logfmt-marshaled fields to dst and returns the result. +func MarshalFieldsToLogfmt(dst []byte, fields []Field) []byte { + if len(fields) == 0 { + return dst + } + dst = fields[0].marshalToLogfmt(dst) + fields = fields[1:] + for i := range fields { + dst = append(dst, ' ') + dst = fields[i].marshalToLogfmt(dst) + } + return dst +} + func appendFields(a *arena, dst, src []Field) []Field { for _, f := range src { dst = append(dst, Field{