This commit is contained in:
Aliaksandr Valialkin 2024-05-20 21:34:24 +02:00
parent 01f63b9e94
commit ae4f92f4cd
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 474 additions and 117 deletions

View file

@ -3,6 +3,7 @@ package logstorage
import ( import (
"math" "math"
"slices" "slices"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
"unsafe" "unsafe"
@ -28,12 +29,6 @@ type blockResult struct {
// timestamps contain timestamps for the selected log entries in the block. // timestamps contain timestamps for the selected log entries in the block.
timestamps []int64 timestamps []int64
// csBufOffset contains csBuf offset for the requested columns.
//
// columns with indexes below csBufOffset are ignored.
// This is needed for simplifying data transformations at pipe stages.
csBufOffset int
// csBuf contains requested columns. // csBuf contains requested columns.
csBuf []blockResultColumn csBuf []blockResultColumn
@ -52,8 +47,6 @@ func (br *blockResult) reset() {
br.timestamps = br.timestamps[:0] br.timestamps = br.timestamps[:0]
br.csBufOffset = 0
clear(br.csBuf) clear(br.csBuf)
br.csBuf = br.csBuf[:0] br.csBuf = br.csBuf[:0]
@ -1227,56 +1220,67 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
// copyColumns copies columns from srcColumnNames to dstColumnNames. // copyColumns copies columns from srcColumnNames to dstColumnNames.
func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) { func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) {
if len(srcColumnNames) == 0 { for i, srcName := range srcColumnNames {
return br.copySingleColumn(srcName, dstColumnNames[i])
}
} }
csBuf := br.csBuf func (br *blockResult) copySingleColumn(srcName, dstName string) {
csBufOffset := len(csBuf) found := false
for _, c := range br.getColumns() { cs := br.getColumns()
if idx := slices.Index(srcColumnNames, c.name); idx >= 0 { csBufLen := len(br.csBuf)
c.name = dstColumnNames[idx] for _, c := range cs {
csBuf = append(csBuf, *c) if c.name != dstName {
// continue is skipped intentionally in order to leave the original column in the columns list. br.csBuf = append(br.csBuf, *c)
} }
if !slices.Contains(dstColumnNames, c.name) { if c.name == srcName {
csBuf = append(csBuf, *c) cCopy := *c
cCopy.name = dstName
br.csBuf = append(br.csBuf, cCopy)
found = true
} }
} }
br.csBufOffset = csBufOffset if !found {
br.csBuf = csBuf br.addConstColumn(dstName, "")
}
br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...)
br.csInitialized = false br.csInitialized = false
for _, dstColumnName := range dstColumnNames {
br.createMissingColumnByName(dstColumnName)
}
} }
// renameColumns renames columns from srcColumnNames to dstColumnNames. // renameColumns renames columns from srcColumnNames to dstColumnNames.
func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) { func (br *blockResult) renameColumns(srcColumnNames, dstColumnNames []string) {
if len(srcColumnNames) == 0 { for i, srcName := range srcColumnNames {
return br.renameSingleColumn(srcName, dstColumnNames[i])
}
} }
csBuf := br.csBuf func (br *blockResult) renameSingleColumn(srcName, dstName string) {
csBufOffset := len(csBuf) found := false
for _, c := range br.getColumns() { cs := br.getColumns()
if idx := slices.Index(srcColumnNames, c.name); idx >= 0 { csBufLen := len(br.csBuf)
c.name = dstColumnNames[idx] for _, c := range cs {
csBuf = append(csBuf, *c) if c.name == srcName {
continue cCopy := *c
} cCopy.name = dstName
if !slices.Contains(dstColumnNames, c.name) { br.csBuf = append(br.csBuf, cCopy)
csBuf = append(csBuf, *c) found = true
} else if c.name != dstName {
br.csBuf = append(br.csBuf, *c)
} }
} }
br.csBufOffset = csBufOffset if !found {
br.csBuf = csBuf br.addConstColumn(dstName, "")
}
br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...)
br.csInitialized = false br.csInitialized = false
for _, dstColumnName := range dstColumnNames {
br.createMissingColumnByName(dstColumnName)
} }
func debugColumnNames(cs []*blockResultColumn) string {
a := make([]string, len(cs))
for i, c := range cs {
a[i] = c.name
}
return strings.Join(a, ",")
} }
// deleteColumns deletes columns with the given columnNames. // deleteColumns deletes columns with the given columnNames.
@ -1285,15 +1289,15 @@ func (br *blockResult) deleteColumns(columnNames []string) {
return return
} }
csBuf := br.csBuf cs := br.getColumns()
csBufOffset := len(csBuf) csBufLen := len(br.csBuf)
for _, c := range br.getColumns() { for _, c := range cs {
if !slices.Contains(columnNames, c.name) { if !slices.Contains(columnNames, c.name) {
csBuf = append(csBuf, *c) br.csBuf = append(br.csBuf, *c)
} }
} }
br.csBufOffset = csBufOffset
br.csBuf = csBuf br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...)
br.csInitialized = false br.csInitialized = false
} }
@ -1305,14 +1309,21 @@ func (br *blockResult) setColumns(columnNames []string) {
} }
// Slow path - construct the requested columns // Slow path - construct the requested columns
csBuf := br.csBuf cs := br.getColumns()
csBufOffset := len(csBuf) csBufLen := len(br.csBuf)
for _, columnName := range columnNames { for _, c := range cs {
c := br.getColumnByName(columnName) if slices.Contains(columnNames, c.name) {
csBuf = append(csBuf, *c) br.csBuf = append(br.csBuf, *c)
} }
br.csBufOffset = csBufOffset }
br.csBuf = csBuf
for _, columnName := range columnNames {
if idx := getBlockResultColumnIdxByName(cs, columnName); idx < 0 {
br.addConstColumn(columnName, "")
}
}
br.csBuf = append(br.csBuf[:0], br.csBuf[csBufLen:]...)
br.csInitialized = false br.csInitialized = false
} }
@ -1344,22 +1355,12 @@ func (br *blockResult) getColumnByName(columnName string) *blockResultColumn {
return &br.csBuf[len(br.csBuf)-1] return &br.csBuf[len(br.csBuf)-1]
} }
func (br *blockResult) createMissingColumnByName(columnName string) {
for _, c := range br.getColumns() {
if c.name == columnName {
return
}
}
br.addConstColumn(columnName, "")
}
func (br *blockResult) getColumns() []*blockResultColumn { func (br *blockResult) getColumns() []*blockResultColumn {
if br.csInitialized { if br.csInitialized {
return br.cs return br.cs
} }
csBuf := br.csBuf[br.csBufOffset:] csBuf := br.csBuf
clear(br.cs) clear(br.cs)
cs := br.cs[:0] cs := br.cs[:0]
for i := range csBuf { for i := range csBuf {

View file

@ -32,29 +32,21 @@ func (pc *pipeCopy) String() string {
} }
func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) { func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededSrcFields := make([]bool, len(pc.srcFields)) for i := len(pc.srcFields)-1; i >=0 ; i-- {
for i, dstField := range pc.dstFields { srcField := pc.srcFields[i]
if neededFields.contains(dstField) && !unneededFields.contains(dstField) { dstField := pc.dstFields[i]
neededSrcFields[i] = true
}
}
if neededFields.contains("*") { if neededFields.contains("*") {
// update only unneeded fields if !unneededFields.contains(dstField) {
unneededFields.addFields(pc.dstFields) unneededFields.add(dstField)
for i, srcField := range pc.srcFields {
if neededSrcFields[i] {
unneededFields.remove(srcField) unneededFields.remove(srcField)
} }
}
} else { } else {
// update only needed fields and reset unneeded fields if neededFields.contains(dstField) {
neededFields.removeFields(pc.dstFields) neededFields.remove(dstField)
for i, srcField := range pc.srcFields {
if neededSrcFields[i] {
neededFields.add(srcField) neededFields.add(srcField)
} }
} }
unneededFields.reset()
} }
} }

