mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
42c49c37ff
commit
33a01c659b
9 changed files with 643 additions and 66 deletions
|
@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
|
|||
|
||||
## tip
|
||||
|
||||
* FEATURE: add ability to return the first `N` results from [`sort` pipe](#https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). This is useful when `N` biggest or `N` smallest values must be returned from large amounts of logs.
|
||||
* FEATURE: add [`quantile`](https://docs.victoriametrics.com/victorialogs/logsql/#quantile-stats) and [`median`](https://docs.victoriametrics.com/victorialogs/logsql/#median-stats) [stats functions](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe).
|
||||
|
||||
## [v0.6.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.1-victorialogs)
|
||||
|
|
|
@ -1198,11 +1198,23 @@ The reverse order can be applied globally via `desc` keyword after `by(...)` cla
|
|||
_time:5m | sort by (foo, bar) desc
|
||||
```
|
||||
|
||||
Sorting of big number of logs can consume a lot of CPU time and memory. Sometimes it is enough to return the first `N` entries with the biggest
|
||||
or smallest values. This can be done by adding ``first N` to the end of `sort ...` pipe.
|
||||
Such a query consumes lower amounts of memory when sorting big number of logs, since it keeps in memory only `N` log entries.
|
||||
For example, the following query returns top 10 log entries with the biggest values
|
||||
for the `request_duration` [field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) during the last hour:
|
||||
|
||||
```logsql
|
||||
_time:1h | sort by (request_duration desc) first 10
|
||||
```
|
||||
|
||||
Note that sorting of big number of logs can be slow and can consume a lot of additional memory.
|
||||
It is recommended limiting the number of logs before sorting with the following approaches:
|
||||
|
||||
- Adding `first N` to the end of `sort ...` pipe.
|
||||
- Reducing the selected time range with [time filter](#time-filter).
|
||||
- Using more specific [filters](#filters), so they select less logs.
|
||||
- Limiting the number of selected [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) via [`fields` pipe](#fields-pipe).
|
||||
|
||||
See also:
|
||||
|
||||
|
|
|
@ -974,6 +974,10 @@ func TestParseQuerySuccess(t *testing.T) {
|
|||
f(`* | sort bY (foo)`, `* | sort by (foo)`)
|
||||
f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`)
|
||||
f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`)
|
||||
f(`* | sort first 10`, `* | sort first 10`)
|
||||
f(`* | sort desc first 10`, `* | sort desc first 10`)
|
||||
f(`* | sort by (foo desc, bar) first 10`, `* | sort by (foo desc, bar) first 10`)
|
||||
f(`* | sort by (foo desc, bar) desc first 10`, `* | sort by (foo desc, bar) desc first 10`)
|
||||
|
||||
// uniq pipe
|
||||
f(`* | uniq`, `* | uniq`)
|
||||
|
@ -1330,6 +1334,10 @@ func TestParseQueryFailure(t *testing.T) {
|
|||
f(`foo | sort by(baz`)
|
||||
f(`foo | sort by(baz,`)
|
||||
f(`foo | sort by(bar) foo`)
|
||||
f(`foo | sort by(bar) first`)
|
||||
f(`foo | sort by(bar) first foo`)
|
||||
f(`foo | sort by(bar) first -1234`)
|
||||
f(`foo | sort by(bar) first 12.34`)
|
||||
|
||||
// invalid uniq pipe
|
||||
f(`foo | uniq bar`)
|
||||
|
|
|
@ -65,7 +65,7 @@ func parsePipes(lex *lexer) ([]pipe, error) {
|
|||
var pipes []pipe
|
||||
for !lex.isKeyword(")", "") {
|
||||
if !lex.isKeyword("|") {
|
||||
return nil, fmt.Errorf("expecting '|'")
|
||||
return nil, fmt.Errorf("expecting '|'; got %q", lex.token)
|
||||
}
|
||||
if !lex.mustNextToken() {
|
||||
return nil, fmt.Errorf("missing token after '|'")
|
||||
|
|
|
@ -25,6 +25,11 @@ type pipeSort struct {
|
|||
|
||||
// whether to apply descending order
|
||||
isDesc bool
|
||||
|
||||
// how many results to return
|
||||
//
|
||||
// if zero, then all the results are returned
|
||||
limit uint64
|
||||
}
|
||||
|
||||
func (ps *pipeSort) String() string {
|
||||
|
@ -39,6 +44,9 @@ func (ps *pipeSort) String() string {
|
|||
if ps.isDesc {
|
||||
s += " desc"
|
||||
}
|
||||
if ps.limit > 0 {
|
||||
s += fmt.Sprintf(" first %d", ps.limit)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -55,6 +63,13 @@ func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
|||
}
|
||||
|
||||
func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
|
||||
if ps.limit > 0 {
|
||||
return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppBase)
|
||||
}
|
||||
return newPipeSortProcessor(ps, workersCount, stopCh, cancel, ppBase)
|
||||
}
|
||||
|
||||
func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
|
||||
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
|
||||
|
||||
shards := make([]pipeSortProcessorShard, workersCount)
|
||||
|
@ -117,6 +132,9 @@ type pipeSortProcessorShardNopad struct {
|
|||
// stateSizeBudget is the remaining budget for the whole state size for the shard.
|
||||
// The per-shard budget is provided in chunks from the parent pipeSortProcessor.
|
||||
stateSizeBudget int
|
||||
|
||||
// columnValues is used as temporary buffer at pipeSortProcessorShard.writeBlock
|
||||
columnValues [][]string
|
||||
}
|
||||
|
||||
// sortBlock represents a block of logs for sorting.
|
||||
|
@ -176,16 +194,23 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
|
|||
if len(byFields) == 0 {
|
||||
// Sort by all the columns
|
||||
|
||||
columnValues := shard.columnValues[:0]
|
||||
for _, c := range cs {
|
||||
columnValues = append(columnValues, c.getValues(br))
|
||||
}
|
||||
shard.columnValues = columnValues
|
||||
|
||||
// Generate byColumns
|
||||
var rc resultColumn
|
||||
|
||||
bb := bbPool.Get()
|
||||
for i := range br.timestamps {
|
||||
// JSON-encode all the columns per each row into a single string
|
||||
for rowIdx := range br.timestamps {
|
||||
// Marshal all the columns per each row into a single string
|
||||
// and sort rows by the resulting string.
|
||||
bb.B = bb.B[:0]
|
||||
for _, c := range cs {
|
||||
v := c.getValueAtRow(br, i)
|
||||
bb.B = marshalJSONKeyValue(bb.B, c.name, v)
|
||||
for i, values := range columnValues {
|
||||
v := values[rowIdx]
|
||||
bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v)
|
||||
bb.B = append(bb.B, ',')
|
||||
}
|
||||
rc.addValue(bytesutil.ToUnsafeString(bb.B))
|
||||
|
@ -358,10 +383,8 @@ func (psp *pipeSortProcessor) flush() error {
|
|||
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
select {
|
||||
case <-psp.stopCh:
|
||||
if needStop(psp.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Sort every shard in parallel
|
||||
|
@ -377,17 +400,15 @@ func (psp *pipeSortProcessor) flush() error {
|
|||
}
|
||||
wg.Wait()
|
||||
|
||||
select {
|
||||
case <-psp.stopCh:
|
||||
if needStop(psp.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Merge sorted results across shards
|
||||
sh := pipeSortProcessorShardsHeap(make([]*pipeSortProcessorShard, 0, len(shards)))
|
||||
for i := range shards {
|
||||
shard := &shards[i]
|
||||
if shard.Len() > 0 {
|
||||
if len(shard.rowRefs) > 0 {
|
||||
sh = append(sh, shard)
|
||||
}
|
||||
}
|
||||
|
@ -400,49 +421,43 @@ func (psp *pipeSortProcessor) flush() error {
|
|||
wctx := &pipeSortWriteContext{
|
||||
psp: psp,
|
||||
}
|
||||
var shardNext *pipeSortProcessorShard
|
||||
shardNextIdx := 0
|
||||
|
||||
for len(sh) > 1 {
|
||||
shard := sh[0]
|
||||
wctx.writeRow(shard, shard.rowRefNext)
|
||||
shard.rowRefNext++
|
||||
wctx.writeNextRow(shard)
|
||||
|
||||
if shard.rowRefNext >= len(shard.rowRefs) {
|
||||
_ = heap.Pop(&sh)
|
||||
shardNext = nil
|
||||
shardNextIdx = 0
|
||||
|
||||
select {
|
||||
case <-psp.stopCh:
|
||||
if needStop(psp.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if shardNext == nil {
|
||||
shardNext = sh[1]
|
||||
if len(sh) > 2 && sortBlockLess(sh[2], sh[2].rowRefNext, shardNext, shardNext.rowRefNext) {
|
||||
shardNext = sh[2]
|
||||
if shardNextIdx == 0 {
|
||||
shardNextIdx = 1
|
||||
if len(sh) > 2 && sh.Less(2, 1) {
|
||||
shardNextIdx = 2
|
||||
}
|
||||
}
|
||||
|
||||
if sortBlockLess(shardNext, shardNext.rowRefNext, shard, shard.rowRefNext) {
|
||||
if sh.Less(shardNextIdx, 0) {
|
||||
heap.Fix(&sh, 0)
|
||||
shardNext = nil
|
||||
shardNextIdx = 0
|
||||
|
||||
select {
|
||||
case <-psp.stopCh:
|
||||
if needStop(psp.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(sh) == 1 {
|
||||
shard := sh[0]
|
||||
for shard.rowRefNext < len(shard.rowRefs) {
|
||||
wctx.writeRow(shard, shard.rowRefNext)
|
||||
shard.rowRefNext++
|
||||
wctx.writeNextRow(shard)
|
||||
}
|
||||
}
|
||||
wctx.flush()
|
||||
|
@ -458,7 +473,9 @@ type pipeSortWriteContext struct {
|
|||
valuesLen int
|
||||
}
|
||||
|
||||
func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx int) {
|
||||
func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
|
||||
rowIdx := shard.rowRefNext
|
||||
shard.rowRefNext++
|
||||
rr := shard.rowRefs[rowIdx]
|
||||
b := &shard.blocks[rr.blockIdx]
|
||||
|
||||
|
@ -671,6 +688,17 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) {
|
|||
ps.isDesc = true
|
||||
}
|
||||
|
||||
switch {
|
||||
case lex.isKeyword("first"):
|
||||
lex.nextToken()
|
||||
n, ok := tryParseUint64(lex.token)
|
||||
lex.nextToken()
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("cannot parse 'first %s'", lex.token)
|
||||
}
|
||||
ps.limit = n
|
||||
}
|
||||
|
||||
return &ps, nil
|
||||
}
|
||||
|
||||
|
@ -725,13 +753,6 @@ func parseBySortFields(lex *lexer) ([]*bySortField, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func marshalJSONKeyValue(dst []byte, k, v string) []byte {
|
||||
dst = strconv.AppendQuote(dst, k)
|
||||
dst = append(dst, ':')
|
||||
dst = strconv.AppendQuote(dst, v)
|
||||
return dst
|
||||
}
|
||||
|
||||
func tryParseInt64(s string) (int64, bool) {
|
||||
if len(s) == 0 {
|
||||
return 0, false
|
||||
|
@ -756,3 +777,10 @@ func tryParseInt64(s string) (int64, bool) {
|
|||
}
|
||||
return -int64(u64), true
|
||||
}
|
||||
|
||||
func marshalJSONKeyValue(dst []byte, k, v string) []byte {
|
||||
dst = strconv.AppendQuote(dst, k)
|
||||
dst = append(dst, ':')
|
||||
dst = strconv.AppendQuote(dst, v)
|
||||
return dst
|
||||
}
|
||||
|
|
|
@ -345,10 +345,8 @@ func (psp *pipeStatsProcessor) flush() error {
|
|||
for key, psg := range shard.m {
|
||||
// shard.m may be quite big, so this loop can take a lot of time and CPU.
|
||||
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
|
||||
select {
|
||||
case <-psp.stopCh:
|
||||
if needStop(psp.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
spgBase := m[key]
|
||||
|
@ -388,10 +386,8 @@ func (psp *pipeStatsProcessor) flush() error {
|
|||
for key, psg := range m {
|
||||
// m may be quite big, so this loop can take a lot of time and CPU.
|
||||
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
|
||||
select {
|
||||
case <-psp.stopCh:
|
||||
if needStop(psp.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Unmarshal values for byFields from key.
|
||||
|
|
546
lib/logstorage/pipe_topk.go
Normal file
546
lib/logstorage/pipe_topk.go
Normal file
|
@ -0,0 +1,546 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
|
||||
)
|
||||
|
||||
func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
|
||||
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
|
||||
|
||||
shards := make([]pipeTopkProcessorShard, workersCount)
|
||||
for i := range shards {
|
||||
shard := &shards[i]
|
||||
shard.ps = ps
|
||||
shard.stateSizeBudget = stateSizeBudgetChunk
|
||||
maxStateSize -= stateSizeBudgetChunk
|
||||
}
|
||||
|
||||
ptp := &pipeTopkProcessor{
|
||||
ps: ps,
|
||||
stopCh: stopCh,
|
||||
cancel: cancel,
|
||||
ppBase: ppBase,
|
||||
|
||||
shards: shards,
|
||||
|
||||
maxStateSize: maxStateSize,
|
||||
}
|
||||
ptp.stateSizeBudget.Store(maxStateSize)
|
||||
|
||||
return ptp
|
||||
}
|
||||
|
||||
type pipeTopkProcessor struct {
|
||||
ps *pipeSort
|
||||
stopCh <-chan struct{}
|
||||
cancel func()
|
||||
ppBase pipeProcessor
|
||||
|
||||
shards []pipeTopkProcessorShard
|
||||
|
||||
maxStateSize int64
|
||||
stateSizeBudget atomic.Int64
|
||||
}
|
||||
|
||||
type pipeTopkProcessorShard struct {
|
||||
pipeTopkProcessorShardNopad
|
||||
|
||||
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
||||
_ [128 - unsafe.Sizeof(pipeTopkProcessorShardNopad{})%128]byte
|
||||
}
|
||||
|
||||
type pipeTopkProcessorShardNopad struct {
|
||||
// ps points to the parent pipeSort.
|
||||
ps *pipeSort
|
||||
|
||||
// rows contains rows tracked by the given shard.
|
||||
rows []*pipeTopkRow
|
||||
|
||||
// rowNext points to the next index at rows during merge shards phase
|
||||
rowNext int
|
||||
|
||||
// tmpRow is used as a temporary row when determining whether the next ingested row must be stored in the shard.
|
||||
tmpRow pipeTopkRow
|
||||
|
||||
// these are aux fields for determining whether the next row must be stored in rows.
|
||||
byColumnValues [][]string
|
||||
otherColumnValues []pipeTopkOtherColumn
|
||||
byColumns []string
|
||||
otherColumns []Field
|
||||
|
||||
// stateSizeBudget is the remaining budget for the whole state size for the shard.
|
||||
// The per-shard budget is provided in chunks from the parent pipeTopkProcessor.
|
||||
stateSizeBudget int
|
||||
}
|
||||
|
||||
type pipeTopkRow struct {
|
||||
byColumns []string
|
||||
otherColumns []Field
|
||||
}
|
||||
|
||||
type pipeTopkOtherColumn struct {
|
||||
name string
|
||||
values []string
|
||||
}
|
||||
|
||||
func (r *pipeTopkRow) clone() *pipeTopkRow {
|
||||
byColumnsCopy := make([]string, len(r.byColumns))
|
||||
for i := range byColumnsCopy {
|
||||
byColumnsCopy[i] = strings.Clone(r.byColumns[i])
|
||||
}
|
||||
|
||||
otherColumnsCopy := make([]Field, len(r.otherColumns))
|
||||
for i := range otherColumnsCopy {
|
||||
src := &r.otherColumns[i]
|
||||
dst := &otherColumnsCopy[i]
|
||||
dst.Name = strings.Clone(src.Name)
|
||||
dst.Value = strings.Clone(src.Value)
|
||||
}
|
||||
|
||||
return &pipeTopkRow{
|
||||
byColumns: byColumnsCopy,
|
||||
otherColumns: otherColumnsCopy,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *pipeTopkRow) sizeBytes() int {
|
||||
n := int(unsafe.Sizeof(*r))
|
||||
|
||||
for _, v := range r.byColumns {
|
||||
n += len(v)
|
||||
}
|
||||
n += len(r.byColumns) * int(unsafe.Sizeof(r.byColumns[0]))
|
||||
|
||||
for _, f := range r.otherColumns {
|
||||
n += len(f.Name) + len(f.Value)
|
||||
}
|
||||
n += len(r.otherColumns) * int(unsafe.Sizeof(r.otherColumns[0]))
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func (shard *pipeTopkProcessorShard) Len() int {
|
||||
return len(shard.rows)
|
||||
}
|
||||
|
||||
func (shard *pipeTopkProcessorShard) Swap(i, j int) {
|
||||
rows := shard.rows
|
||||
rows[i], rows[j] = rows[j], rows[i]
|
||||
}
|
||||
|
||||
func (shard *pipeTopkProcessorShard) Less(i, j int) bool {
|
||||
rows := shard.rows
|
||||
|
||||
// This is max heap
|
||||
return topkLess(shard.ps, rows[j], rows[i])
|
||||
}
|
||||
|
||||
func (shard *pipeTopkProcessorShard) Push(x any) {
|
||||
r := x.(*pipeTopkRow)
|
||||
shard.rows = append(shard.rows, r)
|
||||
}
|
||||
|
||||
func (shard *pipeTopkProcessorShard) Pop() any {
|
||||
rows := shard.rows
|
||||
x := rows[len(rows)-1]
|
||||
rows[len(rows)-1] = nil
|
||||
shard.rows = rows[:len(rows)-1]
|
||||
return x
|
||||
}
|
||||
|
||||
// writeBlock writes br to shard.
|
||||
func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
|
||||
cs := br.getColumns()
|
||||
|
||||
byFields := shard.ps.byFields
|
||||
if len(byFields) == 0 {
|
||||
// Sort by all the fields
|
||||
|
||||
byColumnValues := shard.byColumnValues[:0]
|
||||
for _, c := range cs {
|
||||
byColumnValues = append(byColumnValues, c.getValues(br))
|
||||
}
|
||||
shard.byColumnValues = byColumnValues
|
||||
|
||||
byColumns := shard.byColumns[:0]
|
||||
otherColumns := shard.otherColumns[:0]
|
||||
bb := bbPool.Get()
|
||||
for rowIdx := range br.timestamps {
|
||||
byColumns = byColumns[:0]
|
||||
bb.B = bb.B[:0]
|
||||
for i, values := range byColumnValues {
|
||||
v := values[rowIdx]
|
||||
bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v)
|
||||
bb.B = append(bb.B, ',')
|
||||
}
|
||||
byColumns = append(byColumns, bytesutil.ToUnsafeString(bb.B))
|
||||
|
||||
otherColumns = otherColumns[:0]
|
||||
for i, values := range byColumnValues {
|
||||
otherColumns = append(otherColumns, Field{
|
||||
Name: cs[i].name,
|
||||
Value: values[rowIdx],
|
||||
})
|
||||
}
|
||||
|
||||
shard.addRow(byColumns, otherColumns)
|
||||
}
|
||||
bbPool.Put(bb)
|
||||
shard.byColumns = byColumns
|
||||
shard.otherColumns = otherColumns
|
||||
} else {
|
||||
// Sort by byFields
|
||||
|
||||
byColumnValues := shard.byColumnValues[:0]
|
||||
for _, bf := range byFields {
|
||||
c := br.getColumnByName(bf.name)
|
||||
byColumnValues = append(byColumnValues, c.getValues(br))
|
||||
}
|
||||
shard.byColumnValues = byColumnValues
|
||||
|
||||
otherColumnValues := shard.otherColumnValues[:0]
|
||||
for _, c := range cs {
|
||||
isByField := false
|
||||
for _, bf := range byFields {
|
||||
if bf.name == c.name {
|
||||
isByField = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !isByField {
|
||||
otherColumnValues = append(otherColumnValues, pipeTopkOtherColumn{
|
||||
name: c.name,
|
||||
values: c.getValues(br),
|
||||
})
|
||||
}
|
||||
}
|
||||
shard.otherColumnValues = otherColumnValues
|
||||
|
||||
// add rows to shard
|
||||
byColumns := shard.byColumns[:0]
|
||||
otherColumns := shard.otherColumns[:0]
|
||||
for rowIdx := range br.timestamps {
|
||||
byColumns = byColumns[:0]
|
||||
for _, values := range byColumnValues {
|
||||
byColumns = append(byColumns, values[rowIdx])
|
||||
}
|
||||
|
||||
otherColumns = otherColumns[:0]
|
||||
for _, ocv := range otherColumnValues {
|
||||
otherColumns = append(otherColumns, Field{
|
||||
Name: ocv.name,
|
||||
Value: ocv.values[rowIdx],
|
||||
})
|
||||
}
|
||||
|
||||
shard.addRow(byColumns, otherColumns)
|
||||
}
|
||||
shard.byColumns = byColumns
|
||||
shard.otherColumns = otherColumns
|
||||
}
|
||||
}
|
||||
|
||||
func (shard *pipeTopkProcessorShard) addRow(byColumns []string, otherColumns []Field) {
|
||||
r := &shard.tmpRow
|
||||
r.byColumns = byColumns
|
||||
r.otherColumns = otherColumns
|
||||
|
||||
rows := shard.rows
|
||||
if len(rows) > 0 && !topkLess(shard.ps, r, rows[0]) {
|
||||
// Fast path - nothing to add.
|
||||
return
|
||||
}
|
||||
|
||||
// Slow path - add r to shard.rows.
|
||||
r = r.clone()
|
||||
shard.stateSizeBudget -= r.sizeBytes()
|
||||
if uint64(len(rows)) < shard.ps.limit {
|
||||
heap.Push(shard, r)
|
||||
shard.stateSizeBudget -= int(unsafe.Sizeof(r))
|
||||
} else {
|
||||
shard.stateSizeBudget += rows[0].sizeBytes()
|
||||
rows[0] = r
|
||||
heap.Fix(shard, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func (shard *pipeTopkProcessorShard) sortRows(stopCh <-chan struct{}) {
|
||||
rows := shard.rows
|
||||
for i := len(rows) - 1; i > 0; i-- {
|
||||
x := heap.Pop(shard)
|
||||
rows[i] = x.(*pipeTopkRow)
|
||||
|
||||
if needStop(stopCh) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ptp *pipeTopkProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||
if len(br.timestamps) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
shard := &ptp.shards[workerID]
|
||||
|
||||
for shard.stateSizeBudget < 0 {
|
||||
// steal some budget for the state size from the global budget.
|
||||
remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk)
|
||||
if remaining < 0 {
|
||||
// The state size is too big. Stop processing data in order to avoid OOM crash.
|
||||
if remaining+stateSizeBudgetChunk >= 0 {
|
||||
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
|
||||
ptp.cancel()
|
||||
}
|
||||
return
|
||||
}
|
||||
shard.stateSizeBudget += stateSizeBudgetChunk
|
||||
}
|
||||
|
||||
shard.writeBlock(br)
|
||||
}
|
||||
|
||||
func (ptp *pipeTopkProcessor) flush() error {
|
||||
if n := ptp.stateSizeBudget.Load(); n <= 0 {
|
||||
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.ps.String(), ptp.maxStateSize/(1<<20))
|
||||
}
|
||||
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sort every shard in parallel
|
||||
var wg sync.WaitGroup
|
||||
shards := ptp.shards
|
||||
for i := range shards {
|
||||
wg.Add(1)
|
||||
go func(shard *pipeTopkProcessorShard) {
|
||||
shard.sortRows(ptp.stopCh)
|
||||
wg.Done()
|
||||
}(&shards[i])
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Merge sorted results across shards
|
||||
sh := pipeTopkProcessorShardsHeap(make([]*pipeTopkProcessorShard, 0, len(shards)))
|
||||
for i := range shards {
|
||||
shard := &shards[i]
|
||||
if len(shard.rows) > 0 {
|
||||
sh = append(sh, shard)
|
||||
}
|
||||
}
|
||||
if len(sh) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
heap.Init(&sh)
|
||||
|
||||
wctx := &pipeTopkWriteContext{
|
||||
ptp: ptp,
|
||||
}
|
||||
shardNextIdx := 0
|
||||
|
||||
for len(sh) > 1 {
|
||||
shard := sh[0]
|
||||
if !wctx.writeNextRow(shard) {
|
||||
break
|
||||
}
|
||||
|
||||
if shard.rowNext >= len(shard.rows) {
|
||||
_ = heap.Pop(&sh)
|
||||
shardNextIdx = 0
|
||||
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
if shardNextIdx == 0 {
|
||||
shardNextIdx = 1
|
||||
if len(sh) > 2 && sh.Less(2, 1) {
|
||||
shardNextIdx = 2
|
||||
}
|
||||
}
|
||||
|
||||
if sh.Less(shardNextIdx, 0) {
|
||||
heap.Fix(&sh, 0)
|
||||
shardNextIdx = 0
|
||||
|
||||
if needStop(ptp.stopCh) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(sh) == 1 {
|
||||
shard := sh[0]
|
||||
for shard.rowNext < len(shard.rows) {
|
||||
if !wctx.writeNextRow(shard) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
wctx.flush()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type pipeTopkWriteContext struct {
|
||||
ptp *pipeTopkProcessor
|
||||
rcs []resultColumn
|
||||
br blockResult
|
||||
|
||||
rowsWritten uint64
|
||||
valuesLen int
|
||||
}
|
||||
|
||||
func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bool {
|
||||
if wctx.rowsWritten >= wctx.ptp.ps.limit {
|
||||
return false
|
||||
}
|
||||
wctx.rowsWritten++
|
||||
|
||||
rowIdx := shard.rowNext
|
||||
shard.rowNext++
|
||||
r := shard.rows[rowIdx]
|
||||
|
||||
byFields := shard.ps.byFields
|
||||
rcs := wctx.rcs
|
||||
|
||||
areEqualColumns := len(rcs) == len(byFields)+len(r.otherColumns)
|
||||
if areEqualColumns {
|
||||
for i, c := range r.otherColumns {
|
||||
if rcs[len(byFields)+i].name != c.Name {
|
||||
areEqualColumns = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if !areEqualColumns {
|
||||
// send the current block to bbBase and construct a block with new set of columns
|
||||
wctx.flush()
|
||||
|
||||
rcs = wctx.rcs[:0]
|
||||
for _, bf := range byFields {
|
||||
rcs = append(rcs, resultColumn{
|
||||
name: bf.name,
|
||||
})
|
||||
}
|
||||
for _, c := range r.otherColumns {
|
||||
rcs = append(rcs, resultColumn{
|
||||
name: c.Name,
|
||||
})
|
||||
}
|
||||
wctx.rcs = rcs
|
||||
}
|
||||
|
||||
byColumns := r.byColumns
|
||||
for i := range byFields {
|
||||
v := byColumns[i]
|
||||
rcs[i].addValue(v)
|
||||
wctx.valuesLen += len(v)
|
||||
}
|
||||
|
||||
for i, c := range r.otherColumns {
|
||||
v := c.Value
|
||||
rcs[len(byFields)+i].addValue(v)
|
||||
wctx.valuesLen += len(v)
|
||||
}
|
||||
|
||||
if wctx.valuesLen >= 1_000_000 {
|
||||
wctx.flush()
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (wctx *pipeTopkWriteContext) flush() {
|
||||
rcs := wctx.rcs
|
||||
br := &wctx.br
|
||||
|
||||
wctx.valuesLen = 0
|
||||
|
||||
if len(rcs) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Flush rcs to ppBase
|
||||
br.setResultColumns(rcs)
|
||||
wctx.ptp.ppBase.writeBlock(0, br)
|
||||
br.reset()
|
||||
for i := range rcs {
|
||||
rcs[i].resetKeepName()
|
||||
}
|
||||
}
|
||||
|
||||
type pipeTopkProcessorShardsHeap []*pipeTopkProcessorShard
|
||||
|
||||
func (sh *pipeTopkProcessorShardsHeap) Len() int {
|
||||
return len(*sh)
|
||||
}
|
||||
|
||||
func (sh *pipeTopkProcessorShardsHeap) Swap(i, j int) {
|
||||
a := *sh
|
||||
a[i], a[j] = a[j], a[i]
|
||||
}
|
||||
|
||||
func (sh *pipeTopkProcessorShardsHeap) Less(i, j int) bool {
|
||||
a := *sh
|
||||
shardA := a[i]
|
||||
shardB := a[j]
|
||||
return topkLess(shardA.ps, shardA.rows[shardA.rowNext], shardB.rows[shardB.rowNext])
|
||||
}
|
||||
|
||||
func (sh *pipeTopkProcessorShardsHeap) Push(x any) {
|
||||
shard := x.(*pipeTopkProcessorShard)
|
||||
*sh = append(*sh, shard)
|
||||
}
|
||||
|
||||
func (sh *pipeTopkProcessorShardsHeap) Pop() any {
|
||||
a := *sh
|
||||
x := a[len(a)-1]
|
||||
a[len(a)-1] = nil
|
||||
*sh = a[:len(a)-1]
|
||||
return x
|
||||
}
|
||||
|
||||
func topkLess(ps *pipeSort, a, b *pipeTopkRow) bool {
|
||||
byFields := ps.byFields
|
||||
|
||||
csA := a.byColumns
|
||||
csB := b.byColumns
|
||||
|
||||
for k := range csA {
|
||||
isDesc := ps.isDesc
|
||||
if len(byFields) > 0 && byFields[k].isDesc {
|
||||
isDesc = !isDesc
|
||||
}
|
||||
|
||||
vA := csA[k]
|
||||
vB := csB[k]
|
||||
|
||||
if vA == vB {
|
||||
continue
|
||||
}
|
||||
|
||||
if isDesc {
|
||||
return stringsutil.LessNatural(vB, vA)
|
||||
}
|
||||
return stringsutil.LessNatural(vA, vB)
|
||||
}
|
||||
return false
|
||||
}
|
|
@ -209,10 +209,8 @@ func (pup *pipeUniqProcessor) flush() error {
|
|||
m := shards[0].m
|
||||
shards = shards[1:]
|
||||
for i := range shards {
|
||||
select {
|
||||
case <-pup.stopCh:
|
||||
if needStop(pup.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
for k := range shards[i].m {
|
||||
|
@ -229,10 +227,8 @@ func (pup *pipeUniqProcessor) flush() error {
|
|||
|
||||
if len(byFields) == 0 {
|
||||
for k := range m {
|
||||
select {
|
||||
case <-pup.stopCh:
|
||||
if needStop(pup.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
rowFields = rowFields[:0]
|
||||
|
@ -259,10 +255,8 @@ func (pup *pipeUniqProcessor) flush() error {
|
|||
}
|
||||
} else {
|
||||
for k := range m {
|
||||
select {
|
||||
case <-pup.stopCh:
|
||||
if needStop(pup.stopCh) {
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
rowFields = rowFields[:0]
|
||||
|
|
|
@ -182,12 +182,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
|
|||
bsws := bswb.bsws
|
||||
for i := range bsws {
|
||||
bsw := &bsws[i]
|
||||
select {
|
||||
case <-stopCh:
|
||||
if needStop(stopCh) {
|
||||
// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
|
||||
bsw.reset()
|
||||
continue
|
||||
default:
|
||||
}
|
||||
|
||||
bs.search(bsw)
|
||||
|
@ -266,11 +264,9 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs
|
|||
type partitionSearchFinalizer func()
|
||||
|
||||
func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
|
||||
select {
|
||||
case <-stopCh:
|
||||
if needStop(stopCh) {
|
||||
// Do not spend CPU time on search, since it is already stopped.
|
||||
return func() {}
|
||||
default:
|
||||
}
|
||||
|
||||
tenantIDs := so.tenantIDs
|
||||
|
@ -436,10 +432,8 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c
|
|||
// it is assumed that ibhs are sorted
|
||||
ibhs := p.indexBlockHeaders
|
||||
for len(ibhs) > 0 && len(tenantIDs) > 0 {
|
||||
select {
|
||||
case <-stopCh:
|
||||
if needStop(stopCh) {
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// locate tenantID equal or bigger than the tenantID in ibhs[0]
|
||||
|
@ -541,10 +535,8 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c
|
|||
ibhs := p.indexBlockHeaders
|
||||
|
||||
for len(ibhs) > 0 && len(streamIDs) > 0 {
|
||||
select {
|
||||
case <-stopCh:
|
||||
if needStop(stopCh) {
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// locate streamID equal or bigger than the streamID in ibhs[0]
|
||||
|
|
Loading…
Reference in a new issue