This commit is contained in:
Aliaksandr Valialkin 2024-05-21 10:39:02 +02:00
parent d75b0df747
commit b593065865
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
10 changed files with 410 additions and 34 deletions

View file

@ -203,14 +203,10 @@ func (br *blockResult) sizeBytes() int {
// setResultColumns sets the given rcs as br columns.
//
// The br is valid only until rcs are modified.
func (br *blockResult) setResultColumns(rcs []resultColumn) {
func (br *blockResult) setResultColumns(rcs []resultColumn, rowsCount int) {
br.reset()
if len(rcs) == 0 || len(rcs[0].values) == 0 {
return
}
br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values))
br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsCount)
for i := range rcs {
br.addResultColumn(&rcs[i])

View file

@ -123,12 +123,17 @@ type pipeFieldNamesWriteContext struct {
rcs [1]resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
func (wctx *pipeFieldNamesWriteContext) writeRow(v string) {
wctx.rcs[0].addValue(v)
wctx.valuesLen += len(v)
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
@ -140,7 +145,8 @@ func (wctx *pipeFieldNamesWriteContext) flush() {
wctx.valuesLen = 0
// Flush rcs to ppBase
br.setResultColumns(wctx.rcs[:1])
br.setResultColumns(wctx.rcs[:1], wctx.rowsCount)
wctx.rowsCount = 0
wctx.pfp.ppBase.writeBlock(0, br)
br.reset()
wctx.rcs[0].resetValues()

View file

@ -485,8 +485,14 @@ type pipeSortWriteContext struct {
rcs []resultColumn
br blockResult
// rowsWritten is the total number of rows passed to writeNextRow.
rowsWritten uint64
valuesLen int
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the length of all the values in the current block
valuesLen int
}
func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
@ -543,6 +549,7 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
@ -554,12 +561,9 @@ func (wctx *pipeSortWriteContext) flush() {
wctx.valuesLen = 0
if len(rcs) == 0 {
return
}
// Flush rcs to ppBase
br.setResultColumns(rcs)
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.psp.ppBase.writeBlock(0, br)
br.reset()
for i := range rcs {

View file

@ -442,6 +442,7 @@ func (psp *pipeStatsProcessor) flush() error {
var br blockResult
var values []string
rowsCount := 0
valuesLen := 0
for key, psg := range m {
// m may be quite big, so this loop can take a lot of time and CPU.
@ -478,8 +479,11 @@ func (psp *pipeStatsProcessor) flush() error {
rcs[i].addValue(v)
valuesLen += len(v)
}
rowsCount++
if valuesLen >= 1_000_000 {
br.setResultColumns(rcs)
br.setResultColumns(rcs, rowsCount)
rowsCount = 0
psp.ppBase.writeBlock(0, &br)
br.reset()
for i := range rcs {
@ -489,7 +493,7 @@ func (psp *pipeStatsProcessor) flush() error {
}
}
br.setResultColumns(rcs)
br.setResultColumns(rcs, rowsCount)
psp.ppBase.writeBlock(0, &br)
return nil

View file

@ -59,6 +59,150 @@ func TestPipeStats(t *testing.T) {
},
})
f("stats count(*) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{},
{
{"a", `2`},
{"b", `54`},
},
{},
}, [][]Field{
{
{"rows", "5"},
},
})
f("stats count_empty(*) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{},
{
{"a", `2`},
{"b", `54`},
},
{},
}, [][]Field{
{
{"rows", "2"},
},
})
f("stats count(b) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "2"},
},
})
f("stats count_empty(b) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "1"},
},
})
f("stats count(x) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "0"},
},
})
f("stats count(x, _msg, b) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "3"},
},
})
f("stats count_empty(x, _msg) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"rows", "1"},
},
})
f("stats by (a) count(*) as rows", [][]Field{
{
{"_msg", `abc`},
@ -83,6 +227,213 @@ func TestPipeStats(t *testing.T) {
{"rows", "2"},
},
})
f("stats by (a) count(*) if (b:54) as rows", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"a", "1"},
{"rows", "0"},
},
{
{"a", "2"},
{"rows", "1"},
},
})
f("stats by (a, x) count(*) if (b:54) as rows_b54, count(*) as rows_total", [][]Field{
{
{"_msg", `abc`},
{"a", `2`},
{"b", `3`},
},
{
{"_msg", `def`},
{"a", `1`},
{"x", "123"},
},
{
{"a", `2`},
{"b", `54`},
},
}, [][]Field{
{
{"a", "1"},
{"x", "123"},
{"rows_b54", "0"},
{"rows_total", "1"},
},
{
{"a", "2"},
{"x", ""},
{"rows_b54", "1"},
{"rows_total", "2"},
},
})
f("stats by (x:1KiB) count(*) as rows", [][]Field{
{
{"x", "1023"},
{"_msg", "foo"},
},
{
{"x", "1024"},
{"_msg", "bar"},
},
{
{"x", "2047"},
{"_msg", "baz"},
},
}, [][]Field{
{
{"x", "0"},
{"rows", "1"},
},
{
{"x", "1024"},
{"rows", "2"},
},
})
f("stats by (ip:/24) count(*) as rows", [][]Field{
{
{"ip", "1.2.3.4"},
},
{
{"ip", "1.2.3.255"},
},
{
{"ip", "127.2.3.4"},
},
{
{"ip", "1.2.4.0"},
},
}, [][]Field{
{
{"ip", "1.2.3.0"},
{"rows", "2"},
},
{
{"ip", "1.2.4.0"},
{"rows", "1"},
},
{
{"ip", "127.2.3.0"},
{"rows", "1"},
},
})
f("stats by (_time:1d) count(*) as rows", [][]Field{
{
{"_time", "2024-04-01T10:20:30Z"},
{"a", `2`},
{"b", `3`},
},
{
{"_time", "2024-04-02T10:20:30Z"},
{"a", "1"},
},
{
{"_time", "2024-04-02T10:20:30Z"},
{"a", "2"},
{"b", `54`},
},
{
{"_time", "2024-04-02T10:20:30Z"},
{"a", "2"},
{"c", `xyz`},
},
}, [][]Field{
{
{"_time", "2024-04-01T00:00:00Z"},
{"rows", "1"},
},
{
{"_time", "2024-04-02T00:00:00Z"},
{"rows", "3"},
},
})
f("stats by (_time:1d offset 2h) count(*) as rows", [][]Field{
{
{"_time", "2024-04-01T00:20:30Z"},
{"a", `2`},
{"b", `3`},
},
{
{"_time", "2024-04-02T22:20:30Z"},
{"a", "1"},
},
{
{"_time", "2024-04-02T10:20:30Z"},
{"a", "2"},
{"b", `54`},
},
{
{"_time", "2024-04-03T01:59:59.999999999Z"},
{"a", "2"},
{"c", `xyz`},
},
}, [][]Field{
{
{"_time", "2024-03-31T02:00:00Z"},
{"rows", "1"},
},
{
{"_time", "2024-04-02T02:00:00Z"},
{"rows", "3"},
},
})
f("stats by (a, _time:1d) count(*) as rows", [][]Field{
{
{"_time", "2024-04-01T10:20:30Z"},
{"a", `2`},
{"b", `3`},
},
{
{"_time", "2024-04-02T10:20:30Z"},
{"a", "1"},
},
{
{"_time", "2024-04-02T10:20:30Z"},
{"a", "2"},
{"b", `54`},
},
{
{"_time", "2024-04-02T10:20:30Z"},
{"a", "2"},
{"c", `xyz`},
},
}, [][]Field{
{
{"a", "2"},
{"_time", "2024-04-01T00:00:00Z"},
{"rows", "1"},
},
{
{"a", "1"},
{"_time", "2024-04-02T00:00:00Z"},
{"rows", "1"},
},
{
{"a", "2"},
{"_time", "2024-04-02T00:00:00Z"},
{"rows", "2"},
},
})
}
func TestPipeStatsUpdateNeededFields(t *testing.T) {

View file

@ -425,8 +425,14 @@ type pipeTopkWriteContext struct {
rcs []resultColumn
br blockResult
// rowsWritten is the total number of rows passed to writeNextRow.
rowsWritten uint64
valuesLen int
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool {
@ -490,6 +496,7 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
@ -503,12 +510,9 @@ func (wctx *pipeTopkWriteContext) flush() {
wctx.valuesLen = 0
if len(rcs) == 0 {
return
}
// Flush rcs to ppBase
br.setResultColumns(rcs)
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.ptp.ppBase.writeBlock(0, br)
br.reset()
for i := range rcs {

View file

@ -331,8 +331,13 @@ type pipeUniqWriteContext struct {
rcs []resultColumn
br blockResult
// rowsWritten is the total number of rows passed to writeRow.
rowsWritten uint64
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
@ -369,6 +374,8 @@ func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) {
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
@ -380,12 +387,9 @@ func (wctx *pipeUniqWriteContext) flush() {
wctx.valuesLen = 0
if len(rcs) == 0 {
return
}
// Flush rcs to ppBase
br.setResultColumns(rcs)
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.pup.ppBase.writeBlock(0, br)
br.reset()
for i := range rcs {

View file

@ -117,6 +117,10 @@ type pipeUnpackWriteContext struct {
rcs []resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
@ -131,6 +135,7 @@ func (wctx *pipeUnpackWriteContext) reset() {
}
wctx.rcs = rcs[:0]
wctx.rowsCount = 0
wctx.valuesLen = 0
}
@ -180,6 +185,8 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
rcs[len(csSrc)+i].addValue(v)
wctx.valuesLen += len(v)
}
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
@ -190,13 +197,10 @@ func (wctx *pipeUnpackWriteContext) flush() {
wctx.valuesLen = 0
if len(rcs) == 0 {
return
}
// Flush rcs to ppBase
br := &wctx.br
br.setResultColumns(rcs)
br.setResultColumns(rcs, wctx.rowsCount)
wctx.rowsCount = 0
wctx.ppBase.writeBlock(0, br)
br.reset()
for i := range rcs {

View file

@ -233,6 +233,8 @@ type testBlockResultWriter struct {
ppBase pipeProcessor
rcs []resultColumn
br blockResult
rowsCount int
}
func (brw *testBlockResultWriter) writeRow(row []Field) {
@ -248,6 +250,7 @@ func (brw *testBlockResultWriter) writeRow(row []Field) {
for i, field := range row {
brw.rcs[i].addValue(field.Value)
}
brw.rowsCount++
if rand.Intn(5) == 0 {
brw.flush()
}
@ -266,7 +269,8 @@ func (brw *testBlockResultWriter) areSameFields(row []Field) bool {
}
func (brw *testBlockResultWriter) flush() {
brw.br.setResultColumns(brw.rcs)
brw.br.setResultColumns(brw.rcs, brw.rowsCount)
brw.rowsCount = 0
workerID := rand.Intn(brw.workersCount)
brw.ppBase.writeBlock(uint(workerID), &brw.br)
brw.br.reset()

View file

@ -96,8 +96,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int
for _, f := range fields {
c := br.getColumnByName(f)
if c.isConst {
if c.valuesEncoded[0] == "" {
scp.rowsCount += uint64(len(br.timestamps))
if c.valuesEncoded[0] != "" {
return 0
}
continue