View file

@ -5,6 +5,186 @@ import (
"testing" "testing"
) )
func TestParsePipeCopySuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`copy foo as bar`)
f(`copy foo as bar, a as b`)
}
func TestParsePipeCopyFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`copy`)
f(`copy x`)
f(`copy x as`)
f(`copy x y z`)
}
func TestPipeCopy(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// single row, copy from existing field
f("copy a as b", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
{"b", `test`},
},
})
// single row, copy from existing field to multiple fields
f("copy a as b, a as c, _msg as d", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
{"b", `test`},
{"c", `test`},
{"d", `{"foo":"bar"}`},
},
})
// single row, copy from non-exsiting field
f("copy x as b", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
{"b", ``},
},
})
// copy to existing field
f("copy _msg as a", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `{"foo":"bar"}`},
},
})
// copy to itself
f("copy a as a", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
})
// swap copy
f("copy a as b, _msg as a, b as _msg", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `test`},
{"a", `{"foo":"bar"}`},
{"b", `test`},
},
})
// copy to the same field multiple times
f("copy a as b, _msg as b", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
{"b", `{"foo":"bar"}`},
},
})
// chain copy
f("copy a as b, b as c", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
{"b", `test`},
{"c", `test`},
},
})
// Multiple rows
f("copy a as b", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
{
{"a", `foobar`},
},
{
{"b", `baz`},
{"c", "d"},
{"e", "afdf"},
},
{
{"c", "dss"},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
{"b", `test`},
},
{
{"a", `foobar`},
{"b", `foobar`},
},
{
{"b", ``},
{"c", "d"},
{"e", "afdf"},
},
{
{"c", "dss"},
{"b", ""},
},
})
}
func TestPipeCopyUpdateNeededFields(t *testing.T) { func TestPipeCopyUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper() t.Helper()
@ -13,6 +193,7 @@ func TestPipeCopyUpdateNeededFields(t *testing.T) {
// all the needed fields // all the needed fields
f("copy s1 d1, s2 d2", "*", "", "*", "d1,d2") f("copy s1 d1, s2 d2", "*", "", "*", "d1,d2")
f("copy a a", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src and dst // all the needed fields, unneeded fields do not intersect with src and dst
f("copy s1 d1 ,s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2") f("copy s1 d1 ,s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2")

View file

@ -32,33 +32,25 @@ func (pr *pipeRename) String() string {
} }
func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) { func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededSrcFields := make([]bool, len(pr.srcFields)) for i := len(pr.srcFields)-1; i >=0 ; i-- {
for i, dstField := range pr.dstFields { srcField := pr.srcFields[i]
if neededFields.contains(dstField) && !unneededFields.contains(dstField) { dstField := pr.dstFields[i]
neededSrcFields[i] = true
}
}
if neededFields.contains("*") { if neededFields.contains("*") {
// update only unneeded fields if unneededFields.contains(dstField) {
unneededFields.addFields(pr.dstFields)
for i, srcField := range pr.srcFields {
if neededSrcFields[i] {
unneededFields.remove(srcField)
} else {
unneededFields.add(srcField) unneededFields.add(srcField)
} } else {
unneededFields.add(dstField)
unneededFields.remove(srcField)
} }
} else { } else {
// update only needed fields and reset unneeded fields if neededFields.contains(dstField) {
neededFields.removeFields(pr.dstFields) neededFields.remove(dstField)
for i, srcField := range pr.srcFields {
if neededSrcFields[i] {
neededFields.add(srcField) neededFields.add(srcField)
} else { } else {
neededFields.remove(srcField) neededFields.remove(srcField)
} }
} }
unneededFields.reset()
} }
} }

