From a50045efdef9eb6bb9af8d9ae35617aeb2f8d2a9 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 20 May 2024 03:52:16 +0200
Subject: [PATCH] lib/logstorage: add unpack_logfmt pipe

---
 lib/logstorage/json_parser.go             |   9 +-
 lib/logstorage/parser_test.go             |  30 ++-
 lib/logstorage/pipe.go                    |   6 +
 lib/logstorage/pipe_extract.go            |  16 +-
 lib/logstorage/pipe_unpack_json.go        |  82 +-----
 lib/logstorage/pipe_unpack_json_test.go   |  29 +++
 lib/logstorage/pipe_unpack_logfmt.go      | 289 ++++++++++++++++++++++
 lib/logstorage/pipe_unpack_logfmt_test.go | 164 +++++++++++++
 8 files changed, 539 insertions(+), 86 deletions(-)
 create mode 100644 lib/logstorage/pipe_unpack_logfmt.go
 create mode 100644 lib/logstorage/pipe_unpack_logfmt_test.go

diff --git a/lib/logstorage/json_parser.go b/lib/logstorage/json_parser.go
index 3cf366774..767d636a8 100644
--- a/lib/logstorage/json_parser.go
+++ b/lib/logstorage/json_parser.go
@@ -33,10 +33,15 @@ type JSONParser struct {
 }
 
 func (p *JSONParser) reset() {
+	p.resetNobuf()
+
+	p.buf = p.buf[:0]
+}
+
+func (p *JSONParser) resetNobuf() {
 	clear(p.Fields)
 	p.Fields = p.Fields[:0]
 
-	p.buf = p.buf[:0]
 	p.prefixBuf = p.prefixBuf[:0]
 }
 
@@ -90,6 +95,8 @@ func (p *JSONParser) parseLogMessage(msg, prefix string, resetBuf bool) error {
 	}
 	if resetBuf {
 		p.reset()
+	} else {
+		p.resetNobuf()
 	}
 	p.prefixBuf = append(p.prefixBuf[:0], prefix...)
 	p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v)
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 6212bd0ff..bd45658d8 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -1008,6 +1008,12 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | unpack_json from x`, `* | unpack_json from x`)
 	f(`* | unpack_json from x result_prefix y`, `* | unpack_json from x result_prefix y`)
 
+	// unpack_logfmt pipe
+	f(`* | unpack_logfmt`, `* | unpack_logfmt`)
+	f(`* | unpack_logfmt result_prefix y`, `* | unpack_logfmt result_prefix y`)
+	f(`* | unpack_logfmt from x`, `* | unpack_logfmt from x`)
+	f(`* | unpack_logfmt from x result_prefix y`, `* | unpack_logfmt from x result_prefix y`)
+
 	// multiple different pipes
 	f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
 	f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@@ -1411,11 +1417,18 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | extract from x "" de`)
 
 	// invalid unpack_json pipe
-	f(`foo | extract_json bar`)
-	f(`foo | extract_json from`)
-	f(`foo | extract_json result_prefix`)
-	f(`foo | extract_json result_prefix x from y`)
-	f(`foo | extract_json from x result_prefix`)
+	f(`foo | unpack_json bar`)
+	f(`foo | unpack_json from`)
+	f(`foo | unpack_json result_prefix`)
+	f(`foo | unpack_json result_prefix x from y`)
+	f(`foo | unpack_json from x result_prefix`)
+
+	// invalid unpack_logfmt pipe
+	f(`foo | unpack_logfmt bar`)
+	f(`foo | unpack_logfmt from`)
+	f(`foo | unpack_logfmt result_prefix`)
+	f(`foo | unpack_logfmt result_prefix x from y`)
+	f(`foo | unpack_logfmt from x result_prefix`)
 }
 
 func TestQueryGetNeededColumns(t *testing.T) {
@@ -1574,6 +1587,13 @@ func TestQueryGetNeededColumns(t *testing.T) {
 	f(`* | unpack_json from s1 | rm f1`, `*`, `f1`)
 	f(`* | unpack_json from s1 | rm f1,s1`, `*`, `f1`)
 
+	f(`* | unpack_logfmt`, `*`, ``)
+	f(`* | unpack_logfmt from s1`, `*`, ``)
+	f(`* | unpack_logfmt from s1 | fields f1`, `f1,s1`, ``)
+	f(`* | unpack_logfmt from s1 | fields s1,f1`, `f1,s1`, ``)
+	f(`* | unpack_logfmt from s1 | rm f1`, `*`, `f1`)
+	f(`* | unpack_logfmt from s1 | rm f1,s1`, `*`, `f1`)
+
 	f(`* | rm f1, f2`, `*`, `f1,f2`)
 	f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
 	f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)
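The parser tests above pin down the clause grammar: `from <field>` and `result_prefix <prefix>` are both optional, and `from` must come before `result_prefix` (the reversed order is in the failure cases). A minimal sketch of the round-trip those success cases assert, assuming the exported logstorage.ParseQuery API these tests build on:

package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

func main() {
	// Parse, then print the canonical form; TestParseQuerySuccess asserts
	// that the printed form matches the expected string for unpack_logfmt.
	q, err := logstorage.ParseQuery("* | unpack_logfmt from x result_prefix qwe_")
	if err != nil {
		panic(err)
	}
	fmt.Println(q) // * | unpack_logfmt from x result_prefix qwe_
}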
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index 32e7b0cea..85323cc7e 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -157,6 +157,12 @@ func parsePipe(lex *lexer) (pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'unpack_json' pipe: %w", err)
 		}
 		return pu, nil
+	case lex.isKeyword("unpack_logfmt"):
+		pu, err := parsePipeUnpackLogfmt(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err)
+		}
+		return pu, nil
 	default:
 		return nil, fmt.Errorf("unexpected pipe %q", lex.token)
 	}
diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go
index 7f2bee69d..854c51138 100644
--- a/lib/logstorage/pipe_extract.go
+++ b/lib/logstorage/pipe_extract.go
@@ -252,8 +252,8 @@ func (ef *extractFormat) apply(s string) {
 			nextPrefix = steps[i+1].prefix
 		}
 
-		us, nOffset, ok := tryUnquoteString(s)
-		if ok {
+		us, nOffset := tryUnquoteString(s)
+		if nOffset >= 0 {
 			// Matched quoted string
 			matches[i] = us
 			s = s[nOffset:]
@@ -279,22 +279,22 @@ func (ef *extractFormat) apply(s string) {
 	}
 }
 
-func tryUnquoteString(s string) (string, int, bool) {
+func tryUnquoteString(s string) (string, int) {
 	if len(s) == 0 {
-		return s, 0, false
+		return s, -1
 	}
 	if s[0] != '"' && s[0] != '`' {
-		return s, 0, false
+		return s, -1
 	}
 	qp, err := strconv.QuotedPrefix(s)
 	if err != nil {
-		return s, 0, false
+		return s, -1
 	}
 	us, err := strconv.Unquote(qp)
 	if err != nil {
-		return s, 0, false
+		return s, -1
 	}
-	return us, len(qp), true
+	return us, len(qp)
 }
 
 func parseExtractFormatSteps(s string) ([]extractFormatStep, error) {
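Collapsing tryUnquoteString's (string, int, bool) result into (string, int), with a negative offset as the "not quoted" sentinel, drops one return value that every caller had to thread through; the new unpack_logfmt parser below reuses the same helper. A standalone copy of the updated function for illustration only, with a tiny driver showing both outcomes:

package main

import (
	"fmt"
	"strconv"
)

// tryUnquoteString is copied from pipe_extract.go above: a negative offset
// means s does not start with a Go-quoted string.
func tryUnquoteString(s string) (string, int) {
	if len(s) == 0 {
		return s, -1
	}
	if s[0] != '"' && s[0] != '`' {
		return s, -1
	}
	qp, err := strconv.QuotedPrefix(s)
	if err != nil {
		return s, -1
	}
	us, err := strconv.Unquote(qp)
	if err != nil {
		return s, -1
	}
	return us, len(qp)
}

func main() {
	us, n := tryUnquoteString(`"x y=z" a=b`)
	fmt.Println(us, n) // x y=z 7

	_, n = tryUnquoteString(`bare a=b`)
	fmt.Println(n) // -1
}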
diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go
index c26e25d68..f9a44556c 100644
--- a/lib/logstorage/pipe_unpack_json.go
+++ b/lib/logstorage/pipe_unpack_json.go
@@ -60,72 +60,9 @@ type pipeUnpackJSONProcessorShard struct {
 }
 
 type pipeUnpackJSONProcessorShardNopad struct {
-	jsonParser JSONParser
+	p JSONParser
 
-	rcs []resultColumn
-	br  blockResult
-
-	valuesLen int
-}
-
-func (shard *pipeUnpackJSONProcessorShard) writeRow(ppBase pipeProcessor, br *blockResult, cs []*blockResultColumn, rowIdx int, extraFields []Field) {
-	rcs := shard.rcs
-
-	areEqualColumns := len(rcs) == len(cs)+len(extraFields)
-	if areEqualColumns {
-		for i, f := range extraFields {
-			if rcs[len(cs)+i].name != f.Name {
-				areEqualColumns = false
-				break
-			}
-		}
-	}
-	if !areEqualColumns {
-		// send the current block to bbBase and construct a block with new set of columns
-		shard.flush(ppBase)
-
-		rcs = shard.rcs[:0]
-		for _, c := range cs {
-			rcs = appendResultColumnWithName(rcs, c.name)
-		}
-		for _, f := range extraFields {
-			rcs = appendResultColumnWithName(rcs, f.Name)
-		}
-		shard.rcs = rcs
-	}
-
-	for i, c := range cs {
-		v := c.getValueAtRow(br, rowIdx)
-		rcs[i].addValue(v)
-		shard.valuesLen += len(v)
-	}
-	for i, f := range extraFields {
-		v := f.Value
-		rcs[len(cs)+i].addValue(v)
-		shard.valuesLen += len(v)
-	}
-	if shard.valuesLen >= 1_000_000 {
-		shard.flush(ppBase)
-	}
-}
-
-func (shard *pipeUnpackJSONProcessorShard) flush(ppBase pipeProcessor) {
-	rcs := shard.rcs
-
-	shard.valuesLen = 0
-
-	if len(rcs) == 0 {
-		return
-	}
-
-	// Flush rcs to ppBase
-	br := &shard.br
-	br.setResultColumns(rcs)
-	ppBase.writeBlock(0, br)
-	br.reset()
-	for i := range rcs {
-		rcs[i].resetValues()
-	}
+	wctx pipeUnpackWriteContext
 }
 
 func (shard *pipeUnpackJSONProcessorShard) parseJSON(v, resultPrefix string) []Field {
@@ -133,11 +70,11 @@ func (shard *pipeUnpackJSONProcessorShard) parseJSON(v, resultPrefix string) []F
 		// This isn't a JSON object
 		return nil
 	}
-	if err := shard.jsonParser.ParseLogMessageNoResetBuf(v, resultPrefix); err != nil {
+	if err := shard.p.ParseLogMessageNoResetBuf(v, resultPrefix); err != nil {
 		// Cannot parse v
 		return nil
 	}
-	return shard.jsonParser.Fields
+	return shard.p.Fields
 }
 
 func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
@@ -147,14 +84,15 @@ func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
 
 	resultPrefix := pup.pu.resultPrefix
 	shard := &pup.shards[workerID]
+	wctx := &shard.wctx
+	wctx.init(br, pup.ppBase)
 
-	cs := br.getColumns()
 	c := br.getColumnByName(pup.pu.fromField)
 	if c.isConst {
 		v := c.valuesEncoded[0]
 		extraFields := shard.parseJSON(v, resultPrefix)
 		for rowIdx := range br.timestamps {
-			shard.writeRow(pup.ppBase, br, cs, rowIdx, extraFields)
+			wctx.writeRow(rowIdx, extraFields)
 		}
 	} else {
 		values := c.getValues(br)
@@ -163,12 +101,12 @@ func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
 			if i == 0 || values[i-1] != v {
 				extraFields = shard.parseJSON(v, resultPrefix)
 			}
-			shard.writeRow(pup.ppBase, br, cs, i, extraFields)
+			wctx.writeRow(i, extraFields)
 		}
 	}
 
-	shard.flush(pup.ppBase)
-	shard.jsonParser.reset()
+	wctx.flush()
+	shard.p.reset()
 }
 
 func (pup *pipeUnpackJSONProcessor) flush() error {
diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go
index fc162b29c..2f04d5737 100644
--- a/lib/logstorage/pipe_unpack_json_test.go
+++ b/lib/logstorage/pipe_unpack_json_test.go
@@ -133,6 +133,35 @@ func TestPipeUnpackJSON(t *testing.T) {
 		},
 	})
 
+	// multiple rows with distinct numbers of fields, with result_prefix
+	f("unpack_json from x result_prefix qwe_", [][]Field{
+		{
+			{"x", `{"foo":"bar","baz":"xyz"}`},
+			{"y", `abc`},
+		},
+		{
+			{"y", `abc`},
+		},
+		{
+			{"z", `foobar`},
+			{"x", `{"z":["bar",123]}`},
+		},
+	}, [][]Field{
+		{
+			{"x", `{"foo":"bar","baz":"xyz"}`},
+			{"y", "abc"},
+			{"qwe_foo", "bar"},
+			{"qwe_baz", "xyz"},
+		},
+		{
+			{"y", `abc`},
+		},
+		{
+			{"z", `foobar`},
+			{"x", `{"z":["bar",123]}`},
+			{"qwe_z", `["bar",123]`},
+		},
+	})
 }
 
 func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) {
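Both unpack pipes keep the same micro-optimization in writeBlock: values read from a block column often arrive in long runs of identical strings, so the field value is re-parsed only when it differs from the previous row. A self-contained sketch of that pattern (the names here are invented for the example, not taken from the patch):

package main

import "fmt"

// parseOncePerRun applies parse to each value, but skips the call when the
// value equals the previous one, exactly like the `i == 0 || values[i-1] != v`
// check in the writeBlock implementations above.
func parseOncePerRun(values []string, parse func(string) int) []int {
	out := make([]int, len(values))
	parsed := 0
	for i, v := range values {
		if i == 0 || values[i-1] != v {
			parsed = parse(v)
		}
		out[i] = parsed
	}
	return out
}

func main() {
	calls := 0
	parse := func(s string) int {
		calls++
		return len(s)
	}
	vals := []string{"a=1", "a=1", "a=1", "b=2", "b=2"}
	fmt.Println(parseOncePerRun(vals, parse)) // [3 3 3 3 3]
	fmt.Println(calls)                        // 2: one parse per run, not per row
}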
diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go
new file mode 100644
index 000000000..37f508839
--- /dev/null
+++ b/lib/logstorage/pipe_unpack_logfmt.go
@@ -0,0 +1,289 @@
+package logstorage
+
+import (
+	"fmt"
+	"strings"
+	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+)
+
+// pipeUnpackLogfmt processes '| unpack_logfmt ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe
+type pipeUnpackLogfmt struct {
+	fromField string
+
+	resultPrefix string
+}
+
+func (pu *pipeUnpackLogfmt) String() string {
+	s := "unpack_logfmt"
+	if !isMsgFieldName(pu.fromField) {
+		s += " from " + quoteTokenIfNeeded(pu.fromField)
+	}
+	if pu.resultPrefix != "" {
+		s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
+	}
+	return s
+}
+
+func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
+	if neededFields.contains("*") {
+		unneededFields.remove(pu.fromField)
+	} else {
+		neededFields.add(pu.fromField)
+	}
+}
+
+func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+	shards := make([]pipeUnpackLogfmtProcessorShard, workersCount)
+
+	pup := &pipeUnpackLogfmtProcessor{
+		pu:     pu,
+		ppBase: ppBase,
+
+		shards: shards,
+	}
+	return pup
+}
+
+type pipeUnpackLogfmtProcessor struct {
+	pu     *pipeUnpackLogfmt
+	ppBase pipeProcessor
+
+	shards []pipeUnpackLogfmtProcessorShard
+}
+
+type pipeUnpackLogfmtProcessorShard struct {
+	pipeUnpackLogfmtProcessorShardNopad
+
+	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0.
+	_ [128 - unsafe.Sizeof(pipeUnpackLogfmtProcessorShardNopad{})%128]byte
+}
+
+type pipeUnpackLogfmtProcessorShardNopad struct {
+	p logfmtParser
+
+	wctx pipeUnpackWriteContext
+}
+
+func (pup *pipeUnpackLogfmtProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
+	resultPrefix := pup.pu.resultPrefix
+	shard := &pup.shards[workerID]
+	wctx := &shard.wctx
+	wctx.init(br, pup.ppBase)
+
+	c := br.getColumnByName(pup.pu.fromField)
+	if c.isConst {
+		v := c.valuesEncoded[0]
+		extraFields := shard.p.parse(v, resultPrefix)
+		for rowIdx := range br.timestamps {
+			wctx.writeRow(rowIdx, extraFields)
+		}
+	} else {
+		values := c.getValues(br)
+		var extraFields []Field
+		for i, v := range values {
+			if i == 0 || values[i-1] != v {
+				extraFields = shard.p.parse(v, resultPrefix)
+			}
+			wctx.writeRow(i, extraFields)
+		}
+	}
+
+	wctx.flush()
+	shard.p.reset()
+}
+
+func (pup *pipeUnpackLogfmtProcessor) flush() error {
+	return nil
+}
+
+func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
+	if !lex.isKeyword("unpack_logfmt") {
+		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unpack_logfmt")
+	}
+	lex.nextToken()
+
+	fromField := "_msg"
+	if lex.isKeyword("from") {
+		lex.nextToken()
+		f, err := parseFieldName(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'from' field name: %w", err)
+		}
+		fromField = f
+	}
+
+	resultPrefix := ""
+	if lex.isKeyword("result_prefix") {
+		lex.nextToken()
+		p, err := getCompoundToken(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'result_prefix': %w", err)
+		}
+		resultPrefix = p
+	}
+
+	pu := &pipeUnpackLogfmt{
+		fromField:    fromField,
+		resultPrefix: resultPrefix,
+	}
+	return pu, nil
+}
+
+type pipeUnpackWriteContext struct {
+	brSrc *blockResult
+	csSrc []*blockResultColumn
+
+	ppBase pipeProcessor
+
+	rcs []resultColumn
+	br  blockResult
+
+	valuesLen int
+}
+
+func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) {
+	wctx.brSrc = brSrc
+	wctx.csSrc = brSrc.getColumns()
+	wctx.ppBase = ppBase
+}
+
+func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
+	csSrc := wctx.csSrc
+	rcs := wctx.rcs
+
+	areEqualColumns := len(rcs) == len(csSrc)+len(extraFields)
+	if areEqualColumns {
+		for i, f := range extraFields {
+			if rcs[len(csSrc)+i].name != f.Name {
+				areEqualColumns = false
+				break
+			}
+		}
+	}
+	if !areEqualColumns {
+		// send the current block to ppBase and construct a block with a new set of columns
+		wctx.flush()
+
+		rcs = wctx.rcs[:0]
+		for _, c := range csSrc {
+			rcs = appendResultColumnWithName(rcs, c.name)
+		}
+		for _, f := range extraFields {
+			rcs = appendResultColumnWithName(rcs, f.Name)
+		}
+		wctx.rcs = rcs
+	}
+
+	brSrc := wctx.brSrc
+	for i, c := range csSrc {
+		v := c.getValueAtRow(brSrc, rowIdx)
+		rcs[i].addValue(v)
+		wctx.valuesLen += len(v)
+	}
+	for i, f := range extraFields {
+		v := f.Value
+		rcs[len(csSrc)+i].addValue(v)
+		wctx.valuesLen += len(v)
+	}
+	if wctx.valuesLen >= 1_000_000 {
+		wctx.flush()
+	}
+}
+
+func (wctx *pipeUnpackWriteContext) flush() {
+	rcs := wctx.rcs
+
+	wctx.valuesLen = 0
+
+	if len(rcs) == 0 {
+		return
+	}
+
+	// Flush rcs to ppBase
+	br := &wctx.br
+	br.setResultColumns(rcs)
+	wctx.ppBase.writeBlock(0, br)
+	br.reset()
+	for i := range rcs {
+		rcs[i].resetValues()
+	}
+}
+
+type logfmtParser struct {
+	Fields []Field
+
+	buf []byte
+}
+
+func (p *logfmtParser) reset() {
+	clear(p.Fields)
+	p.Fields = p.Fields[:0]
+
+	p.buf = p.buf[:0]
+}
+
+func (p *logfmtParser) parse(s, resultPrefix string) []Field {
+	clear(p.Fields)
+	p.Fields = p.Fields[:0]
+
+	for {
+		// Search for the field name
+		n := strings.IndexByte(s, '=')
+		if n < 0 {
+			// The field name is missing
+			return p.Fields
+		}
+
+		name := strings.TrimSpace(s[:n])
+		s = s[n+1:]
+		if len(s) == 0 {
+			p.addField(name, "", resultPrefix)
+			return p.Fields
+		}
+
+		// Search for the field value
+		value, nOffset := tryUnquoteString(s)
+		if nOffset >= 0 {
+			p.addField(name, value, resultPrefix)
+			s = s[nOffset:]
+			if len(s) == 0 {
+				return p.Fields
+			}
+			if s[0] != ' ' {
+				return p.Fields
+			}
+			s = s[1:]
+		} else {
+			n := strings.IndexByte(s, ' ')
+			if n < 0 {
+				p.addField(name, s, resultPrefix)
+				return p.Fields
+			}
+			p.addField(name, s[:n], resultPrefix)
+			s = s[n+1:]
+		}
+	}
+}
+
+func (p *logfmtParser) addField(name, value, resultPrefix string) {
+	if resultPrefix != "" {
+		buf := p.buf
+		bufLen := len(buf)
+		buf = append(buf, resultPrefix...)
+		buf = append(buf, name...)
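logfmtParser.parse is a single forward scan: find '=' to get the field name, then read either a Go-quoted value via tryUnquoteString or a bare value ending at the next space. A compact standalone sketch of the same loop, using a map instead of the reusable Fields/buf slices, so it allocates more but is easier to follow:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLogfmt mirrors the control flow of logfmtParser.parse above, trimmed
// down to a map result. It is a sketch for reading the patch, not the
// implementation itself.
func parseLogfmt(s string) map[string]string {
	m := make(map[string]string)
	for {
		// Field name: everything up to the next '='.
		n := strings.IndexByte(s, '=')
		if n < 0 {
			return m
		}
		name := strings.TrimSpace(s[:n])
		s = s[n+1:]
		if len(s) == 0 {
			m[name] = ""
			return m
		}

		// Quoted value: reuse Go quoting rules, as tryUnquoteString does.
		if s[0] == '"' || s[0] == '`' {
			if qp, err := strconv.QuotedPrefix(s); err == nil {
				m[name], _ = strconv.Unquote(qp)
				s = s[len(qp):]
				if len(s) == 0 || s[0] != ' ' {
					return m
				}
				s = s[1:]
				continue
			}
		}

		// Unquoted value: everything up to the next space.
		n = strings.IndexByte(s, ' ')
		if n < 0 {
			m[name] = s
			return m
		}
		m[name] = s[:n]
		s = s[n+1:]
	}
}

func main() {
	fmt.Println(parseLogfmt(`foo=bar baz="x y=z" a=b`))
	// map[a:b baz:x y=z foo:bar]
}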
+		p.buf = buf
+
+		name = bytesutil.ToUnsafeString(buf[bufLen:])
+	}
+	p.Fields = append(p.Fields, Field{
+		Name:  name,
+		Value: value,
+	})
+}
diff --git a/lib/logstorage/pipe_unpack_logfmt_test.go b/lib/logstorage/pipe_unpack_logfmt_test.go
new file mode 100644
index 000000000..d5a267adf
--- /dev/null
+++ b/lib/logstorage/pipe_unpack_logfmt_test.go
@@ -0,0 +1,164 @@
+package logstorage
+
+import (
+	"testing"
+)
+
+func TestPipeUnpackLogfmt(t *testing.T) {
+	f := func(pipeStr string, rows, rowsExpected [][]Field) {
+		t.Helper()
+		expectPipeResults(t, pipeStr, rows, rowsExpected)
+	}
+
+	// single row, unpack from _msg
+	f("unpack_logfmt", [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=b`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=b`},
+			{"foo", "bar"},
+			{"baz", "x y=z"},
+			{"a", "b"},
+		},
+	})
+
+	// single row, unpack from _msg into _msg
+	f("unpack_logfmt", [][]Field{
+		{
+			{"_msg", `_msg=bar`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", "bar"},
+		},
+	})
+
+	// single row, unpack from missing field
+	f("unpack_logfmt from x", [][]Field{
+		{
+			{"_msg", `foo=bar`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `foo=bar`},
+		},
+	})
+
+	// single row, unpack from non-logfmt field
+	f("unpack_logfmt from x", [][]Field{
+		{
+			{"x", `foobar`},
+		},
+	}, [][]Field{
+		{
+			{"x", `foobar`},
+		},
+	})
+
+	// unpack empty value
+	f("unpack_logfmt from x", [][]Field{
+		{
+			{"x", `foobar=`},
+		},
+	}, [][]Field{
+		{
+			{"x", `foobar=`},
+			{"foobar", ""},
+		},
+	})
+	f("unpack_logfmt from x", [][]Field{
+		{
+			{"x", `foo="" bar= baz=`},
+		},
+	}, [][]Field{
+		{
+			{"x", `foo="" bar= baz=`},
+			{"foo", ""},
+			{"bar", ""},
+			{"baz", ""},
+		},
+	})
+
+	// multiple rows with distinct numbers of fields
+	f("unpack_logfmt from x", [][]Field{
+		{
+			{"x", `foo=bar baz=xyz`},
+			{"y", `abc`},
+		},
+		{
+			{"y", `abc`},
+		},
+		{
+			{"z", `foobar`},
+			{"x", `z=bar`},
+		},
+	}, [][]Field{
+		{
+			{"x", `foo=bar baz=xyz`},
+			{"y", "abc"},
+			{"foo", "bar"},
+			{"baz", "xyz"},
+		},
+		{
+			{"y", `abc`},
+		},
+		{
+			{"z", `bar`},
+			{"x", `z=bar`},
+		},
+	})
+
+	// multiple rows with distinct numbers of fields, with result_prefix
+	f("unpack_logfmt from x result_prefix qwe_", [][]Field{
+		{
+			{"x", `foo=bar baz=xyz`},
+			{"y", `abc`},
+		},
+		{
+			{"y", `abc`},
+		},
+		{
+			{"z", `foobar`},
+			{"x", `z=bar`},
+		},
+	}, [][]Field{
+		{
+			{"x", `foo=bar baz=xyz`},
+			{"y", "abc"},
+			{"qwe_foo", "bar"},
+			{"qwe_baz", "xyz"},
+		},
+		{
+			{"y", `abc`},
+		},
+		{
+			{"z", `foobar`},
+			{"x", `z=bar`},
+			{"qwe_z", `bar`},
+		},
+	})
+}
+
+func TestPipeUnpackLogfmtUpdateNeededFields(t *testing.T) {
+	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("unpack_logfmt from x", "*", "", "*", "")
+
+	// all the needed fields, unneeded fields do not intersect with src
+	f("unpack_logfmt from x", "*", "f1,f2", "*", "f1,f2")
+
+	// all the needed fields, unneeded fields intersect with src
+	f("unpack_logfmt from x", "*", "f2,x", "*", "f2")
+
+	// needed fields do not intersect with src
+	f("unpack_logfmt from x", "f1,f2", "", "f1,f2,x", "")
+
+	// needed fields intersect with src
+	f("unpack_logfmt from x", "f2,x", "", "f2,x", "")
+}
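For reference, the quoting convention the tests above rely on is plain Go quoting on the producer side: a value containing a space or a quote must be quoted, or the parser cuts it at the first space. A throwaway sketch of the emitting side (the emit helper is invented for illustration and is not part of the patch):

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// emit renders one logfmt pair the way pipe_unpack_logfmt.go reads it back:
// Go-quote the value only when it contains a space, a double quote, or a backtick.
func emit(name, value string) string {
	if strings.ContainsAny(value, " \"`") {
		return name + "=" + strconv.Quote(value)
	}
	return name + "=" + value
}

func main() {
	line := emit("foo", "bar") + " " + emit("baz", "x y=z")
	fmt.Println(line) // foo=bar baz="x y=z"
}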