diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 552cffd47..17d78a0ed 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -3,6 +3,7 @@ package logstorage import ( "math" "slices" + "strings" "sync/atomic" "time" "unsafe" @@ -28,12 +29,6 @@ type blockResult struct { // timestamps contain timestamps for the selected log entries in the block. timestamps []int64 - // csBufOffset contains csBuf offset for the requested columns. - // - // columns with indexes below csBufOffset are ignored. - // This is needed for simplifying data transformations at pipe stages. - csBufOffset int - // csBuf contains requested columns. csBuf []blockResultColumn @@ -52,8 +47,6 @@ func (br *blockResult) reset() { br.timestamps = br.timestamps[:0] - br.csBufOffset = 0 - clear(br.csBuf) br.csBuf = br.csBuf[:0] @@ -1227,56 +1220,67 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { // copyColumns copies columns from srcColumnNames to dstColumnNames. func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) { - if len(srcColumnNames) == 0 { - return + for i, srcName := range srcColumnNames { + br.copySingleColumn(srcName, dstColumnNames[i]) } +} - csBuf := br.csBuf - csBufOffset := len(csBuf) - for _, c := range br.getColumns() { - if idx := slices.Index(srcColumnNames, c.name); idx >= 0 { - c.name = dstColumnNames[idx] - csBuf = append(csBuf, *c) - // continue is skipped intentionally in order to leave the original column in the columns list. +func (br *blockResult) copySingleColumn(srcName, dstName string) { + found := false + cs := br.getColumns() + csBufLen := len(br.csBuf) + for _, c := range cs { + if c.name != dstName { + br.csBuf = append(br.csBuf, *c) } - if !slices.Contains(dstColumnNames, c.name) { - csBuf = append(csBuf, *c) + if c.name == srcName { + cCopy := *c + cCopy.name = dstName + br.csBuf = append(br.csBuf, cCopy) + found = true } } - br.csBufOffset = csBufOffset - br.csBuf = csBuf + if !found { + br.addConstColumn(dstName, "") + } + br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...) br.csInitialized = false - - for _, dstColumnName := range dstColumnNames { - br.createMissingColumnByName(dstColumnName) - } } // renameColumns renames columns from srcColumnNames to dstColumnNames. func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) { - if len(srcColumnNames) == 0 { - return + for i, srcName := range srcColumnNames { + br.renameSingleColumn(srcName, dstColumnNames[i]) } +} - csBuf := br.csBuf - csBufOffset := len(csBuf) - for _, c := range br.getColumns() { - if idx := slices.Index(srcColumnNames, c.name); idx >= 0 { - c.name = dstColumnNames[idx] - csBuf = append(csBuf, *c) - continue - } - if !slices.Contains(dstColumnNames, c.name) { - csBuf = append(csBuf, *c) +func (br *blockResult) renameSingleColumn(srcName, dstName string) { + found := false + cs := br.getColumns() + csBufLen := len(br.csBuf) + for _, c := range cs { + if c.name == srcName { + cCopy := *c + cCopy.name = dstName + br.csBuf = append(br.csBuf, cCopy) + found = true + } else if c.name != dstName { + br.csBuf = append(br.csBuf, *c) } } - br.csBufOffset = csBufOffset - br.csBuf = csBuf + if !found { + br.addConstColumn(dstName, "") + } + br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...) br.csInitialized = false +} - for _, dstColumnName := range dstColumnNames { - br.createMissingColumnByName(dstColumnName) +func debugColumnNames(cs []*blockResultColumn) string { + a := make([]string, len(cs)) + for i, c := range cs { + a[i] = c.name } + return strings.Join(a, ",") } // deleteColumns deletes columns with the given columnNames. @@ -1285,15 +1289,15 @@ func (br *blockResult) deleteColumns(columnNames []string) { return } - csBuf := br.csBuf - csBufOffset := len(csBuf) - for _, c := range br.getColumns() { + cs := br.getColumns() + csBufLen := len(br.csBuf) + for _, c := range cs { if !slices.Contains(columnNames, c.name) { - csBuf = append(csBuf, *c) + br.csBuf = append(br.csBuf, *c) } } - br.csBufOffset = csBufOffset - br.csBuf = csBuf + + br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...) br.csInitialized = false } @@ -1305,14 +1309,21 @@ func (br *blockResult) setColumns(columnNames []string) { } // Slow path - construct the requested columns - csBuf := br.csBuf - csBufOffset := len(csBuf) - for _, columnName := range columnNames { - c := br.getColumnByName(columnName) - csBuf = append(csBuf, *c) + cs := br.getColumns() + csBufLen := len(br.csBuf) + for _, c := range cs { + if slices.Contains(columnNames, c.name) { + br.csBuf = append(br.csBuf, *c) + } } - br.csBufOffset = csBufOffset - br.csBuf = csBuf + + for _, columnName := range columnNames { + if idx := getBlockResultColumnIdxByName(cs, columnName); idx < 0 { + br.addConstColumn(columnName, "") + } + } + + br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...) br.csInitialized = false } @@ -1344,22 +1355,12 @@ func (br *blockResult) getColumnByName(columnName string) *blockResultColumn { return &br.csBuf[len(br.csBuf)-1] } -func (br *blockResult) createMissingColumnByName(columnName string) { - for _, c := range br.getColumns() { - if c.name == columnName { - return - } - } - - br.addConstColumn(columnName, "") -} - func (br *blockResult) getColumns() []*blockResultColumn { if br.csInitialized { return br.cs } - csBuf := br.csBuf[br.csBufOffset:] + csBuf := br.csBuf clear(br.cs) cs := br.cs[:0] for i := range csBuf { diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go index d512bb0d5..55882d36a 100644 --- a/lib/logstorage/pipe_copy.go +++ b/lib/logstorage/pipe_copy.go @@ -32,29 +32,21 @@ func (pc *pipeCopy) String() string { } func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) { - neededSrcFields := make([]bool, len(pc.srcFields)) - for i, dstField := range pc.dstFields { - if neededFields.contains(dstField) && !unneededFields.contains(dstField) { - neededSrcFields[i] = true - } - } - if neededFields.contains("*") { - // update only unneeded fields - unneededFields.addFields(pc.dstFields) - for i, srcField := range pc.srcFields { - if neededSrcFields[i] { + for i := len(pc.srcFields)-1; i >=0 ; i-- { + srcField := pc.srcFields[i] + dstField := pc.dstFields[i] + + if neededFields.contains("*") { + if !unneededFields.contains(dstField) { + unneededFields.add(dstField) unneededFields.remove(srcField) } - } - } else { - // update only needed fields and reset unneeded fields - neededFields.removeFields(pc.dstFields) - for i, srcField := range pc.srcFields { - if neededSrcFields[i] { + } else { + if neededFields.contains(dstField) { + neededFields.remove(dstField) neededFields.add(srcField) } } - unneededFields.reset() } } diff --git a/lib/logstorage/pipe_copy_test.go b/lib/logstorage/pipe_copy_test.go index c9e30ae22..327825ea4 100644 --- a/lib/logstorage/pipe_copy_test.go +++ b/lib/logstorage/pipe_copy_test.go @@ -5,6 +5,186 @@ import ( "testing" ) +func TestParsePipeCopySuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`copy foo as bar`) + f(`copy foo as bar, a as b`) +} + +func TestParsePipeCopyFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`copy`) + f(`copy x`) + f(`copy x as`) + f(`copy x y z`) +} + +func TestPipeCopy(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // single row, copy from existing field + f("copy a as b", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + {"b", `test`}, + }, + }) + + // single row, copy from existing field to multiple fields + f("copy a as b, a as c, _msg as d", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + {"b", `test`}, + {"c", `test`}, + {"d", `{"foo":"bar"}`}, + }, + }) + + // single row, copy from non-exsiting field + f("copy x as b", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + {"b", ``}, + }, + }) + + // copy to existing field + f("copy _msg as a", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `{"foo":"bar"}`}, + }, + }) + + // copy to itself + f("copy a as a", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }) + + // swap copy + f("copy a as b, _msg as a, b as _msg", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `test`}, + {"a", `{"foo":"bar"}`}, + {"b", `test`}, + }, + }) + + // copy to the same field multiple times + f("copy a as b, _msg as b", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + {"b", `{"foo":"bar"}`}, + }, + }) + + // chain copy + f("copy a as b, b as c", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + {"b", `test`}, + {"c", `test`}, + }, + }) + + // Multiple rows + f("copy a as b", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + { + {"a", `foobar`}, + }, + { + {"b", `baz`}, + {"c", "d"}, + {"e", "afdf"}, + }, + { + {"c", "dss"}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + {"b", `test`}, + }, + { + {"a", `foobar`}, + {"b", `foobar`}, + }, + { + {"b", ``}, + {"c", "d"}, + {"e", "afdf"}, + }, + { + {"c", "dss"}, + {"b", ""}, + }, + }) +} + func TestPipeCopyUpdateNeededFields(t *testing.T) { f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { t.Helper() @@ -13,6 +193,7 @@ func TestPipeCopyUpdateNeededFields(t *testing.T) { // all the needed fields f("copy s1 d1, s2 d2", "*", "", "*", "d1,d2") + f("copy a a", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src and dst f("copy s1 d1 ,s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2") diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index 99a774ad4..9d3427e1c 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -32,33 +32,25 @@ func (pr *pipeRename) String() string { } func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) { - neededSrcFields := make([]bool, len(pr.srcFields)) - for i, dstField := range pr.dstFields { - if neededFields.contains(dstField) && !unneededFields.contains(dstField) { - neededSrcFields[i] = true - } - } - if neededFields.contains("*") { - // update only unneeded fields - unneededFields.addFields(pr.dstFields) - for i, srcField := range pr.srcFields { - if neededSrcFields[i] { - unneededFields.remove(srcField) - } else { + for i := len(pr.srcFields)-1; i >=0 ; i-- { + srcField := pr.srcFields[i] + dstField := pr.dstFields[i] + + if neededFields.contains("*") { + if unneededFields.contains(dstField) { unneededFields.add(srcField) + } else { + unneededFields.add(dstField) + unneededFields.remove(srcField) } - } - } else { - // update only needed fields and reset unneeded fields - neededFields.removeFields(pr.dstFields) - for i, srcField := range pr.srcFields { - if neededSrcFields[i] { + } else { + if neededFields.contains(dstField) { + neededFields.remove(dstField) neededFields.add(srcField) } else { neededFields.remove(srcField) } } - unneededFields.reset() } } diff --git a/lib/logstorage/pipe_rename_test.go b/lib/logstorage/pipe_rename_test.go index 506a2f5fa..4b44e5ee7 100644 --- a/lib/logstorage/pipe_rename_test.go +++ b/lib/logstorage/pipe_rename_test.go @@ -4,6 +4,175 @@ import ( "testing" ) +func TestParsePipeRenameSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`rename foo as bar`) + f(`rename foo as bar, a as b`) +} + +func TestParsePipeRenameFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`rename`) + f(`rename x`) + f(`rename x as`) + f(`rename x y z`) +} + +func TestPipeRename(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // single row, rename from existing field + f("rename a as b", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"b", `test`}, + }, + }) + + // single row, rename from existing field to multiple fields + f("rename a as b, a as c, _msg as d", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"b", `test`}, + {"c", ``}, + {"d", `{"foo":"bar"}`}, + }, + }) + + // single row, rename from non-exsiting field + f("rename x as b", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + {"b", ``}, + }, + }) + + // rename to existing field + f("rename _msg as a", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"a", `{"foo":"bar"}`}, + }, + }) + + // rename to itself + f("rename a as a", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }) + + // swap rename + f("rename a as b, _msg as a, b as _msg", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `test`}, + {"a", `{"foo":"bar"}`}, + }, + }) + + // rename to the same field multiple times + f("rename a as b, _msg as b", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"b", `{"foo":"bar"}`}, + }, + }) + + // chain rename (shouldn't work - otherwise swap rename will break) + f("rename a as b, b as c", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"c", `test`}, + }, + }) + + // Multiple rows + f("rename a as b", [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"a", `test`}, + }, + { + {"a", `foobar`}, + }, + { + {"b", `baz`}, + {"c", "d"}, + {"e", "afdf"}, + }, + { + {"c", "dss"}, + }, + }, [][]Field{ + { + {"_msg", `{"foo":"bar"}`}, + {"b", `test`}, + }, + { + {"b", `foobar`}, + }, + { + {"b", ``}, + {"c", "d"}, + {"e", "afdf"}, + }, + { + {"c", "dss"}, + {"b", ""}, + }, + }) +} + func TestPipeRenameUpdateNeededFields(t *testing.T) { f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { t.Helper() @@ -12,6 +181,7 @@ func TestPipeRenameUpdateNeededFields(t *testing.T) { // all the needed fields f("rename s1 d1, s2 d2", "*", "", "*", "d1,d2") + f("rename a a", "*", "", "*", "") // all the needed fields, unneeded fields do not intersect with src and dst f("rename s1 d1, s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2") diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index abaa2c811..a889a1fde 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -341,35 +341,56 @@ func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) { } func sortTestRows(rows [][]Field) { + for _, row := range rows { + sortTestFields(row) + } slices.SortFunc(rows, func(a, b []Field) int { - reverse := -1 + reverse := false if len(a) > len(b) { - reverse = 1 + reverse = true a, b = b, a } for i, fA := range a { fB := b[i] - if fA.Name == fB.Name { - if fA.Value == fB.Value { - continue - } - if fA.Value < fB.Value { - return reverse - } - return -reverse + result := cmpTestFields(fA, fB) + if result == 0 { + continue } - if fA.Name < fB.Name { - return reverse + if reverse { + result = -result } - return -reverse + return result } if len(a) == len(b) { return 0 } - return reverse + if reverse { + return 1 + } + return -1 }) } +func sortTestFields(fields []Field) { + slices.SortFunc(fields, cmpTestFields) +} + +func cmpTestFields(a, b Field) int { + if a.Name == b.Name { + if a.Value == b.Value { + return 0 + } + if a.Value < b.Value { + return -1 + } + return 1 + } + if a.Name < b.Name { + return -1 + } + return 1 +} + func rowsToString(rows [][]Field) string { a := make([]string, len(rows)) for i, row := range rows {