This commit is contained in:
Aliaksandr Valialkin 2024-05-20 23:23:22 +02:00
parent eac0722068
commit c5734e18b9
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
9 changed files with 164 additions and 34 deletions

View file

@ -3,7 +3,6 @@ package logstorage
import (
"math"
"slices"
"strings"
"sync/atomic"
"time"
"unsafe"
@ -201,19 +200,6 @@ func (br *blockResult) sizeBytes() int {
return n
}
// addResultColumns adds the given rcs to br.
//
// The br is valid only until rcs are modified.
func (br *blockResult) addResultColumns(rcs []resultColumn) {
if len(rcs) == 0 || len(rcs[0].values) == 0 {
return
}
for i := range rcs {
br.addResultColumn(&rcs[i])
}
}
// setResultColumns sets the given rcs as br columns.
//
// The br is valid only until rcs are modified.
@ -1275,14 +1261,6 @@ func (br *blockResult) renameSingleColumn(srcName, dstName string) {
br.csInitialized = false
}
func debugColumnNames(cs []*blockResultColumn) string {
a := make([]string, len(cs))
for i, c := range cs {
a[i] = c.name
}
return strings.Join(a, ",")
}
// deleteColumns deletes columns with the given columnNames.
func (br *blockResult) deleteColumns(columnNames []string) {
if len(columnNames) == 0 {
@ -1811,6 +1789,11 @@ type resultColumn struct {
values []string
}
func (rc *resultColumn) reset() {
rc.name = ""
rc.resetValues()
}
func (rc *resultColumn) resetValues() {
clear(rc.values)
rc.values = rc.values[:0]
@ -1819,8 +1802,8 @@ func (rc *resultColumn) resetValues() {
func appendResultColumnWithName(dst []resultColumn, name string) []resultColumn {
dst = slicesutil.SetLength(dst, len(dst)+1)
rc := &dst[len(dst)-1]
rc.resetValues()
rc.name = name
rc.resetValues()
return dst
}

View file

@ -60,7 +60,9 @@ func TestPipeDelete(t *testing.T) {
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{})
}, [][]Field{
{},
})
// delete non-existing fields
f("delete foo, _msg, bar", [][]Field{
@ -93,6 +95,8 @@ func TestPipeDelete(t *testing.T) {
{"b", "df"},
},
}, [][]Field{
{},
{},
{
{"b", `baz`},
{"c", "d"},

View file

@ -68,10 +68,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeExtractProcessorShard, workersCount)
for i := range shards {
ef := newPattern(pe.steps)
shards[i] = pipeExtractProcessorShard{
pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{
ef: ef,
ef: newPattern(pe.steps),
},
}
}

View file

@ -516,7 +516,7 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
}
}
if !areEqualColumns {
// send the current block to bbBase and construct a block with new set of columns
// send the current block to ppBase and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]

View file

@ -154,6 +154,28 @@ func TestPipeSort(t *testing.T) {
},
})
// Sort by multiple fields with limit desc
f("sort by (a, b) desc limit 1", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"a", `2`},
{"b", `54`},
},
})
// Sort by multiple fields with offset
f("sort by (a, b) offset 1", [][]Field{
{
@ -203,6 +225,28 @@ func TestPipeSort(t *testing.T) {
{"b", `3`},
},
})
// Sort by multiple fields with offset and limit
f("sort by (a, b) desc offset 2 limit 100", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"_msg", `def`},
{"a", `1`},
},
})
}
func TestPipeSortUpdateNeededFields(t *testing.T) {

View file

@ -4,6 +4,85 @@ import (
"testing"
)
func TestParsePipeStatsSuccess(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeSuccess(t, pipeStr)
}
f(`stats count(*) as rows`)
f(`stats by (x) count(*) as rows, count_uniq(x) as uniqs`)
f(`stats by (_time:month offset 6.5h, y) count(*) as rows, count_uniq(x) as uniqs`)
f(`stats by (_time:month offset 6.5h, y) count(*) if (q:w) as rows, count_uniq(x) as uniqs`)
}
func TestParsePipeStatsFailure(t *testing.T) {
f := func(pipeStr string) {
t.Helper()
expectParsePipeFailure(t, pipeStr)
}
f(`stats`)
f(`stats by`)
f(`stats foo`)
f(`stats count`)
f(`stats if (x:y)`)
f(`stats by(x) foo`)
f(`stats by(x:abc) count() rows`)
f(`stats by(x:1h offset) count () rows`)
f(`stats by(x:1h offset foo) count() rows`)
}
func TestPipeStats(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
f("stats count(*) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "3"},
},
})
f("stats by (a) count(*) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"a", "1"},
{"rows", "1"},
{"a", "2"},
{"rows", "2"},
},
})
}
func TestPipeStatsUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()

View file

@ -457,7 +457,7 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
}
}
if !areEqualColumns {
// send the current block to bbBase and construct a block with new set of columns
// send the current block to ppBase and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]

View file

@ -354,7 +354,7 @@ func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) {
}
}
if !areEqualColumns {
// send the current block to bbBase and construct a block with new set of columns
// send the current block to ppBase and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]

View file

@ -22,10 +22,15 @@ func (uctx *fieldsUnpackerContext) resetFields() {
}
func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) {
nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
copy(nameBuf, fieldPrefix)
copy(nameBuf[len(fieldPrefix):], name)
nameCopy := bytesutil.ToUnsafeString(nameBuf)
nameCopy := ""
if fieldPrefix != "" {
nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
copy(nameBuf, fieldPrefix)
copy(nameBuf[len(fieldPrefix):], name)
nameCopy = bytesutil.ToUnsafeString(nameBuf)
} else {
nameCopy = uctx.a.copyString(name)
}
valueCopy := uctx.a.copyString(value)
@ -115,7 +120,23 @@ type pipeUnpackWriteContext struct {
valuesLen int
}
func (wctx *pipeUnpackWriteContext) reset() {
wctx.brSrc = nil
wctx.csSrc = nil
wctx.ppBase = nil
rcs := wctx.rcs
for i := range rcs {
rcs[i].reset()
}
wctx.rcs = rcs[:0]
wctx.valuesLen = 0
}
func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) {
wctx.reset()
wctx.brSrc = brSrc
wctx.csSrc = brSrc.getColumns()
wctx.ppBase = ppBase
@ -135,7 +156,7 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
}
}
if !areEqualColumns {
// send the current block to bbBase and construct a block with new set of columns
// send the current block to ppBase and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]