mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
lib/logstorage: add `top` pipe, which returns top N field sets with the biggest number of matching logs

This commit is contained in:
parent f644fa6251
commit 430bebb5f0

8 changed files with 868 additions and 2 deletions
@@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta

## tip

* FEATURE: add [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe) for returning top N sets of the given fields with the maximum number of matching log entries.

## [v0.19.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.19.0-victorialogs)

Released at 2024-06-11
@@ -1265,6 +1265,7 @@ LogsQL supports the following pipes:

- [`replace_regexp`](#replace_regexp-pipe) updates [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with regular expressions.
- [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`stats`](#stats-pipe) calculates various stats over the selected logs.
- [`top`](#top-pipe) returns top `N` field sets with the maximum number of matching logs.
- [`uniq`](#uniq-pipe) returns unique log entries.
- [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -1573,6 +1574,7 @@ If the limit is reached, then the set of returned values is random. Also the num

See also:

- [`field_names` pipe](#field_names-pipe)
- [`top` pipe](#top-pipe)
- [`uniq` pipe](#uniq-pipe)

### fields pipe
@@ -2139,6 +2141,8 @@ See also:

- [stats pipe functions](#stats-pipe-functions)
- [`math` pipe](#math-pipe)
- [`sort` pipe](#sort-pipe)
- [`uniq` pipe](#uniq-pipe)
- [`top` pipe](#top-pipe)

#### Stats by fields
@@ -2256,9 +2260,41 @@ _time:5m | stats

count() total
```

### top pipe

`| top N by (field1, ..., fieldN)` [pipe](#pipes) returns top `N` sets for `(field1, ..., fieldN)` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the maximum number of matching log entries.

For example, the following query returns top 7 [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with the maximum number of log entries over the last 5 minutes:

```logsql
_time:5m | top 7 by (_stream)
```

The `N` is optional. If it is skipped, then top 10 entries are returned. For example, the following query returns top 10 values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) seen in logs for the last 5 minutes:

```logsql
_time:5m | top by (ip)
```

The `by (...)` part in the `top` [pipe](#pipes) is optional. If it is skipped, then all the log fields are taken into account when determining top field sets. This is useful when the field sets are already limited by other pipes such as the [`fields` pipe](#fields-pipe). For example, the following query is equivalent to the previous one:

```logsql
_time:5m | fields ip | top
```

See also:

- [`uniq` pipe](#uniq-pipe)
- [`stats` pipe](#stats-pipe)
### uniq pipe

`| uniq ...` [pipe](#pipes) returns unique results over the selected logs. For example, the following LogsQL query
returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
over logs for the last 5 minutes:

@@ -2300,6 +2336,8 @@ _time:5m | uniq (host, path) limit 100

See also:

- [`uniq_values` stats function](#uniq_values-stats)
- [`top` pipe](#top-pipe)
- [`stats` pipe](#stats-pipe)

### unpack_json pipe
@@ -286,6 +286,12 @@ This query uses the following [LogsQL](https://docs.victoriametrics.com/victoria

- [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for sorting the stats by `logs` field in descending order.
- [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) for limiting the number of returned results to 10.

This query can be simplified into the following one, which uses [`top` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe):

```logsql
_time:5m | top 10 by (_stream)
```

See also:

- [How to filter out data after stats calculation?](#how-to-filter-out-data-after-stats-calculation)
@@ -1316,7 +1316,7 @@ func parseNumber(lex *lexer) (float64, string, error) {
 		return f, s, nil
 	}

-	return 0, "", fmt.Errorf("cannot parse %q as float64", s)
+	return 0, s, fmt.Errorf("cannot parse %q as float64", s)
 }

 func parseFuncArg(lex *lexer, fieldName string, callback func(args string) (filter, error)) (filter, error) {
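The point of this one-line change is that `parseNumber` now hands the raw token back to the caller even on failure, so caller-side error messages can quote it (as `parsePipeTop` does for the `top N` limit). A minimal standalone sketch of that return contract — `parseNumberSketch` and its suffix handling are illustrative, not the real LogsQL parser:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseNumberSketch mimics the changed return contract: on failure the raw
// token is returned alongside the error, so callers can reuse it in their
// own diagnostics instead of an empty string.
func parseNumberSketch(s string) (float64, string, error) {
	// Support a human-readable suffix, loosely like LogsQL numbers (simplified).
	mult := 1.0
	num := s
	if strings.HasSuffix(num, "K") {
		mult, num = 1000, strings.TrimSuffix(num, "K")
	}
	f, err := strconv.ParseFloat(num, 64)
	if err != nil {
		// Return s (not "") so the caller still has the offending token.
		return 0, s, fmt.Errorf("cannot parse %q as float64", s)
	}
	return f * mult, s, nil
}

func main() {
	if _, tok, err := parseNumberSketch("foo"); err != nil {
		fmt.Printf("token=%q err=%v\n", tok, err)
	}
	f, _, _ := parseNumberSketch("5K")
	fmt.Println(f) // 5000
}
```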
@@ -214,6 +214,12 @@ func parsePipe(lex *lexer) (pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
 		}
 		return ps, nil
+	case lex.isKeyword("top"):
+		pt, err := parsePipeTop(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'top' pipe: %w", err)
+		}
+		return pt, nil
 	case lex.isKeyword("uniq"):
 		pu, err := parsePipeUniq(lex)
 		if err != nil {
@@ -287,6 +293,7 @@ var pipeNames = func() map[string]struct{} {
 	"replace_regexp",
 	"sort",
 	"stats",
+	"top",
 	"uniq",
 	"unpack_json",
 	"unpack_logfmt",
lib/logstorage/pipe_top.go (new file, 500 lines)

@@ -0,0 +1,500 @@
package logstorage

import (
	"fmt"
	"slices"
	"sort"
	"strings"
	"sync/atomic"
	"unsafe"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeTopDefaultLimit is the default number of entries pipeTop returns.
const pipeTopDefaultLimit = 10

// pipeTop processes '| top ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe
type pipeTop struct {
	// byFields contains the field names to return top values for.
	byFields []string

	// limit is the number of top (byFields) sets to return.
	limit uint64

	// limitStr is the string representation of the limit.
	limitStr string

	// if hitsFieldName isn't empty, then the number of hits per each unique value is returned in this field.
	hitsFieldName string
}
func (pt *pipeTop) String() string {
	s := "top"
	if pt.limit != pipeTopDefaultLimit {
		s += " " + pt.limitStr
	}
	if len(pt.byFields) > 0 {
		s += " by (" + fieldNamesString(pt.byFields) + ")"
	}
	return s
}

func (pt *pipeTop) updateNeededFields(neededFields, unneededFields fieldsSet) {
	neededFields.reset()
	unneededFields.reset()

	if len(pt.byFields) == 0 {
		neededFields.add("*")
	} else {
		neededFields.addFields(pt.byFields)
	}
}

func (pt *pipeTop) optimize() {
	// nothing to do
}

func (pt *pipeTop) hasFilterInWithQuery() bool {
	return false
}

func (pt *pipeTop) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
	return pt, nil
}
func (pt *pipeTop) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
	maxStateSize := int64(float64(memory.Allowed()) * 0.2)

	shards := make([]pipeTopProcessorShard, workersCount)
	for i := range shards {
		shards[i] = pipeTopProcessorShard{
			pipeTopProcessorShardNopad: pipeTopProcessorShardNopad{
				pt:              pt,
				stateSizeBudget: stateSizeBudgetChunk,
			},
		}
		maxStateSize -= stateSizeBudgetChunk
	}

	ptp := &pipeTopProcessor{
		pt:     pt,
		stopCh: stopCh,
		cancel: cancel,
		ppNext: ppNext,

		shards: shards,

		maxStateSize: maxStateSize,
	}
	ptp.stateSizeBudget.Store(maxStateSize)

	return ptp
}
type pipeTopProcessor struct {
	pt     *pipeTop
	stopCh <-chan struct{}
	cancel func()
	ppNext pipeProcessor

	shards []pipeTopProcessorShard

	maxStateSize    int64
	stateSizeBudget atomic.Int64
}

type pipeTopProcessorShard struct {
	pipeTopProcessorShardNopad

	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof(pipeTopProcessorShardNopad{})%128]byte
}

type pipeTopProcessorShardNopad struct {
	// pt points to the parent pipeTop.
	pt *pipeTop

	// m holds per-row hits.
	m map[string]*uint64

	// keyBuf is a temporary buffer for building keys for m.
	keyBuf []byte

	// columnValues is a temporary buffer for the processed column values.
	columnValues [][]string

	// stateSizeBudget is the remaining budget for the whole state size for the shard.
	// The per-shard budget is provided in chunks from the parent pipeTopProcessor.
	stateSizeBudget int
}
|
||||||
|
|
||||||
|
// writeBlock writes br to shard.
|
||||||
|
func (shard *pipeTopProcessorShard) writeBlock(br *blockResult) {
|
||||||
|
byFields := shard.pt.byFields
|
||||||
|
if len(byFields) == 0 {
|
||||||
|
// Take into account all the columns in br.
|
||||||
|
keyBuf := shard.keyBuf
|
||||||
|
cs := br.getColumns()
|
||||||
|
for i := range br.timestamps {
|
||||||
|
keyBuf = keyBuf[:0]
|
||||||
|
for _, c := range cs {
|
||||||
|
v := c.getValueAtRow(br, i)
|
||||||
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
|
||||||
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
|
||||||
|
}
|
||||||
|
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
|
||||||
|
}
|
||||||
|
shard.keyBuf = keyBuf
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(byFields) == 1 {
|
||||||
|
// Fast path for a single field.
|
||||||
|
c := br.getColumnByName(byFields[0])
|
||||||
|
if c.isConst {
|
||||||
|
v := c.valuesEncoded[0]
|
||||||
|
shard.updateState(v, uint64(len(br.timestamps)))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if c.valueType == valueTypeDict {
|
||||||
|
a := encoding.GetUint64s(len(c.dictValues))
|
||||||
|
hits := a.A
|
||||||
|
valuesEncoded := c.getValuesEncoded(br)
|
||||||
|
for _, v := range valuesEncoded {
|
||||||
|
idx := unmarshalUint8(v)
|
||||||
|
hits[idx]++
|
||||||
|
}
|
||||||
|
for i, v := range c.dictValues {
|
||||||
|
shard.updateState(v, hits[i])
|
||||||
|
}
|
||||||
|
encoding.PutUint64s(a)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
values := c.getValues(br)
|
||||||
|
for _, v := range values {
|
||||||
|
shard.updateState(v, 1)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take into account only the selected columns.
|
||||||
|
columnValues := shard.columnValues[:0]
|
||||||
|
for _, f := range byFields {
|
||||||
|
c := br.getColumnByName(f)
|
||||||
|
values := c.getValues(br)
|
||||||
|
columnValues = append(columnValues, values)
|
||||||
|
}
|
||||||
|
shard.columnValues = columnValues
|
||||||
|
|
||||||
|
keyBuf := shard.keyBuf
|
||||||
|
for i := range br.timestamps {
|
||||||
|
keyBuf = keyBuf[:0]
|
||||||
|
for _, values := range columnValues {
|
||||||
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
|
||||||
|
}
|
||||||
|
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
|
||||||
|
}
|
||||||
|
shard.keyBuf = keyBuf
|
||||||
|
}
func (shard *pipeTopProcessorShard) updateState(v string, hits uint64) {
	m := shard.getM()
	pHits, ok := m[v]
	if !ok {
		vCopy := strings.Clone(v)
		hits := uint64(0)
		pHits = &hits
		m[vCopy] = pHits
		shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)+unsafe.Sizeof(hits)+unsafe.Sizeof(pHits))
	}
	*pHits += hits
}

func (shard *pipeTopProcessorShard) getM() map[string]*uint64 {
	if shard.m == nil {
		shard.m = make(map[string]*uint64)
	}
	return shard.m
}
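Storing `*uint64` values in `m` means a counter can be bumped through the pointer after a single map lookup, and the shard-merge step in flush() can adopt a source map's pointer wholesale for keys it hasn't seen. A standalone sketch of that pattern (`hitCounter` is an illustrative name, not part of the commit):

```go
package main

import "fmt"

// hitCounter counts occurrences per key using *uint64 values, so an
// increment needs no map re-assignment and a merge can share pointers.
type hitCounter map[string]*uint64

func (hc hitCounter) add(key string, hits uint64) {
	p, ok := hc[key]
	if !ok {
		n := uint64(0)
		p = &n
		hc[key] = p
	}
	*p += hits
}

func (hc hitCounter) merge(src hitCounter) {
	for k, pSrc := range src {
		if p, ok := hc[k]; ok {
			*p += *pSrc
		} else {
			hc[k] = pSrc // adopt the pointer; no copy needed
		}
	}
}

func main() {
	a, b := hitCounter{}, hitCounter{}
	a.add("x", 2)
	b.add("x", 3)
	b.add("y", 1)
	a.merge(b)
	fmt.Println(*a["x"], *a["y"]) // 5 1
}
```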
func (ptp *pipeTopProcessor) writeBlock(workerID uint, br *blockResult) {
	if len(br.timestamps) == 0 {
		return
	}

	shard := &ptp.shards[workerID]

	for shard.stateSizeBudget < 0 {
		// steal some budget for the state size from the global budget.
		remaining := ptp.stateSizeBudget.Add(-stateSizeBudgetChunk)
		if remaining < 0 {
			// The state size is too big. Stop processing data in order to avoid OOM crash.
			if remaining+stateSizeBudgetChunk >= 0 {
				// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
				ptp.cancel()
			}
			return
		}
		shard.stateSizeBudget += stateSizeBudgetChunk
	}

	shard.writeBlock(br)
}
func (ptp *pipeTopProcessor) flush() error {
	if n := ptp.stateSizeBudget.Load(); n <= 0 {
		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", ptp.pt.String(), ptp.maxStateSize/(1<<20))
	}

	// merge state across shards
	shards := ptp.shards
	m := shards[0].getM()
	shards = shards[1:]
	for i := range shards {
		if needStop(ptp.stopCh) {
			return nil
		}

		for k, pHitsSrc := range shards[i].getM() {
			pHits, ok := m[k]
			if !ok {
				m[k] = pHitsSrc
			} else {
				*pHits += *pHitsSrc
			}
		}
	}

	// select top entries with the biggest number of hits
	entries := make([]pipeTopEntry, 0, len(m))
	for k, pHits := range m {
		entries = append(entries, pipeTopEntry{
			k:    k,
			hits: *pHits,
		})
	}
	sort.Slice(entries, func(i, j int) bool {
		a, b := &entries[i], &entries[j]
		if a.hits == b.hits {
			return a.k < b.k
		}
		return a.hits > b.hits
	})
	if uint64(len(entries)) > ptp.pt.limit {
		entries = entries[:ptp.pt.limit]
	}

	// write result
	wctx := &pipeTopWriteContext{
		ptp: ptp,
	}
	byFields := ptp.pt.byFields
	var rowFields []Field

	addHitsField := func(dst []Field, hits uint64) []Field {
		hitsStr := string(marshalUint64String(nil, hits))
		dst = append(dst, Field{
			Name:  ptp.pt.hitsFieldName,
			Value: hitsStr,
		})
		return dst
	}

	if len(byFields) == 0 {
		for _, e := range entries {
			if needStop(ptp.stopCh) {
				return nil
			}

			rowFields = rowFields[:0]
			keyBuf := bytesutil.ToUnsafeBytes(e.k)
			for len(keyBuf) > 0 {
				name, nSize := encoding.UnmarshalBytes(keyBuf)
				if nSize <= 0 {
					logger.Panicf("BUG: cannot unmarshal field name")
				}
				keyBuf = keyBuf[nSize:]

				value, nSize := encoding.UnmarshalBytes(keyBuf)
				if nSize <= 0 {
					logger.Panicf("BUG: cannot unmarshal field value")
				}
				keyBuf = keyBuf[nSize:]

				rowFields = append(rowFields, Field{
					Name:  bytesutil.ToUnsafeString(name),
					Value: bytesutil.ToUnsafeString(value),
				})
			}
			rowFields = addHitsField(rowFields, e.hits)
			wctx.writeRow(rowFields)
		}
	} else if len(byFields) == 1 {
		fieldName := byFields[0]
		for _, e := range entries {
			if needStop(ptp.stopCh) {
				return nil
			}

			rowFields = append(rowFields[:0], Field{
				Name:  fieldName,
				Value: e.k,
			})
			rowFields = addHitsField(rowFields, e.hits)
			wctx.writeRow(rowFields)
		}
	} else {
		for _, e := range entries {
			if needStop(ptp.stopCh) {
				return nil
			}

			rowFields = rowFields[:0]
			keyBuf := bytesutil.ToUnsafeBytes(e.k)
			fieldIdx := 0
			for len(keyBuf) > 0 {
				value, nSize := encoding.UnmarshalBytes(keyBuf)
				if nSize <= 0 {
					logger.Panicf("BUG: cannot unmarshal field value")
				}
				keyBuf = keyBuf[nSize:]

				rowFields = append(rowFields, Field{
					Name:  byFields[fieldIdx],
					Value: bytesutil.ToUnsafeString(value),
				})
				fieldIdx++
			}
			rowFields = addHitsField(rowFields, e.hits)
			wctx.writeRow(rowFields)
		}
	}

	wctx.flush()

	return nil
}
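The selection step in flush() sorts the merged entries by hits in descending order, breaks ties by key in ascending order so output is deterministic, then truncates to the limit. The same logic in isolation (`entry` and `topN` are illustrative names):

```go
package main

import (
	"fmt"
	"sort"
)

type entry struct {
	k    string
	hits uint64
}

// topN mirrors the selection step in flush(): order by hits descending,
// break ties by key ascending for deterministic output, then truncate.
func topN(entries []entry, limit int) []entry {
	sort.Slice(entries, func(i, j int) bool {
		a, b := &entries[i], &entries[j]
		if a.hits == b.hits {
			return a.k < b.k
		}
		return a.hits > b.hits
	})
	if len(entries) > limit {
		entries = entries[:limit]
	}
	return entries
}

func main() {
	es := []entry{{"b", 2}, {"a", 2}, {"c", 5}}
	fmt.Println(topN(es, 2)) // [{c 5} {a 2}]
}
```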
type pipeTopEntry struct {
	k    string
	hits uint64
}

type pipeTopWriteContext struct {
	ptp *pipeTopProcessor
	rcs []resultColumn
	br  blockResult

	// rowsCount is the number of rows in the current block
	rowsCount int

	// valuesLen is the total length of values in the current block
	valuesLen int
}
func (wctx *pipeTopWriteContext) writeRow(rowFields []Field) {
	rcs := wctx.rcs

	areEqualColumns := len(rcs) == len(rowFields)
	if areEqualColumns {
		for i, f := range rowFields {
			if rcs[i].name != f.Name {
				areEqualColumns = false
				break
			}
		}
	}
	if !areEqualColumns {
		// send the current block to ppNext and construct a block with new set of columns
		wctx.flush()

		rcs = wctx.rcs[:0]
		for _, f := range rowFields {
			rcs = appendResultColumnWithName(rcs, f.Name)
		}
		wctx.rcs = rcs
	}

	for i, f := range rowFields {
		v := f.Value
		rcs[i].addValue(v)
		wctx.valuesLen += len(v)
	}

	wctx.rowsCount++
	if wctx.valuesLen >= 1_000_000 {
		wctx.flush()
	}
}

func (wctx *pipeTopWriteContext) flush() {
	rcs := wctx.rcs
	br := &wctx.br

	wctx.valuesLen = 0

	// Flush rcs to ppNext
	br.setResultColumns(rcs, wctx.rowsCount)
	wctx.rowsCount = 0
	wctx.ptp.ppNext.writeBlock(0, br)
	br.reset()
	for i := range rcs {
		rcs[i].resetValues()
	}
}
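writeRow batches rows and flushes a block once the accumulated value bytes reach 1_000_000, bounding per-block memory. A simplified sketch of that flush policy — `batcher` and the tiny threshold are illustrative, not the real blockResult machinery:

```go
package main

import "fmt"

// batcher mirrors pipeTopWriteContext's policy: accumulate rows and
// emit a block once the total byte length of values reaches the threshold.
type batcher struct {
	threshold int
	valuesLen int
	rows      [][]string
	blocks    [][][]string
}

func (b *batcher) writeRow(row []string) {
	for _, v := range row {
		b.valuesLen += len(v)
	}
	b.rows = append(b.rows, row)
	if b.valuesLen >= b.threshold {
		b.flush()
	}
}

// flush emits the buffered rows as one block and resets the counters.
func (b *batcher) flush() {
	if len(b.rows) == 0 {
		return
	}
	b.blocks = append(b.blocks, b.rows)
	b.rows = nil
	b.valuesLen = 0
}

func main() {
	b := &batcher{threshold: 10}
	b.writeRow([]string{"hello"})     // 5 bytes buffered
	b.writeRow([]string{"worldwide"}) // 14 bytes total -> flush
	b.writeRow([]string{"x"})
	b.flush() // final flush, as in pipeTopProcessor.flush()
	fmt.Println(len(b.blocks)) // 2
}
```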
func parsePipeTop(lex *lexer) (*pipeTop, error) {
	if !lex.isKeyword("top") {
		return nil, fmt.Errorf("expecting 'top'; got %q", lex.token)
	}
	lex.nextToken()

	limit := uint64(pipeTopDefaultLimit)
	limitStr := ""
	if isNumberPrefix(lex.token) {
		limitF, s, err := parseNumber(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse N in 'top': %w", err)
		}
		if limitF < 1 {
			return nil, fmt.Errorf("N in 'top %s' must be integer bigger than 0", s)
		}
		limit = uint64(limitF)
		limitStr = s
	}

	var byFields []string
	if lex.isKeyword("by", "(") {
		if lex.isKeyword("by") {
			lex.nextToken()
		}
		bfs, err := parseFieldNamesInParens(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse 'by' clause in 'top': %w", err)
		}
		if slices.Contains(bfs, "*") {
			bfs = nil
		}
		byFields = bfs
	}

	hitsFieldName := "hits"
	for slices.Contains(byFields, hitsFieldName) {
		hitsFieldName += "s"
	}

	pt := &pipeTop{
		byFields:      byFields,
		limit:         limit,
		limitStr:      limitStr,
		hitsFieldName: hitsFieldName,
	}

	return pt, nil
}
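parsePipeTop avoids clobbering a `by (...)` field literally named `hits` by appending `s` until the name is free; the tests below exercise this via the `hitss` column. The loop in isolation (`freeHitsName` is an illustrative wrapper name):

```go
package main

import (
	"fmt"
	"slices"
)

// freeHitsName appends "s" to "hits" until the name no longer collides
// with any of the 'by (...)' fields, matching parsePipeTop's behavior.
func freeHitsName(byFields []string) string {
	name := "hits"
	for slices.Contains(byFields, name) {
		name += "s"
	}
	return name
}

func main() {
	fmt.Println(freeHitsName([]string{"a", "b"}))        // hits
	fmt.Println(freeHitsName([]string{"hits"}))          // hitss
	fmt.Println(freeHitsName([]string{"hits", "hitss"})) // hitsss
}
```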
lib/logstorage/pipe_top_test.go (new file, 313 lines)

@@ -0,0 +1,313 @@
package logstorage

import (
	"testing"
)

func TestParsePipeTopSuccess(t *testing.T) {
	f := func(pipeStr string) {
		t.Helper()
		expectParsePipeSuccess(t, pipeStr)
	}

	f(`top`)
	f(`top 5`)
	f(`top by (x)`)
	f(`top 5 by (x)`)
	f(`top by (x, y)`)
	f(`top 5 by (x, y)`)
}

func TestParsePipeTopFailure(t *testing.T) {
	f := func(pipeStr string) {
		t.Helper()
		expectParsePipeFailure(t, pipeStr)
	}

	f(`top 5 foo`)
	f(`top 5 by`)
	f(`top 5 by (`)
	f(`top 5foo`)
	f(`top foo`)
	f(`top by`)
}

func TestPipeTop(t *testing.T) {
	f := func(pipeStr string, rows, rowsExpected [][]Field) {
		t.Helper()
		expectPipeResults(t, pipeStr, rows, rowsExpected)
	}

	f("top", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"a", "2"},
			{"b", "3"},
			{"hits", "2"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
			{"hits", "1"},
		},
	})

	f("top 1", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"a", "2"},
			{"b", "3"},
			{"hits", "2"},
		},
	})

	f("top by (a)", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"a", "2"},
			{"hits", "3"},
		},
	})

	f("top by (b)", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"b", "3"},
			{"hits", "2"},
		},
		{
			{"b", "54"},
			{"hits", "1"},
		},
	})

	f("top by (hits)", [][]Field{
		{
			{"a", `2`},
			{"hits", `3`},
		},
		{
			{"a", "2"},
			{"hits", "3"},
		},
		{
			{"a", `2`},
			{"hits", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"hits", "3"},
			{"hitss", "2"},
		},
		{
			{"hits", "54"},
			{"hitss", "1"},
		},
	})

	f("top by (c)", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"c", ""},
			{"hits", "2"},
		},
		{
			{"c", "d"},
			{"hits", "1"},
		},
	})

	f("top by (d)", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"d", ""},
			{"hits", "3"},
		},
	})

	f("top by (a, b)", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"a", "2"},
			{"b", "3"},
			{"hits", "2"},
		},
		{
			{"a", "2"},
			{"b", "54"},
			{"hits", "1"},
		},
	})

	f("top 10 by (a, b)", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"a", "2"},
			{"b", "3"},
			{"hits", "2"},
		},
		{
			{"a", "2"},
			{"b", "54"},
			{"hits", "1"},
		},
	})

	f("top 1 by (a, b)", [][]Field{
		{
			{"a", `2`},
			{"b", `3`},
		},
		{
			{"a", "2"},
			{"b", "3"},
		},
		{
			{"a", `2`},
			{"b", `54`},
			{"c", "d"},
		},
	}, [][]Field{
		{
			{"a", "2"},
			{"b", "3"},
			{"hits", "2"},
		},
	})
}

func TestPipeTopUpdateNeededFields(t *testing.T) {
	f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
		t.Helper()
		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
	}

	// all the needed fields
	f("top", "*", "", "*", "")
	f("top by()", "*", "", "*", "")
	f("top by(*)", "*", "", "*", "")
	f("top by(f1,f2)", "*", "", "f1,f2", "")

	// all the needed fields, unneeded fields do not intersect with src
	f("top by(s1, s2)", "*", "f1,f2", "s1,s2", "")
	f("top", "*", "f1,f2", "*", "")

	// all the needed fields, unneeded fields intersect with src
	f("top by(s1, s2)", "*", "s1,f1,f2", "s1,s2", "")
	f("top by(*)", "*", "s1,f1,f2", "*", "")
	f("top by(s1, s2)", "*", "s1,s2,f1", "s1,s2", "")

	// needed fields do not intersect with src
	f("top by (s1, s2)", "f1,f2", "", "s1,s2", "")

	// needed fields intersect with src
	f("top by (s1, s2)", "s1,f1,f2", "", "s1,s2", "")
	f("top by (*)", "s1,f1,f2", "", "*", "")
}