mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
6137ef1bdc
commit
ecec1d90b9
9 changed files with 418 additions and 105 deletions
|
@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* FEATURE: add [`pack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_logfmt-pipe) for formatting [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) messages.
|
||||||
* FEATURE: allow using IPv4 addresses in [range comparison filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filter). For example, `ip:>'12.34.56.78'` is valid filter now.
|
* FEATURE: allow using IPv4 addresses in [range comparison filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filter). For example, `ip:>'12.34.56.78'` is valid filter now.
|
||||||
* FEATURE: add `ceil()` and `floor()` functions to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
|
* FEATURE: add `ceil()` and `floor()` functions to [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
|
||||||
* FEATURE: add support for bitwise `and`, `or` and `xor` operations at [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
|
* FEATURE: add support for bitwise `and`, `or` and `xor` operations at [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
|
||||||
|
|
|
@ -1167,6 +1167,7 @@ LogsQL supports the following pipes:
|
||||||
- [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
- [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||||
- [`offset`](#offset-pipe) skips the given number of selected logs.
|
- [`offset`](#offset-pipe) skips the given number of selected logs.
|
||||||
- [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object.
|
- [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object.
|
||||||
|
- [`pack_logfmt`](#pack_logfmt-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) message.
|
||||||
- [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
- [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||||
- [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
- [`replace`](#replace-pipe) replaces substrings in the specified [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||||
- [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
|
- [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
|
||||||
|
@ -1740,9 +1741,48 @@ _time:5m | pack_json as foo | fields foo
|
||||||
|
|
||||||
See also:
|
See also:
|
||||||
|
|
||||||
|
- [`pack_logfmt` pipe](#pack_logfmt-pipe)
|
||||||
- [`unpack_json` pipe](#unpack_json-pipe)
|
- [`unpack_json` pipe](#unpack_json-pipe)
|
||||||
|
|
||||||
|
|
||||||
|
### pack_logfmt pipe
|
||||||
|
|
||||||
|
`| pack_logfmt as field_name` [pipe](#pipe) packs all [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into [logfmt](https://brandur.org/logfmt) message
|
||||||
|
and stores its as a string in the given `field_name`.
|
||||||
|
|
||||||
|
For example, the following query packs all the fields into [logfmt](https://brandur.org/logfmt) message and stores it
|
||||||
|
into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) for logs over the last 5 minutes:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | pack_logfmt as _msg
|
||||||
|
```
|
||||||
|
|
||||||
|
The `as _msg` part can be omitted if packed message is stored into [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
|
||||||
|
The following query is equivalent to the previous one:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | pack_logfmt
|
||||||
|
```
|
||||||
|
|
||||||
|
If only a subset of labels must be packed into [logfmt](https://brandur.org/logfmt), then it must be listed inside `fields (...)` after `pack_logfmt`.
|
||||||
|
For example, the following query builds [logfmt](https://brandur.org/logfmt) message with `foo` and `bar` fields only and stores the result in `baz` field:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | pack_logfmt fields (foo, bar) as baz
|
||||||
|
```
|
||||||
|
|
||||||
|
The `pack_logfmt` doesn't modify or delete other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_logfmt` pipe. For example, the following query
|
||||||
|
leaves only the `foo` label with the original log fields packed into [logfmt](https://brandur.org/logfmt):
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | pack_logfmt as foo | fields foo
|
||||||
|
```
|
||||||
|
|
||||||
|
See also:
|
||||||
|
|
||||||
|
- [`pack_json` pipe](#pack_json-pipe)
|
||||||
|
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
|
||||||
|
|
||||||
### rename pipe
|
### rename pipe
|
||||||
|
|
||||||
If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be renamed, then `| rename src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used.
|
If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be renamed, then `| rename src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used.
|
||||||
|
@ -2219,6 +2259,7 @@ See also:
|
||||||
- [`extract` pipe](#extract-pipe)
|
- [`extract` pipe](#extract-pipe)
|
||||||
- [`unroll` pipe](#unroll-pipe)
|
- [`unroll` pipe](#unroll-pipe)
|
||||||
- [`pack_json` pipe](#pack_json-pipe)
|
- [`pack_json` pipe](#pack_json-pipe)
|
||||||
|
- [`pack_logfmt` pipe](#pack_logfmt-pipe)
|
||||||
|
|
||||||
#### Conditional unpack_json
|
#### Conditional unpack_json
|
||||||
|
|
||||||
|
|
|
@ -74,11 +74,6 @@ func (lex *lexer) isQuotedToken() bool {
|
||||||
return lex.token != lex.rawToken
|
return lex.token != lex.rawToken
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lex *lexer) isNumber() bool {
|
|
||||||
s := lex.rawToken + lex.s
|
|
||||||
return isNumberPrefix(s)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (lex *lexer) isPrevToken(tokens ...string) bool {
|
func (lex *lexer) isPrevToken(tokens ...string) bool {
|
||||||
for _, token := range tokens {
|
for _, token := range tokens {
|
||||||
if token == lex.prevToken {
|
if token == lex.prevToken {
|
||||||
|
|
|
@ -178,6 +178,12 @@ func parsePipe(lex *lexer) (pipe, error) {
|
||||||
return nil, fmt.Errorf("cannot parse 'pack_json' pipe: %w", err)
|
return nil, fmt.Errorf("cannot parse 'pack_json' pipe: %w", err)
|
||||||
}
|
}
|
||||||
return pp, nil
|
return pp, nil
|
||||||
|
case lex.isKeyword("pack_logfmt"):
|
||||||
|
pp, err := parsePackLogfmt(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'pack_logfmt' pipe: %w", err)
|
||||||
|
}
|
||||||
|
return pp, nil
|
||||||
case lex.isKeyword("rename", "mv"):
|
case lex.isKeyword("rename", "mv"):
|
||||||
pr, err := parsePipeRename(lex)
|
pr, err := parsePipeRename(lex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
114
lib/logstorage/pipe_pack.go
Normal file
114
lib/logstorage/pipe_pack.go
Normal file
|
@ -0,0 +1,114 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func updateNeededFieldsForPipePack(neededFields, unneededFields fieldsSet, resultField string, fields []string) {
|
||||||
|
if neededFields.contains("*") {
|
||||||
|
if !unneededFields.contains(resultField) {
|
||||||
|
if len(fields) > 0 {
|
||||||
|
unneededFields.removeFields(fields)
|
||||||
|
} else {
|
||||||
|
unneededFields.reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if neededFields.contains(resultField) {
|
||||||
|
neededFields.remove(resultField)
|
||||||
|
if len(fields) > 0 {
|
||||||
|
neededFields.addFields(fields)
|
||||||
|
} else {
|
||||||
|
neededFields.add("*")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPipePackProcessor(workersCount int, ppNext pipeProcessor, resultField string, fields []string, marshalFields func(dst []byte, fields []Field) []byte) pipeProcessor {
|
||||||
|
return &pipePackProcessor{
|
||||||
|
ppNext: ppNext,
|
||||||
|
resultField: resultField,
|
||||||
|
fields: fields,
|
||||||
|
marshalFields: marshalFields,
|
||||||
|
|
||||||
|
shards: make([]pipePackProcessorShard, workersCount),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipePackProcessor struct {
|
||||||
|
ppNext pipeProcessor
|
||||||
|
resultField string
|
||||||
|
fields []string
|
||||||
|
marshalFields func(dst []byte, fields []Field) []byte
|
||||||
|
|
||||||
|
shards []pipePackProcessorShard
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipePackProcessorShard struct {
|
||||||
|
pipePackProcessorShardNopad
|
||||||
|
|
||||||
|
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
||||||
|
_ [128 - unsafe.Sizeof(pipePackProcessorShardNopad{})%128]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipePackProcessorShardNopad struct {
|
||||||
|
rc resultColumn
|
||||||
|
|
||||||
|
buf []byte
|
||||||
|
fields []Field
|
||||||
|
|
||||||
|
cs []*blockResultColumn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ppp *pipePackProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||||
|
if len(br.timestamps) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
shard := &ppp.shards[workerID]
|
||||||
|
|
||||||
|
shard.rc.name = ppp.resultField
|
||||||
|
|
||||||
|
cs := shard.cs[:0]
|
||||||
|
if len(ppp.fields) == 0 {
|
||||||
|
csAll := br.getColumns()
|
||||||
|
cs = append(cs, csAll...)
|
||||||
|
} else {
|
||||||
|
for _, f := range ppp.fields {
|
||||||
|
c := br.getColumnByName(f)
|
||||||
|
cs = append(cs, c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
shard.cs = cs
|
||||||
|
|
||||||
|
buf := shard.buf[:0]
|
||||||
|
fields := shard.fields
|
||||||
|
for rowIdx := range br.timestamps {
|
||||||
|
fields = fields[:0]
|
||||||
|
for _, c := range cs {
|
||||||
|
v := c.getValueAtRow(br, rowIdx)
|
||||||
|
fields = append(fields, Field{
|
||||||
|
Name: c.name,
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
bufLen := len(buf)
|
||||||
|
buf = ppp.marshalFields(buf, fields)
|
||||||
|
v := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||||
|
shard.rc.addValue(v)
|
||||||
|
}
|
||||||
|
shard.fields = fields
|
||||||
|
|
||||||
|
br.addResultColumn(&shard.rc)
|
||||||
|
ppp.ppNext.writeBlock(workerID, br)
|
||||||
|
|
||||||
|
shard.rc.reset()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ppp *pipePackProcessor) flush() error {
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -3,9 +3,6 @@ package logstorage
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
"unsafe"
|
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// pipePackJSON processes '| pack_json ...' pipe.
|
// pipePackJSON processes '| pack_json ...' pipe.
|
||||||
|
@ -29,23 +26,7 @@ func (pp *pipePackJSON) String() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||||
if neededFields.contains("*") {
|
updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields)
|
||||||
if !unneededFields.contains(pp.resultField) {
|
|
||||||
if len(pp.fields) > 0 {
|
|
||||||
unneededFields.removeFields(pp.fields)
|
|
||||||
} else {
|
|
||||||
unneededFields.reset()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if neededFields.contains(pp.resultField) {
|
|
||||||
if len(pp.fields) > 0 {
|
|
||||||
neededFields.addFields(pp.fields)
|
|
||||||
} else {
|
|
||||||
neededFields.add("*")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pp *pipePackJSON) optimize() {
|
func (pp *pipePackJSON) optimize() {
|
||||||
|
@ -61,85 +42,7 @@ func (pp *pipePackJSON) initFilterInValues(_ map[string][]string, _ getFieldValu
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||||
return &pipePackJSONProcessor{
|
return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToJSON)
|
||||||
pp: pp,
|
|
||||||
ppNext: ppNext,
|
|
||||||
|
|
||||||
shards: make([]pipePackJSONProcessorShard, workersCount),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type pipePackJSONProcessor struct {
|
|
||||||
pp *pipePackJSON
|
|
||||||
ppNext pipeProcessor
|
|
||||||
|
|
||||||
shards []pipePackJSONProcessorShard
|
|
||||||
}
|
|
||||||
|
|
||||||
type pipePackJSONProcessorShard struct {
|
|
||||||
pipePackJSONProcessorShardNopad
|
|
||||||
|
|
||||||
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
|
||||||
_ [128 - unsafe.Sizeof(pipePackJSONProcessorShardNopad{})%128]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type pipePackJSONProcessorShardNopad struct {
|
|
||||||
rc resultColumn
|
|
||||||
|
|
||||||
buf []byte
|
|
||||||
fields []Field
|
|
||||||
|
|
||||||
cs []*blockResultColumn
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
|
|
||||||
if len(br.timestamps) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
shard := &ppp.shards[workerID]
|
|
||||||
|
|
||||||
shard.rc.name = ppp.pp.resultField
|
|
||||||
|
|
||||||
cs := shard.cs[:0]
|
|
||||||
if len(ppp.pp.fields) == 0 {
|
|
||||||
csAll := br.getColumns()
|
|
||||||
cs = append(cs, csAll...)
|
|
||||||
} else {
|
|
||||||
for _, f := range ppp.pp.fields {
|
|
||||||
c := br.getColumnByName(f)
|
|
||||||
cs = append(cs, c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
shard.cs = cs
|
|
||||||
|
|
||||||
buf := shard.buf[:0]
|
|
||||||
fields := shard.fields
|
|
||||||
for rowIdx := range br.timestamps {
|
|
||||||
fields = fields[:0]
|
|
||||||
for _, c := range cs {
|
|
||||||
v := c.getValueAtRow(br, rowIdx)
|
|
||||||
fields = append(fields, Field{
|
|
||||||
Name: c.name,
|
|
||||||
Value: v,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
bufLen := len(buf)
|
|
||||||
buf = MarshalFieldsToJSON(buf, fields)
|
|
||||||
v := bytesutil.ToUnsafeString(buf[bufLen:])
|
|
||||||
shard.rc.addValue(v)
|
|
||||||
}
|
|
||||||
shard.fields = fields
|
|
||||||
|
|
||||||
br.addResultColumn(&shard.rc)
|
|
||||||
ppp.ppNext.writeBlock(workerID, br)
|
|
||||||
|
|
||||||
shard.rc.reset()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ppp *pipePackJSONProcessor) flush() error {
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func parsePackJSON(lex *lexer) (*pipePackJSON, error) {
|
func parsePackJSON(lex *lexer) (*pipePackJSON, error) {
|
||||||
|
|
86
lib/logstorage/pipe_pack_logfmt.go
Normal file
86
lib/logstorage/pipe_pack_logfmt.go
Normal file
|
@ -0,0 +1,86 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"slices"
|
||||||
|
)
|
||||||
|
|
||||||
|
// pipePackLogfmt processes '| pack_logfmt ...' pipe.
|
||||||
|
//
|
||||||
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#pack_logfmt-pipe
|
||||||
|
type pipePackLogfmt struct {
|
||||||
|
resultField string
|
||||||
|
|
||||||
|
fields []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pp *pipePackLogfmt) String() string {
|
||||||
|
s := "pack_logfmt"
|
||||||
|
if len(pp.fields) > 0 {
|
||||||
|
s += " fields (" + fieldsToString(pp.fields) + ")"
|
||||||
|
}
|
||||||
|
if !isMsgFieldName(pp.resultField) {
|
||||||
|
s += " as " + quoteTokenIfNeeded(pp.resultField)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pp *pipePackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||||
|
updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pp *pipePackLogfmt) optimize() {
|
||||||
|
// nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pp *pipePackLogfmt) hasFilterInWithQuery() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pp *pipePackLogfmt) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
|
||||||
|
return pp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pp *pipePackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||||
|
return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToLogfmt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parsePackLogfmt(lex *lexer) (*pipePackLogfmt, error) {
|
||||||
|
if !lex.isKeyword("pack_logfmt") {
|
||||||
|
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "pack_logfmt")
|
||||||
|
}
|
||||||
|
lex.nextToken()
|
||||||
|
|
||||||
|
var fields []string
|
||||||
|
if lex.isKeyword("fields") {
|
||||||
|
lex.nextToken()
|
||||||
|
fs, err := parseFieldNamesInParens(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse fields: %w", err)
|
||||||
|
}
|
||||||
|
if slices.Contains(fs, "*") {
|
||||||
|
fs = nil
|
||||||
|
}
|
||||||
|
fields = fs
|
||||||
|
}
|
||||||
|
|
||||||
|
// parse optional 'as ...` part
|
||||||
|
resultField := "_msg"
|
||||||
|
if lex.isKeyword("as") {
|
||||||
|
lex.nextToken()
|
||||||
|
}
|
||||||
|
if !lex.isKeyword("|", ")", "") {
|
||||||
|
field, err := parseFieldName(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse result field for 'pack_logfmt': %w", err)
|
||||||
|
}
|
||||||
|
resultField = field
|
||||||
|
}
|
||||||
|
|
||||||
|
pp := &pipePackLogfmt{
|
||||||
|
resultField: resultField,
|
||||||
|
fields: fields,
|
||||||
|
}
|
||||||
|
|
||||||
|
return pp, nil
|
||||||
|
}
|
133
lib/logstorage/pipe_pack_logfmt_test.go
Normal file
133
lib/logstorage/pipe_pack_logfmt_test.go
Normal file
|
@ -0,0 +1,133 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParsePipePackLogfmtSuccess(t *testing.T) {
|
||||||
|
f := func(pipeStr string) {
|
||||||
|
t.Helper()
|
||||||
|
expectParsePipeSuccess(t, pipeStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
f(`pack_logfmt`)
|
||||||
|
f(`pack_logfmt as x`)
|
||||||
|
f(`pack_logfmt fields (a, b)`)
|
||||||
|
f(`pack_logfmt fields (a, b) as x`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParsePipePackLogfmtFailure(t *testing.T) {
|
||||||
|
f := func(pipeStr string) {
|
||||||
|
t.Helper()
|
||||||
|
expectParsePipeFailure(t, pipeStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
f(`pack_logfmt foo bar`)
|
||||||
|
f(`pack_logfmt fields`)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPipePackLogfmt(t *testing.T) {
|
||||||
|
f := func(pipeStr string, rows, rowsExpected [][]Field) {
|
||||||
|
t.Helper()
|
||||||
|
expectPipeResults(t, pipeStr, rows, rowsExpected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// pack into _msg
|
||||||
|
f(`pack_logfmt`, [][]Field{
|
||||||
|
{
|
||||||
|
{"_msg", "x"},
|
||||||
|
{"foo", `abc`},
|
||||||
|
{"bar", `cde=ab`},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"a", "b"},
|
||||||
|
{"c", "d"},
|
||||||
|
},
|
||||||
|
}, [][]Field{
|
||||||
|
{
|
||||||
|
{"_msg", `_msg=x foo=abc bar="cde=ab"`},
|
||||||
|
{"foo", `abc`},
|
||||||
|
{"bar", `cde=ab`},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"_msg", `a=b c=d`},
|
||||||
|
{"a", "b"},
|
||||||
|
{"c", "d"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// pack into other field
|
||||||
|
f(`pack_logfmt as a`, [][]Field{
|
||||||
|
{
|
||||||
|
{"_msg", "x"},
|
||||||
|
{"foo", `abc`},
|
||||||
|
{"bar", `cde`},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"a", "b"},
|
||||||
|
{"c", "d"},
|
||||||
|
},
|
||||||
|
}, [][]Field{
|
||||||
|
{
|
||||||
|
{"_msg", `x`},
|
||||||
|
{"foo", `abc`},
|
||||||
|
{"bar", `cde`},
|
||||||
|
{"a", `_msg=x foo=abc bar=cde`},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"a", `a=b c=d`},
|
||||||
|
{"c", "d"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// pack only the needed fields
|
||||||
|
f(`pack_logfmt fields (foo, baz) a`, [][]Field{
|
||||||
|
{
|
||||||
|
{"_msg", "x"},
|
||||||
|
{"foo", `abc`},
|
||||||
|
{"bar", `cde`},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"a", "b"},
|
||||||
|
{"c", "d"},
|
||||||
|
},
|
||||||
|
}, [][]Field{
|
||||||
|
{
|
||||||
|
{"_msg", `x`},
|
||||||
|
{"foo", `abc`},
|
||||||
|
{"bar", `cde`},
|
||||||
|
{"a", `foo=abc baz=`},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
{"a", `foo= baz=`},
|
||||||
|
{"c", "d"},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPipePackLogfmtUpdateNeededFields(t *testing.T) {
|
||||||
|
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
|
||||||
|
t.Helper()
|
||||||
|
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// all the needed fields
|
||||||
|
f(`pack_logfmt as x`, "*", "", "*", "")
|
||||||
|
f(`pack_logfmt fields (a,b) as x`, "*", "", "*", "")
|
||||||
|
|
||||||
|
// unneeded fields do not intersect with output
|
||||||
|
f(`pack_logfmt as x`, "*", "f1,f2", "*", "")
|
||||||
|
f(`pack_logfmt fields(f1,f3) as x`, "*", "f1,f2", "*", "f2")
|
||||||
|
|
||||||
|
// unneeded fields intersect with output
|
||||||
|
f(`pack_logfmt as f1`, "*", "f1,f2", "*", "f1,f2")
|
||||||
|
f(`pack_logfmt fields (f2,f3) as f1`, "*", "f1,f2", "*", "f1,f2")
|
||||||
|
|
||||||
|
// needed fields do not intersect with output
|
||||||
|
f(`pack_logfmt f1`, "x,y", "", "x,y", "")
|
||||||
|
f(`pack_logfmt fields (x,z) f1`, "x,y", "", "x,y", "")
|
||||||
|
|
||||||
|
// needed fields intersect with output
|
||||||
|
f(`pack_logfmt as f2`, "f2,y", "", "*", "")
|
||||||
|
f(`pack_logfmt fields (x,y) as f2`, "f2,y", "", "x,y", "")
|
||||||
|
}
|
|
@ -64,7 +64,27 @@ func (f *Field) marshalToJSON(dst []byte) []byte {
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarshalFieldsToJSON appends JSON-marshaled fields to dt and returns the result.
|
func (f *Field) marshalToLogfmt(dst []byte) []byte {
|
||||||
|
dst = append(dst, f.Name...)
|
||||||
|
dst = append(dst, '=')
|
||||||
|
if needLogfmtQuoting(f.Value) {
|
||||||
|
dst = strconv.AppendQuote(dst, f.Value)
|
||||||
|
} else {
|
||||||
|
dst = append(dst, f.Value...)
|
||||||
|
}
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
|
func needLogfmtQuoting(s string) bool {
|
||||||
|
for _, c := range s {
|
||||||
|
if !isTokenRune(c) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result.
|
||||||
func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
|
func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
|
||||||
dst = append(dst, '{')
|
dst = append(dst, '{')
|
||||||
if len(fields) > 0 {
|
if len(fields) > 0 {
|
||||||
|
@ -79,6 +99,20 @@ func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MarshalFieldsToLogfmt appends logfmt-marshaled fields to dst and returns the result.
|
||||||
|
func MarshalFieldsToLogfmt(dst []byte, fields []Field) []byte {
|
||||||
|
if len(fields) == 0 {
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
dst = fields[0].marshalToLogfmt(dst)
|
||||||
|
fields = fields[1:]
|
||||||
|
for i := range fields {
|
||||||
|
dst = append(dst, ' ')
|
||||||
|
dst = fields[i].marshalToLogfmt(dst)
|
||||||
|
}
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
func appendFields(a *arena, dst, src []Field) []Field {
|
func appendFields(a *arena, dst, src []Field) []Field {
|
||||||
for _, f := range src {
|
for _, f := range src {
|
||||||
dst = append(dst, Field{
|
dst = append(dst, Field{
|
||||||
|
|
Loading…
Reference in a new issue