VictoriaMetrics/lib/logstorage/pipe_pack_json.go

91 lines
2 KiB
Go
Raw Permalink Normal View History

2024-05-25 19:36:16 +00:00
package logstorage
import (
"fmt"
2024-05-28 23:52:13 +00:00
"slices"
2024-05-25 19:36:16 +00:00
)
// pipePackJSON processes '| pack_json ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe
type pipePackJSON struct {
	// resultField is the name of the field that receives the packed JSON.
	// parsePackJSON sets it to "_msg" when the query names no result field.
	resultField string

	// fields is the explicit list taken from the 'fields (...)' clause.
	// parsePackJSON leaves it nil when the clause is absent or contains "*".
	fields []string
}
func (pp *pipePackJSON) String() string {
s := "pack_json"
2024-05-28 23:52:13 +00:00
if len(pp.fields) > 0 {
s += " fields (" + fieldsToString(pp.fields) + ")"
}
2024-05-25 19:36:16 +00:00
if !isMsgFieldName(pp.resultField) {
s += " as " + quoteTokenIfNeeded(pp.resultField)
}
return s
}
2024-06-27 12:18:42 +00:00
// canLiveTail always returns true for the pack_json pipe.
func (pp *pipePackJSON) canLiveTail() bool {
	return true
}
2024-05-25 19:36:16 +00:00
func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
2024-06-05 01:18:12 +00:00
updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields)
2024-05-25 19:36:16 +00:00
}
// optimize is a no-op for the pack_json pipe.
func (pp *pipePackJSON) optimize() {
	// nothing to do
}
// hasFilterInWithQuery always returns false: pipePackJSON holds only
// a result field name and a list of field names, no filters.
func (pp *pipePackJSON) hasFilterInWithQuery() bool {
	return false
}
func (pp *pipePackJSON) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
2024-05-25 19:36:16 +00:00
return pp, nil
}
func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
2024-06-05 01:18:12 +00:00
return newPipePackProcessor(workersCount, ppNext, pp.resultField, pp.fields, MarshalFieldsToJSON)
2024-05-25 19:36:16 +00:00
}
func parsePackJSON(lex *lexer) (*pipePackJSON, error) {
if !lex.isKeyword("pack_json") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "pack_json")
}
lex.nextToken()
2024-05-28 23:52:13 +00:00
var fields []string
if lex.isKeyword("fields") {
lex.nextToken()
fs, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse fields: %w", err)
}
if slices.Contains(fs, "*") {
fs = nil
}
fields = fs
}
2024-05-25 19:36:16 +00:00
// parse optional 'as ...` part
resultField := "_msg"
if lex.isKeyword("as") {
lex.nextToken()
2024-05-28 23:52:13 +00:00
}
if !lex.isKeyword("|", ")", "") {
2024-05-25 19:36:16 +00:00
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err)
}
resultField = field
}
pp := &pipePackJSON{
resultField: resultField,
2024-05-28 23:52:13 +00:00
fields: fields,
2024-05-25 19:36:16 +00:00
}
return pp, nil
}