diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index a15545ede..6212bd0ff 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -1002,6 +1002,12 @@ func TestParseQuerySuccess(t *testing.T) {
 	f("* | extract from x `<foobaz>`", `* | extract from x "<foobaz>"`)
 	f("* | extract from x <foobaz>", `* | extract from x "<foobaz>"`)
 
+	// unpack_json pipe
+	f(`* | unpack_json`, `* | unpack_json`)
+	f(`* | unpack_json result_prefix y`, `* | unpack_json result_prefix y`)
+	f(`* | unpack_json from x`, `* | unpack_json from x`)
+	f(`* | unpack_json from x result_prefix y`, `* | unpack_json from x result_prefix y`)
+
 	// multiple different pipes
 	f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
 	f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@@ -1403,6 +1409,13 @@ func TestParseQueryFailure(t *testing.T) {
 
 	f(`foo | extract from x "abc"`)
 	f(`foo | extract from x "" de`)
+
+	// invalid unpack_json pipe
+	f(`foo | unpack_json bar`)
+	f(`foo | unpack_json from`)
+	f(`foo | unpack_json result_prefix`)
+	f(`foo | unpack_json result_prefix x from y`)
+	f(`foo | unpack_json from x result_prefix`)
 }
 
 func TestQueryGetNeededColumns(t *testing.T) {
@@ -1544,15 +1557,22 @@ func TestQueryGetNeededColumns(t *testing.T) {
 	f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
 	f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
 
-	f(`* | extract from s1 "<f1>x<f2>"`, `*`, ``)
+	f(`* | extract from s1 "<f1>x<f2>"`, `*`, `f1,f2`)
 	f(`* | extract from s1 "<f1>x<f2>" | fields foo`, `foo`, ``)
 	f(`* | extract from s1 "<f1>x<f2>" | fields foo,s1`, `foo,s1`, ``)
 	f(`* | extract from s1 "<f1>x<f2>" | fields foo,f1`, `foo,s1`, ``)
 	f(`* | extract from s1 "<f1>x<f2>" | fields foo,f1,f2`, `foo,s1`, ``)
-	f(`* | extract from s1 "<f1>x<f2>" | rm foo`, `*`, `foo`)
-	f(`* | extract from s1 "<f1>x<f2>" | rm foo,s1`, `*`, `foo`)
-	f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1`, `*`, `foo`)
-	f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1,f2`, `*`, `foo,s1`)
+	f(`* | extract from s1 "<f1>x<f2>" | rm foo`, `*`, `f1,f2,foo`)
+	f(`* | extract from s1 "<f1>x<f2>" | rm foo,s1`, `*`, `f1,f2,foo`)
+	f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1`, `*`, `f1,f2,foo`)
+	f(`* | extract from s1 "<f1>x<f2>" | rm foo,f1,f2`, `*`, `f1,f2,foo,s1`)
+
+	f(`* | unpack_json`, `*`, ``)
+	f(`* | unpack_json from s1`, `*`, ``)
+	f(`* | unpack_json from s1 | fields f1`, `f1,s1`, ``)
+	f(`* | unpack_json from s1 | fields s1,f1`, `f1,s1`, ``)
+	f(`* | unpack_json from s1 | rm f1`, `*`, `f1`)
+	f(`* | unpack_json from s1 | rm f1,s1`, `*`, `f1`)
 
 	f(`* | rm f1, f2`, `*`, `f1,f2`)
 	f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index 5bd12a99d..32e7b0cea 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -67,85 +67,98 @@ func parsePipes(lex *lexer) ([]pipe, error) {
 		if !lex.isKeyword("|") {
 			return nil, fmt.Errorf("expecting '|'; got %q", lex.token)
 		}
-		if !lex.mustNextToken() {
-			return nil, fmt.Errorf("missing token after '|'")
-		}
-		switch {
-		case lex.isKeyword("copy", "cp"):
-			pc, err := parsePipeCopy(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
-			}
-			pipes = append(pipes, pc)
-		case lex.isKeyword("delete", "del", "rm"):
-			pd, err := parsePipeDelete(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
-			}
-			pipes = append(pipes, pd)
-		case lex.isKeyword("extract"):
-			pe, err := parsePipeExtract(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err)
-			}
-			pipes = append(pipes, pe)
-		case lex.isKeyword("field_names"):
-			pf, err := parsePipeFieldNames(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
-			}
-			pipes = append(pipes, pf)
-		case lex.isKeyword("fields"):
-			pf, err := parsePipeFields(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
-			}
-			pipes = append(pipes, pf)
-		case lex.isKeyword("filter"):
-			pf, err := parsePipeFilter(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)
-			}
-			pipes = append(pipes, pf)
-		case lex.isKeyword("limit", "head"):
-			pl, err := parsePipeLimit(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err)
-			}
-			pipes = append(pipes, pl)
-		case lex.isKeyword("offset", "skip"):
-			ps, err := parsePipeOffset(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err)
-			}
-			pipes = append(pipes, ps)
-		case lex.isKeyword("rename", "mv"):
-			pr, err := parsePipeRename(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err)
-			}
-			pipes = append(pipes, pr)
-		case lex.isKeyword("sort"):
-			ps, err := parsePipeSort(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
-			}
-			pipes = append(pipes, ps)
-		case lex.isKeyword("stats"):
-			ps, err := parsePipeStats(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
-			}
-			pipes = append(pipes, ps)
-		case lex.isKeyword("uniq"):
-			pu, err := parsePipeUniq(lex)
-			if err != nil {
-				return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err)
-			}
-			pipes = append(pipes, pu)
-		default:
-			return nil, fmt.Errorf("unexpected pipe %q", lex.token)
+		lex.nextToken()
+		p, err := parsePipe(lex)
+		if err != nil {
+			return nil, err
 		}
+		pipes = append(pipes, p)
 	}
 	return pipes, nil
 }
+
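+// parsePipe parses a single pipe from lex; the caller must consume the leading '|' token before calling it.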
+func parsePipe(lex *lexer) (pipe, error) {
+	switch {
+	case lex.isKeyword("copy", "cp"):
+		pc, err := parsePipeCopy(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
+		}
+		return pc, nil
+	case lex.isKeyword("delete", "del", "rm"):
+		pd, err := parsePipeDelete(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
+		}
+		return pd, nil
+	case lex.isKeyword("extract"):
+		pe, err := parsePipeExtract(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err)
+		}
+		return pe, nil
+	case lex.isKeyword("field_names"):
+		pf, err := parsePipeFieldNames(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
+		}
+		return pf, nil
+	case lex.isKeyword("fields"):
+		pf, err := parsePipeFields(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
+		}
+		return pf, nil
+	case lex.isKeyword("filter"):
+		pf, err := parsePipeFilter(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)
+		}
+		return pf, nil
+	case lex.isKeyword("limit", "head"):
+		pl, err := parsePipeLimit(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err)
+		}
+		return pl, nil
+	case lex.isKeyword("offset", "skip"):
+		ps, err := parsePipeOffset(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'offset' pipe: %w", err)
+		}
+		return ps, nil
+	case lex.isKeyword("rename", "mv"):
+		pr, err := parsePipeRename(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'rename' pipe: %w", err)
+		}
+		return pr, nil
+	case lex.isKeyword("sort"):
+		ps, err := parsePipeSort(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
+		}
+		return ps, nil
+	case lex.isKeyword("stats"):
+		ps, err := parsePipeStats(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
+		}
+		return ps, nil
+	case lex.isKeyword("uniq"):
+		pu, err := parsePipeUniq(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err)
+		}
+		return pu, nil
+	case lex.isKeyword("unpack_json"):
+		pu, err := parsePipeUnpackJSON(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'unpack_json' pipe: %w", err)
+		}
+		return pu, nil
+	default:
+		return nil, fmt.Errorf("unexpected pipe %q", lex.token)
+	}
+}
diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go
index 214eae5ec..256e96345 100644
--- a/lib/logstorage/pipe_unpack_json.go
+++ b/lib/logstorage/pipe_unpack_json.go
@@ -24,7 +24,7 @@ func (pu *pipeUnpackJSON) String() string {
 		s += " from " + quoteTokenIfNeeded(pu.fromField)
 	}
 	if pu.resultPrefix != "" {
-		s += " prefix " + quoteTokenIfNeeded(pu.resultPrefix)
+		s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
 	}
 	return s
 }
@@ -80,7 +80,8 @@ func (shard *pipeUnpackJSONProcessorShard) writeRow(ppBase pipeProcessor, br *bl
 	areEqualColumns := len(rcs) == len(cs)+len(extraFields)
 	if areEqualColumns {
 		for i, f := range extraFields {
-			if rcs[len(rcs)+i].name != f.Name {
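+			// unpacked fields are appended to rcs after the len(cs) columns copied from the input block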
+ case lex.isKeyword("sort"): + ps, err := parsePipeSort(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err) + } + return ps, nil + case lex.isKeyword("stats"): + ps, err := parsePipeStats(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) + } + return ps, nil + case lex.isKeyword("uniq"): + pu, err := parsePipeUniq(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err) + } + return pu, nil + case lex.isKeyword("unpack_json"): + pu, err := parsePipeUnpackJSON(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'unpack_json' pipe: %w", err) + } + return pu, nil + default: + return nil, fmt.Errorf("unexpected pipe %q", lex.token) + } +} diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index 214eae5ec..256e96345 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -24,7 +24,7 @@ func (pu *pipeUnpackJSON) String() string { s += " from " + quoteTokenIfNeeded(pu.fromField) } if pu.resultPrefix != "" { - s += " prefix " + quoteTokenIfNeeded(pu.resultPrefix) + s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) } return s } @@ -80,7 +80,7 @@ func (shard *pipeUnpackJSONProcessorShard) writeRow(ppBase pipeProcessor, br *bl areEqualColumns := len(rcs) == len(cs)+len(extraFields) if areEqualColumns { for i, f := range extraFields { - if rcs[len(rcs)+i].name != f.Name { + if rcs[len(cs)+i].name != f.Name { areEqualColumns = false break } @@ -107,7 +107,7 @@ func (shard *pipeUnpackJSONProcessorShard) writeRow(ppBase pipeProcessor, br *bl } for i, f := range extraFields { v := f.Value - rcs[len(rcs)+i].addValue(v) + rcs[len(cs)+i].addValue(v) shard.valuesLen += len(v) } if shard.valuesLen >= 1_000_000 { diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index 8d3dc1c9a..2caf44caf 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -1,9 +1,328 @@ package logstorage import ( + "math/rand" + "slices" + "strings" + "sync" "testing" ) +func TestPipeUnpackJSON(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // single row, unpack from _msg + f("unpack_json", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"foo", "bar"}, + }, + }) + + // single row, unpack from _msg into _msg + f("unpack_json", [][]Field{ + { + {"_msg", `{"_msg":"bar"}`}, + }, + }, [][]Field{ + { + {"_msg", "bar"}, + }, + }) + + // single row, unpack from missing field + f("unpack_json from x", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + }, + }) + + // single row, unpack from non-json field + f("unpack_json from x", [][]Field{ + { + {"x", `foobar`}, + }, + }, [][]Field{ + { + {"x", `foobar`}, + }, + }) + + // single row, unpack from non-dict json + f("unpack_json from x", [][]Field{ + { + {"x", `["foobar"]`}, + }, + }, [][]Field{ + { + {"x", `["foobar"]`}, + }, + }) + f("unpack_json from x", [][]Field{ + { + {"x", `1234`}, + }, + }, [][]Field{ + { + {"x", `1234`}, + }, + }) + f("unpack_json from x", [][]Field{ + { + {"x", `"xxx"`}, + }, + }, [][]Field{ + { + {"x", `"xxx"`}, + }, + }) + + // single row, unpack from named field + f("unpack_json from x", [][]Field{ + { + {"x", `{"foo":"bar","baz":"xyz","a":123,"b":["foo","bar"],"x":NaN}`}, + }, + }, [][]Field{ + 
{ + {"x", `NaN`}, + {"foo", "bar"}, + {"baz", "xyz"}, + {"a", "123"}, + {"b", `["foo","bar"]`}, + }, + }) + + // multiple rows with distinct number of fields + f("unpack_json from x", [][]Field{ + { + {"x", `{"foo":"bar","baz":"xyz"}`}, + {"y", `abc`}, + }, + { + {"y", `abc`}, + }, + { + {"z", `foobar`}, + {"x", `{"z":["bar",123]}`}, + }, + }, [][]Field{ + { + {"x", `{"foo":"bar","baz":"xyz"}`}, + {"y", "abc"}, + {"foo", "bar"}, + {"baz", "xyz"}, + }, + { + {"y", `abc`}, + }, + { + {"z", `["bar",123]`}, + {"x", `{"z":["bar",123]}`}, + }, + }) + +} + +func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + + lex := newLexer(pipeStr) + p, err := parsePipe(lex) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", pipeStr, err) + } + + workersCount := 5 + stopCh := make(chan struct{}) + cancel := func() {} + ppTest := newTestPipeProcessor() + pp := p.newPipeProcessor(workersCount, stopCh, cancel, ppTest) + + brw := newTestBlockResultWriter(workersCount, pp) + for _, row := range rows { + brw.writeRow(row) + } + brw.flush() + + ppTest.expectRows(t, rowsExpected) +} + +func newTestBlockResultWriter(workersCount int, ppBase pipeProcessor) *testBlockResultWriter { + return &testBlockResultWriter{ + workersCount: workersCount, + ppBase: ppBase, + } +} + +type testBlockResultWriter struct { + workersCount int + ppBase pipeProcessor + rcs []resultColumn + br blockResult +} + +func (brw *testBlockResultWriter) writeRow(row []Field) { + if !brw.areSameFields(row) { + brw.flush() + + brw.rcs = brw.rcs[:0] + for _, field := range row { + brw.rcs = appendResultColumnWithName(brw.rcs, field.Name) + } + } + + for i, field := range row { + brw.rcs[i].addValue(field.Value) + } + if rand.Intn(5) == 0 { + brw.flush() + } +} + +func (brw *testBlockResultWriter) areSameFields(row []Field) bool { + if len(brw.rcs) != len(row) { + return false + } + for i, rc := range brw.rcs { + if rc.name != row[i].Name { + return false + } + } + return true +} + +func (brw *testBlockResultWriter) flush() { + brw.br.setResultColumns(brw.rcs) + workerID := rand.Intn(brw.workersCount) + brw.ppBase.writeBlock(uint(workerID), &brw.br) + brw.br.reset() + for i := range brw.rcs { + brw.rcs[i].resetValues() + } +} + +func newTestPipeProcessor() *testPipeProcessor { + return &testPipeProcessor{} +} + +type testPipeProcessor struct { + resultRowsLock sync.Mutex + resultRows [][]Field +} + +func (pp *testPipeProcessor) writeBlock(workerID uint, br *blockResult) { + cs := br.getColumns() + var columnValues [][]string + for _, c := range cs { + values := c.getValues(br) + columnValues = append(columnValues, values) + } + + for i := range br.timestamps { + row := make([]Field, len(columnValues)) + for j, values := range columnValues { + r := &row[j] + r.Name = strings.Clone(cs[j].name) + r.Value = strings.Clone(values[i]) + } + pp.resultRowsLock.Lock() + pp.resultRows = append(pp.resultRows, row) + pp.resultRowsLock.Unlock() + } +} + +func (pp *testPipeProcessor) flush() error { + return nil +} + +func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) { + t.Helper() + + if len(pp.resultRows) != len(expectedRows) { + t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s", + len(pp.resultRows), len(expectedRows), rowsToString(pp.resultRows), rowsToString(expectedRows)) + } + + sortTestRows(pp.resultRows) + sortTestRows(expectedRows) + + for i, resultRow := range pp.resultRows { + expectedRow := expectedRows[i] + if len(resultRow) 
+func sortTestRows(rows [][]Field) {
+	slices.SortFunc(rows, func(a, b []Field) int {
+		reverse := -1
+		if len(a) > len(b) {
+			reverse = 1
+			a, b = b, a
+		}
+		for i, fA := range a {
+			fB := b[i]
+			if fA.Name == fB.Name {
+				if fA.Value == fB.Value {
+					continue
+				}
+				if fA.Value < fB.Value {
+					return reverse
+				}
+				return -reverse
+			}
+			if fA.Name < fB.Name {
+				return reverse
+			}
+			return -reverse
+		}
+		if len(a) == len(b) {
+			return 0
+		}
+		return reverse
+	})
+}
+
+func rowsToString(rows [][]Field) string {
+	a := make([]string, len(rows))
+	for i, row := range rows {
+		a[i] = rowToString(row)
+	}
+	return strings.Join(a, "\n")
+}
+
+func rowToString(row []Field) string {
+	a := make([]string, len(row))
+	for i, f := range row {
+		a[i] = f.String()
+	}
+	return "{" + strings.Join(a, ",") + "}"
+}
+
 func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) {
 	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
 		t.Helper()