Aliaksandr Valialkin 2024-05-28 22:41:44 +02:00
parent 3031a3b3f7
commit 7f8d032f43
4 changed files with 83 additions and 5 deletions

@@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 ## tip
+* FEATURE: allow specifying fields, which must be packed into JSON in [`pack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe) via `pack_json fields (field1, ..., fieldN)` syntax.
 ## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs)
 Released at 2024-05-28

@@ -1560,7 +1560,7 @@ See also:
 ### math pipe
 `| math ...` [pipe](#pipes) performs mathematical calculations over numeric values stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
-For example, the following query divides `duration_msecs` field value by 1000, then rounds them to integer and stores the result in the `duration_secs` field:
+For example, the following query divides `duration_msecs` field value by 1000, then rounds it to integer and stores the result in the `duration_secs` field:
 ```logsql
 _time:5m | math round(duration_msecs / 1000) as duration_secs
@@ -1639,7 +1639,14 @@ The following query is equivalent to the previous one:
 _time:5m | pack_json
 ```
-The `pack_json` doesn't touch other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_json` pipe. For example, the following query
+If only a subset of labels must be packed into JSON, then it must be listed inside `fields (...)` after `pack_json`. For example, the following query builds JSON with `foo` and `bar` fields
+only and stores the result in `baz` field:
+
+```logsql
+_time:5m | pack_json fields (foo, bar) as baz
+```
+
+The `pack_json` doesn't modify or delete other labels. If you do not need them, then add [`| fields ...`](#fields-pipe) after the `pack_json` pipe. For example, the following query
 leaves only the `foo` label with the original log fields packed into JSON:
 ```logsql

@@ -2,6 +2,7 @@ package logstorage

 import (
 	"fmt"
+	"slices"
 	"unsafe"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -12,10 +13,15 @@ import (
 // See https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe
 type pipePackJSON struct {
 	resultField string
+
+	fields []string
 }

 func (pp *pipePackJSON) String() string {
 	s := "pack_json"
+	if len(pp.fields) > 0 {
+		s += " fields (" + fieldsToString(pp.fields) + ")"
+	}
 	if !isMsgFieldName(pp.resultField) {
 		s += " as " + quoteTokenIfNeeded(pp.resultField)
 	}
@@ -25,11 +31,19 @@ func (pp *pipePackJSON) String() string {
 func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
 	if neededFields.contains("*") {
 		if !unneededFields.contains(pp.resultField) {
-			unneededFields.reset()
+			if len(pp.fields) > 0 {
+				unneededFields.removeFields(pp.fields)
+			} else {
+				unneededFields.reset()
+			}
 		}
 	} else {
 		if neededFields.contains(pp.resultField) {
-			neededFields.add("*")
+			if len(pp.fields) > 0 {
+				neededFields.addFields(pp.fields)
+			} else {
+				neededFields.add("*")
+			}
 		}
 	}
 }
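
(Editor's aside, not part of the diff.) The `updateNeededFields` change above is what lets the query planner skip reading columns that are neither listed in `fields (...)` nor consumed by later pipes: with an explicit list, only the listed fields are marked as needed instead of `*`. Below is a minimal model of that effect for the common case where only the packed result field is used downstream; the helper name and the plain string-slice representation are illustrative assumptions, not the real `fieldsSet` API:

```go
package main

import "fmt"

// neededInputFields sketches the planner-visible effect of the change for a
// query shaped like `... | pack_json fields (...) as baz | fields baz`:
// with an explicit field list only those columns must be read from storage,
// otherwise every column ("*") is needed because all of them get packed.
func neededInputFields(packFields []string) []string {
	if len(packFields) == 0 {
		return []string{"*"} // plain `pack_json` packs all columns
	}
	return packFields // `pack_json fields (...)` needs only the listed columns
}

func main() {
	fmt.Println(neededInputFields(nil))                    // [*]
	fmt.Println(neededInputFields([]string{"foo", "bar"})) // [foo bar]
}
```

In other words, `_time:5m | pack_json fields (foo, bar) as baz | fields baz` only has to read `foo` and `bar`, while the same query with a plain `pack_json` has to read every column of the matching logs.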
@@ -74,6 +88,8 @@ type pipePackJSONProcessorShardNopad struct {
 	buf []byte
 	fields []Field
+
+	cs []*blockResultColumn
 }

 func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
@@ -85,7 +101,17 @@ func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
 	shard.rc.name = ppp.pp.resultField

-	cs := br.getColumns()
+	cs := shard.cs[:0]
+	if len(ppp.pp.fields) == 0 {
+		csAll := br.getColumns()
+		cs = append(cs, csAll...)
+	} else {
+		for _, f := range ppp.pp.fields {
+			c := br.getColumnByName(f)
+			cs = append(cs, c)
+		}
+	}
+	shard.cs = cs

 	buf := shard.buf[:0]
 	fields := shard.fields
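
(Editor's aside, not part of the diff.) `writeBlock` now collects either all columns or only the ones named in `fields (...)`, and each row's values from those columns are what get packed into the JSON object; a field that is listed but absent from a log entry comes out as an empty string, as the added test further down shows. A rough standalone sketch of that packing step, assuming a map-based row for simplicity; `packJSON` and its signature are hypothetical, since the real pipe works on columnar `blockResult` data and reuses per-shard buffers:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// packJSON serializes the selected fields of one log entry into a JSON
// object, preserving the order in which the fields are listed. Fields that
// are missing from the entry are emitted as empty strings.
func packJSON(row map[string]string, fields []string) string {
	var b strings.Builder
	b.WriteByte('{')
	for i, name := range fields {
		if i > 0 {
			b.WriteByte(',')
		}
		k, _ := json.Marshal(name)      // JSON-escape the field name
		v, _ := json.Marshal(row[name]) // a missing field yields ""
		b.Write(k)
		b.WriteByte(':')
		b.Write(v)
	}
	b.WriteByte('}')
	return b.String()
}

func main() {
	row := map[string]string{"_msg": "x", "foo": "abc", "bar": "cde"}
	fmt.Println(packJSON(row, []string{"foo", "baz"})) // {"foo":"abc","baz":""}
}
```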
@@ -122,10 +148,25 @@ func parsePackJSON(lex *lexer) (*pipePackJSON, error) {
 	}
 	lex.nextToken()

+	var fields []string
+	if lex.isKeyword("fields") {
+		lex.nextToken()
+		fs, err := parseFieldNamesInParens(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse fields: %w", err)
+		}
+		if slices.Contains(fs, "*") {
+			fs = nil
+		}
+		fields = fs
+	}
+
 	// parse optional 'as ...` part
 	resultField := "_msg"
 	if lex.isKeyword("as") {
 		lex.nextToken()
+	}
+	if !lex.isKeyword("|", ")", "") {
 		field, err := parseFieldName(lex)
 		if err != nil {
 			return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err)
@@ -135,6 +176,7 @@ func parsePackJSON(lex *lexer) (*pipePackJSON, error) {
 	pp := &pipePackJSON{
 		resultField: resultField,
+		fields:      fields,
 	}

 	return pp, nil
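
(Editor's aside, not part of the diff.) After this change `parsePackJSON` accepts `pack_json [fields (f1, ..., fN)] [as result]`, where `fields (*)` collapses back to "pack everything" and a bare `fields` with no list is a parse error. Below is a self-contained sketch of that grammar over a pre-tokenized argument string; the naive tokenizer and the `parsePackJSONArgs` helper are assumptions for illustration and stand in for the real lexer (which is also a bit more permissive, e.g. the `as` keyword itself is optional):

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// parsePackJSONArgs models the argument grammar of the pack_json pipe:
//
//	pack_json [fields (f1, ..., fN)] [as result]
//
// It splits the arguments on spaces, commas and parentheses, which is a
// simplification of the real tokenizer.
func parsePackJSONArgs(args string) (fields []string, resultField string, err error) {
	resultField = "_msg" // default result field, as in the real pipe
	toks := strings.FieldsFunc(args, func(r rune) bool {
		return r == ' ' || r == ',' || r == '(' || r == ')'
	})
	i := 0
	if i < len(toks) && toks[i] == "fields" {
		i++
		for i < len(toks) && toks[i] != "as" {
			fields = append(fields, toks[i])
			i++
		}
		if len(fields) == 0 {
			return nil, "", fmt.Errorf("missing field list after 'fields'")
		}
		// `fields (*)` means "all fields", i.e. the same as plain pack_json.
		if slices.Contains(fields, "*") {
			fields = nil
		}
	}
	if i < len(toks) && toks[i] == "as" {
		i++
		if i >= len(toks) {
			return nil, "", fmt.Errorf("missing result field after 'as'")
		}
		resultField = toks[i]
		i++
	}
	if i != len(toks) {
		return nil, "", fmt.Errorf("unexpected token %q", toks[i])
	}
	return fields, resultField, nil
}

func main() {
	for _, s := range []string{"", "as x", "fields (a, b)", "fields (a, b) as x", "fields (*) as x"} {
		fields, result, err := parsePackJSONArgs(s)
		fmt.Printf("pack_json %-20s -> fields=%v result=%q err=%v\n", s, fields, result, err)
	}
}
```

Running it prints the parsed field list and result field for the same shapes the tests below exercise (`pack_json`, `pack_json as x`, `pack_json fields (a, b)`, `pack_json fields (a, b) as x`).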

@@ -12,6 +12,8 @@ func TestParsePipePackJSONSuccess(t *testing.T) {
 	f(`pack_json`)
 	f(`pack_json as x`)
+	f(`pack_json fields (a, b)`)
+	f(`pack_json fields (a, b) as x`)
 }

 func TestParsePipePackJSONFailure(t *testing.T) {
@@ -21,6 +23,7 @@ func TestParsePipePackJSONFailure(t *testing.T) {
 	}
 	f(`pack_json foo bar`)
+	f(`pack_json fields`)
 }

 func TestPipePackJSON(t *testing.T) {
@@ -76,6 +79,30 @@ func TestPipePackJSON(t *testing.T) {
 			{"c", "d"},
 		},
 	})
+
+	// pack only the needed fields
+	f(`pack_json fields (foo, baz) a`, [][]Field{
+		{
+			{"_msg", "x"},
+			{"foo", `abc`},
+			{"bar", `cde`},
+		},
+		{
+			{"a", "b"},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `x`},
+			{"foo", `abc`},
+			{"bar", `cde`},
+			{"a", `{"foo":"abc","baz":""}`},
+		},
+		{
+			{"a", `{"foo":"","baz":""}`},
+			{"c", "d"},
+		},
+	})
 }

 func TestPipePackJSONUpdateNeededFields(t *testing.T) {