View file

@ -4,6 +4,175 @@ import (
"testing" "testing"
) )
func TestParsePipeRenameSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`rename foo as bar`)
f(`rename foo as bar, a as b`)
}
func TestParsePipeRenameFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`rename`)
f(`rename x`)
f(`rename x as`)
f(`rename x y z`)
}
func TestPipeRename(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// single row, rename from existing field
f("rename a as b", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"b", `test`},
},
})
// single row, rename from existing field to multiple fields
f("rename a as b, a as c, _msg as d", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"b", `test`},
{"c", ``},
{"d", `{"foo":"bar"}`},
},
})
// single row, rename from non-exsiting field
f("rename x as b", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
{"b", ``},
},
})
// rename to existing field
f("rename _msg as a", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"a", `{"foo":"bar"}`},
},
})
// rename to itself
f("rename a as a", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
})
// swap rename
f("rename a as b, _msg as a, b as _msg", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `test`},
{"a", `{"foo":"bar"}`},
},
})
// rename to the same field multiple times
f("rename a as b, _msg as b", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"b", `{"foo":"bar"}`},
},
})
// chain rename (shouldn't work - otherwise swap rename will break)
f("rename a as b, b as c", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"c", `test`},
},
})
// Multiple rows
f("rename a as b", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
{
{"a", `foobar`},
},
{
{"b", `baz`},
{"c", "d"},
{"e", "afdf"},
},
{
{"c", "dss"},
},
}, [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"b", `test`},
},
{
{"b", `foobar`},
},
{
{"b", ``},
{"c", "d"},
{"e", "afdf"},
},
{
{"c", "dss"},
{"b", ""},
},
})
}
func TestPipeRenameUpdateNeededFields(t *testing.T) { func TestPipeRenameUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper() t.Helper()
@ -12,6 +181,7 @@ func TestPipeRenameUpdateNeededFields(t *testing.T) {
// all the needed fields // all the needed fields
f("rename s1 d1, s2 d2", "*", "", "*", "d1,d2") f("rename s1 d1, s2 d2", "*", "", "*", "d1,d2")
f("rename a a", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src and dst // all the needed fields, unneeded fields do not intersect with src and dst
f("rename s1 d1, s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2") f("rename s1 d1, s2 d2", "*", "f1,f2", "*", "d1,d2,f1,f2")

View file

@ -341,35 +341,56 @@ func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) {
} }
func sortTestRows(rows [][]Field) { func sortTestRows(rows [][]Field) {
for _, row := range rows {
sortTestFields(row)
}
slices.SortFunc(rows, func(a, b []Field) int { slices.SortFunc(rows, func(a, b []Field) int {
reverse := -1 reverse := false
if len(a) > len(b) { if len(a) > len(b) {
reverse = 1 reverse = true
a, b = b, a a, b = b, a
} }
for i, fA := range a { for i, fA := range a {
fB := b[i] fB := b[i]
if fA.Name == fB.Name { result := cmpTestFields(fA, fB)
if fA.Value == fB.Value { if result == 0 {
continue continue
} }
if fA.Value < fB.Value { if reverse {
return reverse result = -result
} }
return -reverse return result
}
if fA.Name < fB.Name {
return reverse
}
return -reverse
} }
if len(a) == len(b) { if len(a) == len(b) {
return 0 return 0
} }
return reverse if reverse {
return 1
}
return -1
}) })
} }
func sortTestFields(fields []Field) {
slices.SortFunc(fields, cmpTestFields)
}
func cmpTestFields(a, b Field) int {
if a.Name == b.Name {
if a.Value == b.Value {
return 0
}
if a.Value < b.Value {
return -1
}
return 1
}
if a.Name < b.Name {
return -1
}
return 1
}
func rowsToString(rows [][]Field) string { func rowsToString(rows [][]Field) string {
a := make([]string, len(rows)) a := make([]string, len(rows))
for i, row := range rows { for i, row := range rows {