VictoriaMetrics/lib/logstorage/pipe_pack_json.go

141 lines
2.9 KiB
Go
Raw Normal View History

2024-05-25 19:36:16 +00:00
package logstorage
import (
"fmt"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// pipePackJSON processes '| pack_json ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#pack_json-pipe
type pipePackJSON struct {
resultField string
}
func (pp *pipePackJSON) String() string {
s := "pack_json"
if !isMsgFieldName(pp.resultField) {
s += " as " + quoteTokenIfNeeded(pp.resultField)
}
return s
}
func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
if !unneededFields.contains(pp.resultField) {
unneededFields.reset()
}
} else {
if neededFields.contains(pp.resultField) {
neededFields.add("*")
}
}
}
func (pp *pipePackJSON) optimize() {
// nothing to do
}
func (pp *pipePackJSON) hasFilterInWithQuery() bool {
return false
}
func (pp *pipePackJSON) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
return pp, nil
}
func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
return &pipePackJSONProcessor{
pp: pp,
ppNext: ppNext,
shards: make([]pipePackJSONProcessorShard, workersCount),
}
}
type pipePackJSONProcessor struct {
pp *pipePackJSON
ppNext pipeProcessor
shards []pipePackJSONProcessorShard
}
type pipePackJSONProcessorShard struct {
pipePackJSONProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipePackJSONProcessorShardNopad{})%128]byte
}
type pipePackJSONProcessorShardNopad struct {
rc resultColumn
buf []byte
fields []Field
}
func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &ppp.shards[workerID]
shard.rc.name = ppp.pp.resultField
cs := br.getColumns()
buf := shard.buf[:0]
fields := shard.fields
for rowIdx := range br.timestamps {
fields = fields[:0]
for _, c := range cs {
v := c.getValueAtRow(br, rowIdx)
fields = append(fields, Field{
Name: c.name,
Value: v,
})
}
bufLen := len(buf)
buf = marshalFieldsToJSON(buf, fields)
v := bytesutil.ToUnsafeString(buf[bufLen:])
shard.rc.addValue(v)
}
br.addResultColumn(&shard.rc)
ppp.ppNext.writeBlock(workerID, br)
shard.rc.reset()
}
func (ppp *pipePackJSONProcessor) flush() error {
return nil
}
func parsePackJSON(lex *lexer) (*pipePackJSON, error) {
if !lex.isKeyword("pack_json") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "pack_json")
}
lex.nextToken()
// parse optional 'as ...` part
resultField := "_msg"
if lex.isKeyword("as") {
lex.nextToken()
field, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result field for 'pack_json': %w", err)
}
resultField = field
}
pp := &pipePackJSON{
resultField: resultField,
}
return pp, nil
}