2024-06-17 10:13:18 +00:00
|
|
|
package logstorage
|
|
|
|
|
|
|
|
import (
|
2024-10-17 19:19:16 +00:00
|
|
|
"container/heap"
|
2024-06-17 10:13:18 +00:00
|
|
|
"fmt"
|
|
|
|
"slices"
|
|
|
|
"sort"
|
2024-10-29 14:37:07 +00:00
|
|
|
"strconv"
|
2025-01-13 19:41:27 +00:00
|
|
|
"strings"
|
2024-10-17 19:19:16 +00:00
|
|
|
"sync"
|
2024-06-17 10:13:18 +00:00
|
|
|
"sync/atomic"
|
|
|
|
"unsafe"
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
|
|
|
)
|
|
|
|
|
|
|
|
// pipeTopDefaultLimit is the number of entries returned by the top pipe
// when N isn't specified in the query (e.g. plain `| top by (x)`).
const (
	pipeTopDefaultLimit = 10
)
|
|
|
|
|
|
|
|
// pipeTop processes '| top ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe
type pipeTop struct {
	// byFields contains field names for returning top values for.
	//
	// Empty byFields means that all the fields of every log entry are taken into account.
	byFields []string

	// limit is the number of top (byFields) sets to return.
	limit uint64

	// limitStr is string representation of the limit.
	//
	// It is empty if the limit wasn't specified explicitly in the query.
	limitStr string

	// hitsFieldName is the name of the field where the number of hits per each unique value is returned.
	hitsFieldName string

	// rankFieldName is the name of the field where the rank per each unique value is returned.
	//
	// The rank isn't returned if rankFieldName is empty.
	rankFieldName string
}
|
|
|
|
|
|
|
|
func (pt *pipeTop) String() string {
|
|
|
|
s := "top"
|
|
|
|
if pt.limit != pipeTopDefaultLimit {
|
|
|
|
s += " " + pt.limitStr
|
|
|
|
}
|
|
|
|
if len(pt.byFields) > 0 {
|
|
|
|
s += " by (" + fieldNamesString(pt.byFields) + ")"
|
|
|
|
}
|
2024-12-22 10:23:19 +00:00
|
|
|
if pt.hitsFieldName != "hits" {
|
|
|
|
s += " hits as " + quoteTokenIfNeeded(pt.hitsFieldName)
|
|
|
|
}
|
2024-10-29 14:37:07 +00:00
|
|
|
if pt.rankFieldName != "" {
|
2024-10-29 15:43:07 +00:00
|
|
|
s += rankFieldNameString(pt.rankFieldName)
|
2024-10-29 14:37:07 +00:00
|
|
|
}
|
2024-06-17 10:13:18 +00:00
|
|
|
return s
|
|
|
|
}
|
|
|
|
|
2024-06-27 12:18:42 +00:00
|
|
|
func (pt *pipeTop) canLiveTail() bool {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2024-06-17 10:13:18 +00:00
|
|
|
func (pt *pipeTop) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
|
|
|
neededFields.reset()
|
|
|
|
unneededFields.reset()
|
|
|
|
|
|
|
|
if len(pt.byFields) == 0 {
|
|
|
|
neededFields.add("*")
|
|
|
|
} else {
|
|
|
|
neededFields.addFields(pt.byFields)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (pt *pipeTop) hasFilterInWithQuery() bool {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2024-12-22 12:09:42 +00:00
|
|
|
func (pt *pipeTop) initFilterInValues(_ *inValuesCache, _ getFieldValuesFunc) (pipe, error) {
|
2024-06-17 10:13:18 +00:00
|
|
|
return pt, nil
|
|
|
|
}
|
|
|
|
|
2025-01-24 17:49:20 +00:00
|
|
|
func (pt *pipeTop) visitSubqueries(_ func(q *Query)) {
|
|
|
|
// nothing to do
|
|
|
|
}
|
|
|
|
|
2024-06-17 10:13:18 +00:00
|
|
|
func (pt *pipeTop) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
|
2025-01-15 18:03:07 +00:00
|
|
|
maxStateSize := int64(float64(memory.Allowed()) * 0.4)
|
2024-06-17 10:13:18 +00:00
|
|
|
|
|
|
|
shards := make([]pipeTopProcessorShard, workersCount)
|
|
|
|
for i := range shards {
|
|
|
|
shards[i] = pipeTopProcessorShard{
|
|
|
|
pipeTopProcessorShardNopad: pipeTopProcessorShardNopad{
|
2024-09-29 08:16:14 +00:00
|
|
|
pt: pt,
|
2024-06-17 10:13:18 +00:00
|
|
|
},
|
|
|
|
}
|
2025-02-06 12:42:38 +00:00
|
|
|
shards[i].m.init(uint(workersCount), &shards[i].stateSizeBudget)
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
ptp := &pipeTopProcessor{
|
|
|
|
pt: pt,
|
|
|
|
stopCh: stopCh,
|
|
|
|
cancel: cancel,
|
|
|
|
ppNext: ppNext,
|
|
|
|
|
|
|
|
shards: shards,
|
|
|
|
|
|
|
|
maxStateSize: maxStateSize,
|
|
|
|
}
|
|
|
|
ptp.stateSizeBudget.Store(maxStateSize)
|
|
|
|
|
|
|
|
return ptp
|
|
|
|
}
|
|
|
|
|
|
|
|
type pipeTopProcessor struct {
|
|
|
|
pt *pipeTop
|
|
|
|
stopCh <-chan struct{}
|
|
|
|
cancel func()
|
|
|
|
ppNext pipeProcessor
|
|
|
|
|
|
|
|
shards []pipeTopProcessorShard
|
|
|
|
|
|
|
|
maxStateSize int64
|
|
|
|
stateSizeBudget atomic.Int64
|
|
|
|
}
|
|
|
|
|
|
|
|
type pipeTopProcessorShard struct {
|
|
|
|
pipeTopProcessorShardNopad
|
|
|
|
|
|
|
|
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
|
|
|
_ [128 - unsafe.Sizeof(pipeTopProcessorShardNopad{})%128]byte
|
|
|
|
}
|
|
|
|
|
|
|
|
type pipeTopProcessorShardNopad struct {
|
|
|
|
// pt points to the parent pipeTop.
|
|
|
|
pt *pipeTop
|
|
|
|
|
2025-01-13 19:41:27 +00:00
|
|
|
// m holds per-value hits.
|
2025-02-06 12:42:38 +00:00
|
|
|
m hitsMapAdaptive
|
2024-06-17 10:13:18 +00:00
|
|
|
|
|
|
|
// keyBuf is a temporary buffer for building keys for m.
|
|
|
|
keyBuf []byte
|
|
|
|
|
|
|
|
// columnValues is a temporary buffer for the processed column values.
|
|
|
|
columnValues [][]string
|
|
|
|
|
|
|
|
// stateSizeBudget is the remaining budget for the whole state size for the shard.
|
|
|
|
// The per-shard budget is provided in chunks from the parent pipeTopProcessor.
|
|
|
|
stateSizeBudget int
|
|
|
|
}
|
|
|
|
|
|
|
|
// writeBlock writes br to shard.
|
|
|
|
func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
|
|
|
|
byFields := shard.pt.byFields
|
|
|
|
if len(byFields) == 0 {
|
|
|
|
// Take into account all the columns in br.
|
|
|
|
keyBuf := shard.keyBuf
|
|
|
|
cs := br.getColumns()
|
2025-01-13 19:41:27 +00:00
|
|
|
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
|
2024-06-17 10:13:18 +00:00
|
|
|
keyBuf = keyBuf[:0]
|
|
|
|
for _, c := range cs {
|
2025-01-13 19:41:27 +00:00
|
|
|
v := c.getValueAtRow(br, rowIdx)
|
2024-06-17 10:13:18 +00:00
|
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
|
|
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
|
|
|
|
}
|
2025-01-13 17:11:27 +00:00
|
|
|
shard.m.updateStateString(keyBuf, 1)
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
|
|
|
shard.keyBuf = keyBuf
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if len(byFields) == 1 {
|
|
|
|
// Fast path for a single field.
|
2025-01-13 17:11:27 +00:00
|
|
|
shard.updateStatsSingleColumn(br, byFields[0])
|
2024-06-17 10:13:18 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Take into account only the selected columns.
|
|
|
|
columnValues := shard.columnValues[:0]
|
|
|
|
for _, f := range byFields {
|
|
|
|
c := br.getColumnByName(f)
|
|
|
|
values := c.getValues(br)
|
|
|
|
columnValues = append(columnValues, values)
|
|
|
|
}
|
|
|
|
shard.columnValues = columnValues
|
|
|
|
|
|
|
|
keyBuf := shard.keyBuf
|
2025-02-11 15:40:27 +00:00
|
|
|
hits := uint64(1)
|
|
|
|
for rowIdx := 1; rowIdx < br.rowsLen; rowIdx++ {
|
|
|
|
if isEqualPrevRow(columnValues, rowIdx) {
|
|
|
|
hits++
|
|
|
|
continue
|
|
|
|
}
|
2024-06-17 10:13:18 +00:00
|
|
|
keyBuf = keyBuf[:0]
|
|
|
|
for _, values := range columnValues {
|
2025-02-11 15:40:27 +00:00
|
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[rowIdx-1]))
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
2025-02-11 15:40:27 +00:00
|
|
|
shard.m.updateStateString(keyBuf, hits)
|
|
|
|
hits = 1
|
|
|
|
}
|
|
|
|
keyBuf = keyBuf[:0]
|
|
|
|
for _, values := range columnValues {
|
|
|
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[len(values)-1]))
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
2025-02-11 15:40:27 +00:00
|
|
|
shard.m.updateStateString(keyBuf, hits)
|
2024-06-17 10:13:18 +00:00
|
|
|
shard.keyBuf = keyBuf
|
|
|
|
}
|
|
|
|
|
2025-02-11 15:40:27 +00:00
|
|
|
// isEqualPrevRow reports whether every column in columnValues holds the same value
// at rowIdx and at rowIdx-1.
//
// The first row (rowIdx == 0) has no predecessor, so false is returned for it.
func isEqualPrevRow(columnValues [][]string, rowIdx int) bool {
	if rowIdx == 0 {
		return false
	}
	for i := range columnValues {
		if columnValues[i][rowIdx] != columnValues[i][rowIdx-1] {
			return false
		}
	}
	return true
}
|
|
|
|
|
2025-01-13 17:11:27 +00:00
|
|
|
func (shard *pipeTopProcessorShard) updateStatsSingleColumn(br *blockResult, fieldName string) {
|
|
|
|
c := br.getColumnByName(fieldName)
|
|
|
|
if c.isConst {
|
|
|
|
v := c.valuesEncoded[0]
|
|
|
|
shard.m.updateStateGeneric(v, uint64(br.rowsLen))
|
|
|
|
return
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
2025-01-13 17:11:27 +00:00
|
|
|
switch c.valueType {
|
|
|
|
case valueTypeDict:
|
|
|
|
c.forEachDictValueWithHits(br, shard.m.updateStateGeneric)
|
|
|
|
case valueTypeUint8:
|
|
|
|
values := c.getValuesEncoded(br)
|
2025-02-11 15:40:27 +00:00
|
|
|
hits := uint64(1)
|
|
|
|
for rowIdx := 1; rowIdx < len(values); rowIdx++ {
|
|
|
|
if values[rowIdx-1] == values[rowIdx] {
|
|
|
|
hits++
|
|
|
|
} else {
|
|
|
|
n := uint64(unmarshalUint8(values[rowIdx-1]))
|
|
|
|
shard.m.updateStateUint64(n, hits)
|
|
|
|
hits = 1
|
|
|
|
}
|
2025-01-13 17:11:27 +00:00
|
|
|
}
|
2025-02-11 15:40:27 +00:00
|
|
|
n := uint64(unmarshalUint8(values[len(values)-1]))
|
|
|
|
shard.m.updateStateUint64(n, hits)
|
2025-01-13 17:11:27 +00:00
|
|
|
case valueTypeUint16:
|
|
|
|
values := c.getValuesEncoded(br)
|
|
|
|
for _, v := range values {
|
2025-02-11 15:40:27 +00:00
|
|
|
n := uint64(unmarshalUint16(v))
|
|
|
|
shard.m.updateStateUint64(n, 1)
|
2025-01-13 17:11:27 +00:00
|
|
|
}
|
|
|
|
case valueTypeUint32:
|
|
|
|
values := c.getValuesEncoded(br)
|
|
|
|
for _, v := range values {
|
2025-02-11 15:40:27 +00:00
|
|
|
n := uint64(unmarshalUint32(v))
|
|
|
|
shard.m.updateStateUint64(n, 1)
|
2025-01-13 17:11:27 +00:00
|
|
|
}
|
|
|
|
case valueTypeUint64:
|
|
|
|
values := c.getValuesEncoded(br)
|
|
|
|
for _, v := range values {
|
|
|
|
n := unmarshalUint64(v)
|
|
|
|
shard.m.updateStateUint64(n, 1)
|
|
|
|
}
|
|
|
|
case valueTypeInt64:
|
|
|
|
values := c.getValuesEncoded(br)
|
|
|
|
for _, v := range values {
|
|
|
|
n := unmarshalInt64(v)
|
|
|
|
shard.m.updateStateInt64(n, 1)
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
values := c.getValues(br)
|
2025-02-11 15:40:27 +00:00
|
|
|
hits := uint64(1)
|
|
|
|
for rowIdx := 1; rowIdx < len(values); rowIdx++ {
|
|
|
|
if values[rowIdx-1] == values[rowIdx] {
|
|
|
|
hits++
|
|
|
|
} else {
|
|
|
|
shard.m.updateStateGeneric(values[rowIdx-1], hits)
|
|
|
|
hits = 1
|
|
|
|
}
|
2025-01-13 17:11:27 +00:00
|
|
|
}
|
2025-02-11 15:40:27 +00:00
|
|
|
shard.m.updateStateGeneric(values[len(values)-1], hits)
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) {
|
2024-09-25 14:16:53 +00:00
|
|
|
if br.rowsLen == 0 {
|
2024-06-17 10:13:18 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
shard := &ptp.shards[workerID]
|
|
|
|
|
|
|
|
for shard.stateSizeBudget < 0 {
|
|
|
|
// steal some budget for the state size from the global budget.
|
|
|
|
remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk)
|
|
|
|
if remaining < 0 {
|
|
|
|
// The state size is too big. Stop processing data in order to avoid OOM crash.
|
|
|
|
if remaining+stateSizeBudgetChunk >= 0 {
|
|
|
|
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
|
|
|
|
ptp.cancel()
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
shard.stateSizeBudget += stateSizeBudgetChunk
|
|
|
|
}
|
|
|
|
|
|
|
|
shard.writeBlock(br)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ptp *pipeTopProcessor) flush() error {
|
|
|
|
if n := ptp.stateSizeBudget.Load(); n <= 0 {
|
|
|
|
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
|
|
|
|
}
|
|
|
|
|
2024-10-17 19:19:16 +00:00
|
|
|
// merge state across shards in parallel
|
2025-01-15 18:03:07 +00:00
|
|
|
entries := ptp.mergeShardsParallel()
|
2024-10-17 19:19:16 +00:00
|
|
|
if needStop(ptp.stopCh) {
|
|
|
|
return nil
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// write result
|
|
|
|
wctx := &pipeTopWriteContext{
|
|
|
|
ptp: ptp,
|
|
|
|
}
|
|
|
|
byFields := ptp.pt.byFields
|
|
|
|
var rowFields []Field
|
|
|
|
|
|
|
|
addHitsField := func(dst []Field, hits uint64) []Field {
|
|
|
|
hitsStr := string(marshalUint64String(nil, hits))
|
|
|
|
dst = append(dst, Field{
|
|
|
|
Name: ptp.pt.hitsFieldName,
|
|
|
|
Value: hitsStr,
|
|
|
|
})
|
|
|
|
return dst
|
|
|
|
}
|
|
|
|
|
2024-10-29 14:37:07 +00:00
|
|
|
addRankField := func(dst []Field, rank int) []Field {
|
|
|
|
if ptp.pt.rankFieldName == "" {
|
|
|
|
return dst
|
|
|
|
}
|
|
|
|
rankStr := strconv.Itoa(rank + 1)
|
|
|
|
dst = append(dst, Field{
|
|
|
|
Name: ptp.pt.rankFieldName,
|
|
|
|
Value: rankStr,
|
|
|
|
})
|
|
|
|
return dst
|
|
|
|
}
|
|
|
|
|
2024-06-17 10:13:18 +00:00
|
|
|
if len(byFields) == 0 {
|
2024-10-29 14:37:07 +00:00
|
|
|
for i, e := range entries {
|
2024-06-17 10:13:18 +00:00
|
|
|
if needStop(ptp.stopCh) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
rowFields = rowFields[:0]
|
|
|
|
keyBuf := bytesutil.ToUnsafeBytes(e.k)
|
|
|
|
for len(keyBuf) > 0 {
|
|
|
|
name, nSize := encoding.UnmarshalBytes(keyBuf)
|
|
|
|
if nSize <= 0 {
|
|
|
|
logger.Panicf("BUG: cannot unmarshal field name")
|
|
|
|
}
|
|
|
|
keyBuf = keyBuf[nSize:]
|
|
|
|
|
|
|
|
value, nSize := encoding.UnmarshalBytes(keyBuf)
|
|
|
|
if nSize <= 0 {
|
|
|
|
logger.Panicf("BUG: cannot unmarshal field value")
|
|
|
|
}
|
|
|
|
keyBuf = keyBuf[nSize:]
|
|
|
|
|
|
|
|
rowFields = append(rowFields, Field{
|
|
|
|
Name: bytesutil.ToUnsafeString(name),
|
|
|
|
Value: bytesutil.ToUnsafeString(value),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
rowFields = addHitsField(rowFields, e.hits)
|
2024-10-29 14:37:07 +00:00
|
|
|
rowFields = addRankField(rowFields, i)
|
2024-06-17 10:13:18 +00:00
|
|
|
wctx.writeRow(rowFields)
|
|
|
|
}
|
|
|
|
} else if len(byFields) == 1 {
|
|
|
|
fieldName := byFields[0]
|
2024-10-29 14:37:07 +00:00
|
|
|
for i, e := range entries {
|
2024-06-17 10:13:18 +00:00
|
|
|
if needStop(ptp.stopCh) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
rowFields = append(rowFields[:0], Field{
|
|
|
|
Name: fieldName,
|
|
|
|
Value: e.k,
|
|
|
|
})
|
|
|
|
rowFields = addHitsField(rowFields, e.hits)
|
2024-10-29 14:37:07 +00:00
|
|
|
rowFields = addRankField(rowFields, i)
|
2024-06-17 10:13:18 +00:00
|
|
|
wctx.writeRow(rowFields)
|
|
|
|
}
|
|
|
|
} else {
|
2024-10-29 14:37:07 +00:00
|
|
|
for i, e := range entries {
|
2024-06-17 10:13:18 +00:00
|
|
|
if needStop(ptp.stopCh) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
rowFields = rowFields[:0]
|
|
|
|
keyBuf := bytesutil.ToUnsafeBytes(e.k)
|
|
|
|
fieldIdx := 0
|
|
|
|
for len(keyBuf) > 0 {
|
|
|
|
value, nSize := encoding.UnmarshalBytes(keyBuf)
|
|
|
|
if nSize <= 0 {
|
|
|
|
logger.Panicf("BUG: cannot unmarshal field value")
|
|
|
|
}
|
|
|
|
keyBuf = keyBuf[nSize:]
|
|
|
|
|
|
|
|
rowFields = append(rowFields, Field{
|
|
|
|
Name: byFields[fieldIdx],
|
|
|
|
Value: bytesutil.ToUnsafeString(value),
|
|
|
|
})
|
|
|
|
fieldIdx++
|
|
|
|
}
|
|
|
|
rowFields = addHitsField(rowFields, e.hits)
|
2024-10-29 14:37:07 +00:00
|
|
|
rowFields = addRankField(rowFields, i)
|
2024-06-17 10:13:18 +00:00
|
|
|
wctx.writeRow(rowFields)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
wctx.flush()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2025-01-15 18:03:07 +00:00
|
|
|
func (ptp *pipeTopProcessor) mergeShardsParallel() []*pipeTopEntry {
|
2024-10-17 20:47:52 +00:00
|
|
|
limit := ptp.pt.limit
|
|
|
|
if limit == 0 {
|
2025-01-15 18:03:07 +00:00
|
|
|
return nil
|
2024-10-17 20:47:52 +00:00
|
|
|
}
|
2024-10-17 19:19:16 +00:00
|
|
|
|
2025-02-06 12:42:38 +00:00
|
|
|
hmas := make([]*hitsMapAdaptive, 0, len(ptp.shards))
|
2025-01-15 18:03:07 +00:00
|
|
|
for i := range ptp.shards {
|
2025-02-06 12:42:38 +00:00
|
|
|
hma := &ptp.shards[i].m
|
|
|
|
if hma.entriesCount() > 0 {
|
|
|
|
hmas = append(hmas, hma)
|
2025-01-15 18:03:07 +00:00
|
|
|
}
|
2024-10-17 19:19:16 +00:00
|
|
|
}
|
|
|
|
|
2025-01-15 18:03:07 +00:00
|
|
|
var entries []*pipeTopEntry
|
|
|
|
var entriesLock sync.Mutex
|
2025-02-06 12:42:38 +00:00
|
|
|
hitsMapMergeParallel(hmas, ptp.stopCh, func(hm *hitsMap) {
|
2025-01-15 18:03:07 +00:00
|
|
|
es := getTopEntries(hm, limit, ptp.stopCh)
|
|
|
|
entriesLock.Lock()
|
|
|
|
entries = append(entries, es...)
|
|
|
|
entriesLock.Unlock()
|
|
|
|
})
|
2024-10-17 19:19:16 +00:00
|
|
|
if needStop(ptp.stopCh) {
|
2025-01-15 18:03:07 +00:00
|
|
|
return nil
|
2024-10-17 19:19:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
sort.Slice(entries, func(i, j int) bool {
|
|
|
|
return entries[j].less(entries[i])
|
|
|
|
})
|
|
|
|
if uint64(len(entries)) > limit {
|
|
|
|
entries = entries[:limit]
|
|
|
|
}
|
2025-01-15 18:03:07 +00:00
|
|
|
|
|
|
|
return entries
|
2024-10-17 19:19:16 +00:00
|
|
|
}
|
|
|
|
|
2025-01-15 18:03:07 +00:00
|
|
|
func getTopEntries(hm *hitsMap, limit uint64, stopCh <-chan struct{}) []*pipeTopEntry {
|
2024-10-17 19:19:16 +00:00
|
|
|
if limit == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
var eh topEntriesHeap
|
2025-01-13 17:11:27 +00:00
|
|
|
var e pipeTopEntry
|
2024-10-17 19:19:16 +00:00
|
|
|
|
2025-01-13 17:11:27 +00:00
|
|
|
pushEntry := func(k string, hits uint64, kCopy bool) {
|
|
|
|
e.k = k
|
|
|
|
e.hits = hits
|
2024-10-17 19:19:16 +00:00
|
|
|
if uint64(len(eh)) < limit {
|
|
|
|
eCopy := e
|
2025-01-13 17:11:27 +00:00
|
|
|
if kCopy {
|
2025-01-13 19:41:27 +00:00
|
|
|
eCopy.k = strings.Clone(eCopy.k)
|
2025-01-13 17:11:27 +00:00
|
|
|
}
|
2024-10-17 19:19:16 +00:00
|
|
|
heap.Push(&eh, &eCopy)
|
2025-01-13 17:11:27 +00:00
|
|
|
return
|
2024-10-17 19:19:16 +00:00
|
|
|
}
|
2025-01-13 17:11:27 +00:00
|
|
|
|
|
|
|
if !eh[0].less(&e) {
|
|
|
|
return
|
2024-10-17 19:19:16 +00:00
|
|
|
}
|
2025-01-13 17:11:27 +00:00
|
|
|
eCopy := e
|
|
|
|
if kCopy {
|
2025-01-13 19:41:27 +00:00
|
|
|
eCopy.k = strings.Clone(eCopy.k)
|
2025-01-13 17:11:27 +00:00
|
|
|
}
|
|
|
|
eh[0] = &eCopy
|
|
|
|
heap.Fix(&eh, 0)
|
|
|
|
}
|
|
|
|
|
|
|
|
var b []byte
|
2025-01-15 18:03:07 +00:00
|
|
|
for n, pHits := range hm.u64 {
|
2025-01-13 17:11:27 +00:00
|
|
|
if needStop(stopCh) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
b = marshalUint64String(b[:0], n)
|
|
|
|
pushEntry(bytesutil.ToUnsafeString(b), *pHits, true)
|
|
|
|
}
|
2025-01-15 18:03:07 +00:00
|
|
|
for n, pHits := range hm.negative64 {
|
2025-01-13 17:11:27 +00:00
|
|
|
if needStop(stopCh) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
b = marshalInt64String(b[:0], int64(n))
|
|
|
|
pushEntry(bytesutil.ToUnsafeString(b), *pHits, true)
|
|
|
|
}
|
2025-01-15 18:03:07 +00:00
|
|
|
for k, pHits := range hm.strings {
|
2025-01-13 17:11:27 +00:00
|
|
|
if needStop(stopCh) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
pushEntry(k, *pHits, false)
|
2024-10-17 19:19:16 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
result := ([]*pipeTopEntry)(eh)
|
|
|
|
for len(eh) > 0 {
|
|
|
|
x := heap.Pop(&eh)
|
|
|
|
result[len(eh)] = x.(*pipeTopEntry)
|
|
|
|
}
|
|
|
|
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
|
|
|
// topEntriesHeap is a min-heap of entries ordered by pipeTopEntry.less.
//
// *topEntriesHeap implements heap.Interface, so h[0] is the smallest entry.
type topEntriesHeap []*pipeTopEntry

func (h *topEntriesHeap) Less(i, j int) bool {
	return (*h)[i].less((*h)[j])
}

func (h *topEntriesHeap) Swap(i, j int) {
	s := *h
	s[j], s[i] = s[i], s[j]
}

func (h *topEntriesHeap) Len() int {
	return len(*h)
}

func (h *topEntriesHeap) Push(v any) {
	*h = append(*h, v.(*pipeTopEntry))
}

func (h *topEntriesHeap) Pop() any {
	old := *h
	last := len(old) - 1
	x := old[last]
	// Clear the freed slot, so the backing array doesn't keep a reference to the popped entry.
	old[last] = nil
	*h = old[:last]
	return x
}

// pipeTopEntry is a key with the number of hits for it.
type pipeTopEntry struct {
	k    string
	hits uint64
}

// less reports whether e ranks lower than r.
//
// An entry with fewer hits ranks lower. Among entries with equal hits,
// the entry with the bigger key ranks lower.
func (e *pipeTopEntry) less(r *pipeTopEntry) bool {
	if e.hits != r.hits {
		return e.hits < r.hits
	}
	return e.k > r.k
}
|
|
|
|
|
2024-06-17 10:13:18 +00:00
|
|
|
type pipeTopWriteContext struct {
|
|
|
|
ptp *pipeTopProcessor
|
|
|
|
rcs []resultColumn
|
|
|
|
br blockResult
|
|
|
|
|
|
|
|
// rowsCount is the number of rows in the current block
|
|
|
|
rowsCount int
|
|
|
|
|
|
|
|
// valuesLen is the total length of values in the current block
|
|
|
|
valuesLen int
|
|
|
|
}
|
|
|
|
|
|
|
|
func (wctx *pipeTopWriteContext) writeRow(rowFields []Field) {
|
|
|
|
rcs := wctx.rcs
|
|
|
|
|
|
|
|
areEqualColumns := len(rcs) == len(rowFields)
|
|
|
|
if areEqualColumns {
|
|
|
|
for i, f := range rowFields {
|
|
|
|
if rcs[i].name != f.Name {
|
|
|
|
areEqualColumns = false
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !areEqualColumns {
|
|
|
|
// send the current block to ppNext and construct a block with new set of columns
|
|
|
|
wctx.flush()
|
|
|
|
|
|
|
|
rcs = wctx.rcs[:0]
|
|
|
|
for _, f := range rowFields {
|
|
|
|
rcs = appendResultColumnWithName(rcs, f.Name)
|
|
|
|
}
|
|
|
|
wctx.rcs = rcs
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, f := range rowFields {
|
|
|
|
v := f.Value
|
|
|
|
rcs[i].addValue(v)
|
|
|
|
wctx.valuesLen += len(v)
|
|
|
|
}
|
|
|
|
|
|
|
|
wctx.rowsCount++
|
2025-01-13 19:41:27 +00:00
|
|
|
|
|
|
|
// The 64_000 limit provides the best performance results.
|
|
|
|
if wctx.valuesLen >= 64_000 {
|
2024-06-17 10:13:18 +00:00
|
|
|
wctx.flush()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (wctx *pipeTopWriteContext) flush() {
|
|
|
|
rcs := wctx.rcs
|
|
|
|
br := &wctx.br
|
|
|
|
|
|
|
|
wctx.valuesLen = 0
|
|
|
|
|
|
|
|
// Flush rcs to ppNext
|
|
|
|
br.setResultColumns(rcs, wctx.rowsCount)
|
|
|
|
wctx.rowsCount = 0
|
|
|
|
wctx.ptp.ppNext.writeBlock(0, br)
|
|
|
|
br.reset()
|
|
|
|
for i := range rcs {
|
|
|
|
rcs[i].resetValues()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-12-06 00:23:11 +00:00
|
|
|
func parsePipeTop(lex *lexer) (pipe, error) {
|
2024-06-17 10:13:18 +00:00
|
|
|
if !lex.isKeyword("top") {
|
|
|
|
return nil, fmt.Errorf("expecting 'top'; got %q", lex.token)
|
|
|
|
}
|
|
|
|
lex.nextToken()
|
|
|
|
|
|
|
|
limit := uint64(pipeTopDefaultLimit)
|
|
|
|
limitStr := ""
|
|
|
|
if isNumberPrefix(lex.token) {
|
|
|
|
limitF, s, err := parseNumber(lex)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("cannot parse N in 'top': %w", err)
|
|
|
|
}
|
|
|
|
if limitF < 1 {
|
|
|
|
return nil, fmt.Errorf("N in 'top %s' must be integer bigger than 0", s)
|
|
|
|
}
|
|
|
|
limit = uint64(limitF)
|
|
|
|
limitStr = s
|
|
|
|
}
|
|
|
|
|
2025-02-20 21:36:05 +00:00
|
|
|
needFields := false
|
|
|
|
if lex.isKeyword("by") {
|
|
|
|
lex.nextToken()
|
|
|
|
needFields = true
|
|
|
|
}
|
|
|
|
|
2024-06-17 10:13:18 +00:00
|
|
|
var byFields []string
|
2025-02-20 21:36:05 +00:00
|
|
|
if lex.isKeyword("(") {
|
2024-06-17 10:13:18 +00:00
|
|
|
bfs, err := parseFieldNamesInParens(lex)
|
|
|
|
if err != nil {
|
2025-02-20 21:36:05 +00:00
|
|
|
return nil, fmt.Errorf("cannot parse 'by(...)': %w", err)
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
2025-02-20 21:36:05 +00:00
|
|
|
byFields = bfs
|
|
|
|
} else if !lex.isKeyword("hits", "rank", ")", "|", "") {
|
|
|
|
bfs, err := parseCommaSeparatedFields(lex)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("cannot parse 'by ...': %w", err)
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
|
|
|
byFields = bfs
|
2025-02-20 21:36:05 +00:00
|
|
|
} else if needFields {
|
|
|
|
return nil, fmt.Errorf("missing fields after 'by'")
|
|
|
|
}
|
|
|
|
if slices.Contains(byFields, "*") {
|
|
|
|
byFields = nil
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
pt := &pipeTop{
|
|
|
|
byFields: byFields,
|
|
|
|
limit: limit,
|
|
|
|
limitStr: limitStr,
|
2025-01-13 19:41:27 +00:00
|
|
|
hitsFieldName: "hits",
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|
|
|
|
|
2025-01-13 19:41:27 +00:00
|
|
|
for {
|
|
|
|
switch {
|
|
|
|
case lex.isKeyword("hits"):
|
|
|
|
lex.nextToken()
|
|
|
|
if lex.isKeyword("as") {
|
|
|
|
lex.nextToken()
|
|
|
|
}
|
|
|
|
s, err := getCompoundToken(lex)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("cannot parse 'hits' name: %w", err)
|
|
|
|
}
|
|
|
|
pt.hitsFieldName = s
|
|
|
|
case lex.isKeyword("rank"):
|
|
|
|
rankFieldName, err := parseRankFieldName(lex)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("cannot parse rank field name in [%s]: %w", pt, err)
|
|
|
|
}
|
|
|
|
pt.rankFieldName = rankFieldName
|
|
|
|
for slices.Contains(byFields, pt.rankFieldName) {
|
|
|
|
pt.rankFieldName += "s"
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
for slices.Contains(byFields, pt.hitsFieldName) {
|
|
|
|
pt.hitsFieldName += "s"
|
|
|
|
}
|
|
|
|
return pt, nil
|
2024-10-29 15:43:07 +00:00
|
|
|
}
|
2024-10-29 14:37:07 +00:00
|
|
|
}
|
2024-10-29 15:43:07 +00:00
|
|
|
}
|
2024-10-29 14:37:07 +00:00
|
|
|
|
2024-10-29 15:43:07 +00:00
|
|
|
func parseRankFieldName(lex *lexer) (string, error) {
|
2024-10-29 14:37:07 +00:00
|
|
|
if !lex.isKeyword("rank") {
|
2024-10-29 15:43:07 +00:00
|
|
|
return "", fmt.Errorf("unexpected token: %q; want 'rank'", lex.token)
|
2024-10-29 14:37:07 +00:00
|
|
|
}
|
|
|
|
lex.nextToken()
|
2024-10-29 15:43:07 +00:00
|
|
|
|
|
|
|
rankFieldName := "rank"
|
2024-10-29 14:37:07 +00:00
|
|
|
if lex.isKeyword("as") {
|
|
|
|
lex.nextToken()
|
|
|
|
if lex.isKeyword("", "|", ")", "(") {
|
2024-10-29 15:43:07 +00:00
|
|
|
return "", fmt.Errorf("missing rank name")
|
2024-10-29 14:37:07 +00:00
|
|
|
}
|
|
|
|
}
|
2024-10-29 15:43:07 +00:00
|
|
|
if !lex.isKeyword("", "|", ")", "limit") {
|
|
|
|
s, err := getCompoundToken(lex)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
rankFieldName = s
|
2024-10-29 14:37:07 +00:00
|
|
|
}
|
2024-10-29 15:43:07 +00:00
|
|
|
return rankFieldName, nil
|
|
|
|
}
|
2024-10-29 14:37:07 +00:00
|
|
|
|
2024-10-29 15:43:07 +00:00
|
|
|
func rankFieldNameString(rankFieldName string) string {
|
|
|
|
s := " rank"
|
|
|
|
if rankFieldName != "rank" {
|
2024-12-22 10:23:19 +00:00
|
|
|
s += " as " + quoteTokenIfNeeded(rankFieldName)
|
2024-10-29 15:43:07 +00:00
|
|
|
}
|
|
|
|
return s
|
2024-06-17 10:13:18 +00:00
|
|
|
}
|