From c5734e18b97632444c77c05bd47528eddb792005 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 20 May 2024 23:23:22 +0200
Subject: [PATCH] wip

---
 lib/logstorage/block_result.go     | 29 +++--------
 lib/logstorage/pipe_delete_test.go |  6 ++-
 lib/logstorage/pipe_extract.go     |  3 +-
 lib/logstorage/pipe_sort.go        |  2 +-
 lib/logstorage/pipe_sort_test.go   | 44 +++++++++++++++++
 lib/logstorage/pipe_stats_test.go  | 79 ++++++++++++++++++++++++++++++
 lib/logstorage/pipe_topk.go        |  2 +-
 lib/logstorage/pipe_uniq.go        |  2 +-
 lib/logstorage/pipe_unpack.go      | 31 ++++++++++--
 9 files changed, 164 insertions(+), 34 deletions(-)

diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go
index 17d78a0ed..f1d6d5932 100644
--- a/lib/logstorage/block_result.go
+++ b/lib/logstorage/block_result.go
@@ -3,7 +3,6 @@ package logstorage
 import (
 	"math"
 	"slices"
-	"strings"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -201,19 +200,6 @@ func (br *blockResult) sizeBytes() int {
 	return n
 }
 
-// addResultColumns adds the given rcs to br.
-//
-// The br is valid only until rcs are modified.
-func (br *blockResult) addResultColumns(rcs []resultColumn) {
-	if len(rcs) == 0 || len(rcs[0].values) == 0 {
-		return
-	}
-
-	for i := range rcs {
-		br.addResultColumn(&rcs[i])
-	}
-}
-
 // setResultColumns sets the given rcs as br columns.
 //
 // The br is valid only until rcs are modified.
@@ -1275,14 +1261,6 @@ func (br *blockResult) renameSingleColumn(srcName, dstName string) {
 	br.csInitialized = false
 }
 
-func debugColumnNames(cs []*blockResultColumn) string {
-	a := make([]string, len(cs))
-	for i, c := range cs {
-		a[i] = c.name
-	}
-	return strings.Join(a, ",")
-}
-
 // deleteColumns deletes columns with the given columnNames.
 func (br *blockResult) deleteColumns(columnNames []string) {
 	if len(columnNames) == 0 {
@@ -1811,6 +1789,11 @@ type resultColumn struct {
 	values []string
 }
 
+func (rc *resultColumn) reset() {
+	rc.name = ""
+	rc.resetValues()
+}
+
 func (rc *resultColumn) resetValues() {
 	clear(rc.values)
 	rc.values = rc.values[:0]
@@ -1819,8 +1802,8 @@ func (rc *resultColumn) resetValues() {
 func appendResultColumnWithName(dst []resultColumn, name string) []resultColumn {
 	dst = slicesutil.SetLength(dst, len(dst)+1)
 	rc := &dst[len(dst)-1]
-	rc.resetValues()
 	rc.name = name
+	rc.resetValues()
 	return dst
 }
 
diff --git a/lib/logstorage/pipe_delete_test.go b/lib/logstorage/pipe_delete_test.go
index 63620b691..d423c23fd 100644
--- a/lib/logstorage/pipe_delete_test.go
+++ b/lib/logstorage/pipe_delete_test.go
@@ -60,7 +60,9 @@ func TestPipeDelete(t *testing.T) {
 			{"_msg", `{"foo":"bar"}`},
 			{"a", `test`},
 		},
-	}, [][]Field{})
+	}, [][]Field{
+		{},
+	})
 
 	// delete non-existing fields
 	f("delete foo, _msg, bar", [][]Field{
@@ -93,6 +95,8 @@ func TestPipeDelete(t *testing.T) {
 			{"b", "df"},
 		},
 	}, [][]Field{
+		{},
+		{},
 		{
 			{"b", `baz`},
 			{"c", "d"},
diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go
index f2d42827e..f829d792d 100644
--- a/lib/logstorage/pipe_extract.go
+++ b/lib/logstorage/pipe_extract.go
@@ -68,10 +68,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
 func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
 	shards := make([]pipeExtractProcessorShard, workersCount)
 	for i := range shards {
-		ef := newPattern(pe.steps)
 		shards[i] = pipeExtractProcessorShard{
 			pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{
-				ef: ef,
+				ef: newPattern(pe.steps),
 			},
 		}
 	}
diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go
index a29da7aa4..5c72b1e52 100644
--- a/lib/logstorage/pipe_sort.go
+++ b/lib/logstorage/pipe_sort.go
@@ -516,7 +516,7 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to bbBase and construct a block with new set of columns
+		// send the current block to ppBase and construct a block with new set of columns
 		wctx.flush()
 
 		rcs = wctx.rcs[:0]
diff --git a/lib/logstorage/pipe_sort_test.go b/lib/logstorage/pipe_sort_test.go
index 0e38ee35d..65accb0d6 100644
--- a/lib/logstorage/pipe_sort_test.go
+++ b/lib/logstorage/pipe_sort_test.go
@@ -154,6 +154,28 @@ func TestPipeSort(t *testing.T) {
 		},
 	})
 
+	// Sort by multiple fields with limit desc
+	f("sort by (a, b) desc limit 1", [][]Field{
+		{
+			{"_msg", `abc`},
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	}, [][]Field{
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	})
+
 	// Sort by multiple fields with offset
 	f("sort by (a, b) offset 1", [][]Field{
 		{
@@ -203,6 +225,28 @@ func TestPipeSort(t *testing.T) {
 			{"b", `3`},
 		},
 	})
+
+	// Sort by multiple fields with offset and limit
+	f("sort by (a, b) desc offset 2 limit 100", [][]Field{
+		{
+			{"_msg", `abc`},
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+	})
 }
 
 func TestPipeSortUpdateNeededFields(t *testing.T) {
diff --git a/lib/logstorage/pipe_stats_test.go b/lib/logstorage/pipe_stats_test.go
index 51837eba6..33ad3d041 100644
--- a/lib/logstorage/pipe_stats_test.go
+++ b/lib/logstorage/pipe_stats_test.go
@@ -4,6 +4,85 @@ import (
 	"testing"
 )
 
+func TestParsePipeStatsSuccess(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeSuccess(t, pipeStr)
+	}
+
+	f(`stats count(*) as rows`)
+	f(`stats by (x) count(*) as rows, count_uniq(x) as uniqs`)
+	f(`stats by (_time:month offset 6.5h, y) count(*) as rows, count_uniq(x) as uniqs`)
+	f(`stats by (_time:month offset 6.5h, y) count(*) if (q:w) as rows, count_uniq(x) as uniqs`)
+}
+
+func TestParsePipeStatsFailure(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeFailure(t, pipeStr)
+	}
+
+	f(`stats`)
+	f(`stats by`)
+	f(`stats foo`)
+	f(`stats count`)
+	f(`stats if (x:y)`)
+	f(`stats by(x) foo`)
+	f(`stats by(x:abc) count() rows`)
+	f(`stats by(x:1h offset) count () rows`)
+	f(`stats by(x:1h offset foo) count() rows`)
+}
+
+func TestPipeStats(t *testing.T) {
+	f := func(pipeStr string, rows, rowsExpected [][]Field) {
+		t.Helper()
+		expectPipeResults(t, pipeStr, rows, rowsExpected)
+	}
+
+	f("stats count(*) as rows", [][]Field{
+		{
+			{"_msg", `abc`},
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	}, [][]Field{
+		{
+			{"rows", "3"},
+		},
+	})
+
+	f("stats by (a) count(*) as rows", [][]Field{
+		{
+			{"_msg", `abc`},
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	}, [][]Field{
+		{
+			{"a", "1"},
+			{"rows", "1"},
+			{"a", "2"},
+			{"rows", "2"},
+		},
+	})
+}
+
 func TestPipeStatsUpdateNeededFields(t *testing.T) {
 	f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
 		t.Helper()
diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go
index ea77370e1..25a53d156 100644
--- a/lib/logstorage/pipe_topk.go
+++ b/lib/logstorage/pipe_topk.go
@@ -457,7 +457,7 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to bbBase and construct a block with new set of columns
+		// send the current block to ppBase and construct a block with new set of columns
 		wctx.flush()
 
 		rcs = wctx.rcs[:0]
diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go
index 6b43fb442..f64bc8d9e 100644
--- a/lib/logstorage/pipe_uniq.go
+++ b/lib/logstorage/pipe_uniq.go
@@ -354,7 +354,7 @@ func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) {
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to bbBase and construct a block with new set of columns
+		// send the current block to ppBase and construct a block with new set of columns
 		wctx.flush()
 
 		rcs = wctx.rcs[:0]
diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go
index 855b04140..f5ab28bd9 100644
--- a/lib/logstorage/pipe_unpack.go
+++ b/lib/logstorage/pipe_unpack.go
@@ -22,10 +22,15 @@ func (uctx *fieldsUnpackerContext) resetFields() {
 }
 
 func (uctx *fieldsUnpackerContext) addField(name, value, fieldPrefix string) {
-	nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
-	copy(nameBuf, fieldPrefix)
-	copy(nameBuf[len(fieldPrefix):], name)
-	nameCopy := bytesutil.ToUnsafeString(nameBuf)
+	nameCopy := ""
+	if fieldPrefix != "" {
+		nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
+		copy(nameBuf, fieldPrefix)
+		copy(nameBuf[len(fieldPrefix):], name)
+		nameCopy = bytesutil.ToUnsafeString(nameBuf)
+	} else {
+		nameCopy = uctx.a.copyString(name)
+	}
 
 	valueCopy := uctx.a.copyString(value)
 
@@ -115,7 +120,23 @@ type pipeUnpackWriteContext struct {
 	valuesLen int
 }
 
+func (wctx *pipeUnpackWriteContext) reset() {
+	wctx.brSrc = nil
+	wctx.csSrc = nil
+	wctx.ppBase = nil
+
+	rcs := wctx.rcs
+	for i := range rcs {
+		rcs[i].reset()
+	}
+	wctx.rcs = rcs[:0]
+
+	wctx.valuesLen = 0
+}
+
 func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) {
+	wctx.reset()
+
 	wctx.brSrc = brSrc
 	wctx.csSrc = brSrc.getColumns()
 	wctx.ppBase = ppBase
@@ -135,7 +156,7 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to bbBase and construct a block with new set of columns
+		// send the current block to ppBase and construct a block with new set of columns
 		wctx.flush()
 
 		rcs = wctx.rcs[:0]