diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index f1d6d5932..5a3348dce 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -203,14 +203,10 @@ func (br *blockResult) sizeBytes() int { // setResultColumns sets the given rcs as br columns. // // The br is valid only until rcs are modified. -func (br *blockResult) setResultColumns(rcs []resultColumn) { +func (br *blockResult) setResultColumns(rcs []resultColumn, rowsCount int) { br.reset() - if len(rcs) == 0 || len(rcs[0].values) == 0 { - return - } - - br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values)) + br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsCount) for i := range rcs { br.addResultColumn(&rcs[i]) diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 9ac1a206d..284855c8d 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -123,12 +123,17 @@ type pipeFieldNamesWriteContext struct { rcs [1]resultColumn br blockResult + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the total length of values in the current block valuesLen int } func (wctx *pipeFieldNamesWriteContext) writeRow(v string) { wctx.rcs[0].addValue(v) wctx.valuesLen += len(v) + wctx.rowsCount++ if wctx.valuesLen >= 1_000_000 { wctx.flush() } @@ -140,7 +145,8 @@ func (wctx *pipeFieldNamesWriteContext) flush() { wctx.valuesLen = 0 // Flush rcs to ppBase - br.setResultColumns(wctx.rcs[:1]) + br.setResultColumns(wctx.rcs[:1], wctx.rowsCount) + wctx.rowsCount = 0 wctx.pfp.ppBase.writeBlock(0, br) br.reset() wctx.rcs[0].resetValues() diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 5c72b1e52..cdc294644 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -485,8 +485,14 @@ type pipeSortWriteContext struct { rcs []resultColumn br blockResult + // rowsWritten is the total number of rows passed to writeNextRow. rowsWritten uint64 - valuesLen int + + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the length of all the values in the current block + valuesLen int } func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { @@ -543,6 +549,7 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { wctx.valuesLen += len(v) } + wctx.rowsCount++ if wctx.valuesLen >= 1_000_000 { wctx.flush() } @@ -554,12 +561,9 @@ func (wctx *pipeSortWriteContext) flush() { wctx.valuesLen = 0 - if len(rcs) == 0 { - return - } - // Flush rcs to ppBase - br.setResultColumns(rcs) + br.setResultColumns(rcs, wctx.rowsCount) + wctx.rowsCount = 0 wctx.psp.ppBase.writeBlock(0, br) br.reset() for i := range rcs { diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index b46113197..9b7116633 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -442,6 +442,7 @@ func (psp *pipeStatsProcessor) flush() error { var br blockResult var values []string + rowsCount := 0 valuesLen := 0 for key, psg := range m { // m may be quite big, so this loop can take a lot of time and CPU. @@ -478,8 +479,11 @@ func (psp *pipeStatsProcessor) flush() error { rcs[i].addValue(v) valuesLen += len(v) } + + rowsCount++ if valuesLen >= 1_000_000 { - br.setResultColumns(rcs) + br.setResultColumns(rcs, rowsCount) + rowsCount = 0 psp.ppBase.writeBlock(0, &br) br.reset() for i := range rcs { @@ -489,7 +493,7 @@ func (psp *pipeStatsProcessor) flush() error { } } - br.setResultColumns(rcs) + br.setResultColumns(rcs, rowsCount) psp.ppBase.writeBlock(0, &br) return nil diff --git a/lib/logstorage/pipe_stats_test.go b/lib/logstorage/pipe_stats_test.go index c0b1b8b96..d2adf9603 100644 --- a/lib/logstorage/pipe_stats_test.go +++ b/lib/logstorage/pipe_stats_test.go @@ -59,6 +59,150 @@ func TestPipeStats(t *testing.T) { }, }) + f("stats count(*) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + {}, + { + {"a", `2`}, + {"b", `54`}, + }, + {}, + }, [][]Field{ + { + {"rows", "5"}, + }, + }) + + f("stats count_empty(*) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + {}, + { + {"a", `2`}, + {"b", `54`}, + }, + {}, + }, [][]Field{ + { + {"rows", "2"}, + }, + }) + + f("stats count(b) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"rows", "2"}, + }, + }) + + f("stats count_empty(b) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"rows", "1"}, + }, + }) + + f("stats count(x) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"rows", "0"}, + }, + }) + + f("stats count(x, _msg, b) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"rows", "3"}, + }, + }) + + f("stats count_empty(x, _msg) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"rows", "1"}, + }, + }) + f("stats by (a) count(*) as rows", [][]Field{ { {"_msg", `abc`}, @@ -83,6 +227,213 @@ func TestPipeStats(t *testing.T) { {"rows", "2"}, }, }) + + f("stats by (a) count(*) if (b:54) as rows", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"rows", "0"}, + }, + { + {"a", "2"}, + {"rows", "1"}, + }, + }) + + f("stats by (a, x) count(*) if (b:54) as rows_b54, count(*) as rows_total", [][]Field{ + { + {"_msg", `abc`}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_msg", `def`}, + {"a", `1`}, + {"x", "123"}, + }, + { + {"a", `2`}, + {"b", `54`}, + }, + }, [][]Field{ + { + {"a", "1"}, + {"x", "123"}, + {"rows_b54", "0"}, + {"rows_total", "1"}, + }, + { + {"a", "2"}, + {"x", ""}, + {"rows_b54", "1"}, + {"rows_total", "2"}, + }, + }) + + f("stats by (x:1KiB) count(*) as rows", [][]Field{ + { + {"x", "1023"}, + {"_msg", "foo"}, + }, + { + {"x", "1024"}, + {"_msg", "bar"}, + }, + { + {"x", "2047"}, + {"_msg", "baz"}, + }, + }, [][]Field{ + { + {"x", "0"}, + {"rows", "1"}, + }, + { + {"x", "1024"}, + {"rows", "2"}, + }, + }) + + f("stats by (ip:/24) count(*) as rows", [][]Field{ + { + {"ip", "1.2.3.4"}, + }, + { + {"ip", "1.2.3.255"}, + }, + { + {"ip", "127.2.3.4"}, + }, + { + {"ip", "1.2.4.0"}, + }, + }, [][]Field{ + { + {"ip", "1.2.3.0"}, + {"rows", "2"}, + }, + { + {"ip", "1.2.4.0"}, + {"rows", "1"}, + }, + { + {"ip", "127.2.3.0"}, + {"rows", "1"}, + }, + }) + + f("stats by (_time:1d) count(*) as rows", [][]Field{ + { + {"_time", "2024-04-01T10:20:30Z"}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_time", "2024-04-02T10:20:30Z"}, + {"a", "1"}, + }, + { + {"_time", "2024-04-02T10:20:30Z"}, + {"a", "2"}, + {"b", `54`}, + }, + { + {"_time", "2024-04-02T10:20:30Z"}, + {"a", "2"}, + {"c", `xyz`}, + }, + }, [][]Field{ + { + {"_time", "2024-04-01T00:00:00Z"}, + {"rows", "1"}, + }, + { + {"_time", "2024-04-02T00:00:00Z"}, + {"rows", "3"}, + }, + }) + + f("stats by (_time:1d offset 2h) count(*) as rows", [][]Field{ + { + {"_time", "2024-04-01T00:20:30Z"}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_time", "2024-04-02T22:20:30Z"}, + {"a", "1"}, + }, + { + {"_time", "2024-04-02T10:20:30Z"}, + {"a", "2"}, + {"b", `54`}, + }, + { + {"_time", "2024-04-03T01:59:59.999999999Z"}, + {"a", "2"}, + {"c", `xyz`}, + }, + }, [][]Field{ + { + {"_time", "2024-03-31T02:00:00Z"}, + {"rows", "1"}, + }, + { + {"_time", "2024-04-02T02:00:00Z"}, + {"rows", "3"}, + }, + }) + + f("stats by (a, _time:1d) count(*) as rows", [][]Field{ + { + {"_time", "2024-04-01T10:20:30Z"}, + {"a", `2`}, + {"b", `3`}, + }, + { + {"_time", "2024-04-02T10:20:30Z"}, + {"a", "1"}, + }, + { + {"_time", "2024-04-02T10:20:30Z"}, + {"a", "2"}, + {"b", `54`}, + }, + { + {"_time", "2024-04-02T10:20:30Z"}, + {"a", "2"}, + {"c", `xyz`}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"_time", "2024-04-01T00:00:00Z"}, + {"rows", "1"}, + }, + { + {"a", "1"}, + {"_time", "2024-04-02T00:00:00Z"}, + {"rows", "1"}, + }, + { + {"a", "2"}, + {"_time", "2024-04-02T00:00:00Z"}, + {"rows", "2"}, + }, + }) } func TestPipeStatsUpdateNeededFields(t *testing.T) { diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index ba971555b..4aa3d5bdf 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -425,8 +425,14 @@ type pipeTopkWriteContext struct { rcs []resultColumn br blockResult + // rowsWritten is the total number of rows passed to writeNextRow. rowsWritten uint64 - valuesLen int + + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the total length of values in the current block + valuesLen int } func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool { @@ -490,6 +496,7 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo wctx.valuesLen += len(v) } + wctx.rowsCount++ if wctx.valuesLen >= 1_000_000 { wctx.flush() } @@ -503,12 +510,9 @@ func (wctx *pipeTopkWriteContext) flush() { wctx.valuesLen = 0 - if len(rcs) == 0 { - return - } - // Flush rcs to ppBase - br.setResultColumns(rcs) + br.setResultColumns(rcs, wctx.rowsCount) + wctx.rowsCount = 0 wctx.ptp.ppBase.writeBlock(0, br) br.reset() for i := range rcs { diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index f64bc8d9e..32b9e8f28 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -331,8 +331,13 @@ type pipeUniqWriteContext struct { rcs []resultColumn br blockResult + // rowsWritten is the total number of rows passed to writeRow. rowsWritten uint64 + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the total length of values in the current block valuesLen int } @@ -369,6 +374,8 @@ func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) { rcs[i].addValue(v) wctx.valuesLen += len(v) } + + wctx.rowsCount++ if wctx.valuesLen >= 1_000_000 { wctx.flush() } @@ -380,12 +387,9 @@ func (wctx *pipeUniqWriteContext) flush() { wctx.valuesLen = 0 - if len(rcs) == 0 { - return - } - // Flush rcs to ppBase - br.setResultColumns(rcs) + br.setResultColumns(rcs, wctx.rowsCount) + wctx.rowsCount = 0 wctx.pup.ppBase.writeBlock(0, br) br.reset() for i := range rcs { diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index f5ab28bd9..2f232d8ab 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -117,6 +117,10 @@ type pipeUnpackWriteContext struct { rcs []resultColumn br blockResult + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the total length of values in the current block valuesLen int } @@ -131,6 +135,7 @@ func (wctx *pipeUnpackWriteContext) reset() { } wctx.rcs = rcs[:0] + wctx.rowsCount = 0 wctx.valuesLen = 0 } @@ -180,6 +185,8 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) { rcs[len(csSrc)+i].addValue(v) wctx.valuesLen += len(v) } + + wctx.rowsCount++ if wctx.valuesLen >= 1_000_000 { wctx.flush() } @@ -190,13 +197,10 @@ func (wctx *pipeUnpackWriteContext) flush() { wctx.valuesLen = 0 - if len(rcs) == 0 { - return - } - // Flush rcs to ppBase br := &wctx.br - br.setResultColumns(rcs) + br.setResultColumns(rcs, wctx.rowsCount) + wctx.rowsCount = 0 wctx.ppBase.writeBlock(0, br) br.reset() for i := range rcs { diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index 52a12d00f..a1e6bed52 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -233,6 +233,8 @@ type testBlockResultWriter struct { ppBase pipeProcessor rcs []resultColumn br blockResult + + rowsCount int } func (brw *testBlockResultWriter) writeRow(row []Field) { @@ -248,6 +250,7 @@ func (brw *testBlockResultWriter) writeRow(row []Field) { for i, field := range row { brw.rcs[i].addValue(field.Value) } + brw.rowsCount++ if rand.Intn(5) == 0 { brw.flush() } @@ -266,7 +269,8 @@ func (brw *testBlockResultWriter) areSameFields(row []Field) bool { } func (brw *testBlockResultWriter) flush() { - brw.br.setResultColumns(brw.rcs) + brw.br.setResultColumns(brw.rcs, brw.rowsCount) + brw.rowsCount = 0 workerID := rand.Intn(brw.workersCount) brw.ppBase.writeBlock(uint(workerID), &brw.br) brw.br.reset() diff --git a/lib/logstorage/stats_count_empty.go b/lib/logstorage/stats_count_empty.go index b9916ef24..7065e57a4 100644 --- a/lib/logstorage/stats_count_empty.go +++ b/lib/logstorage/stats_count_empty.go @@ -96,8 +96,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int for _, f := range fields { c := br.getColumnByName(f) if c.isConst { - if c.valuesEncoded[0] == "" { - scp.rowsCount += uint64(len(br.timestamps)) + if c.valuesEncoded[0] != "" { return 0 } continue