mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
wip
This commit is contained in:
parent
ede41d2039
commit
8f6b1262df
7 changed files with 498 additions and 4 deletions
|
@ -23,6 +23,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
|
||||||
* FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe).
|
* FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe).
|
||||||
* FEATURE: add support for calculating various stats over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for details.
|
* FEATURE: add support for calculating various stats over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for details.
|
||||||
* FEATURE: add support for sorting the returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe).
|
* FEATURE: add support for sorting the returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe).
|
||||||
|
* FEATURE: add support for returning unique results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe).
|
||||||
* FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters).
|
* FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters).
|
||||||
* FEATURE: add support for copying and renaming the selected log fields. See [these](https://docs.victoriametrics.com/victorialogs/logsql/#copy-pipe) and [these](https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe) docs.
|
* FEATURE: add support for copying and renaming the selected log fields. See [these](https://docs.victoriametrics.com/victorialogs/logsql/#copy-pipe) and [these](https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe) docs.
|
||||||
* FEATURE: allow using `_` inside numbers. For example, `score:range[1_000, 5_000_000]` for [`range` filter](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter).
|
* FEATURE: allow using `_` inside numbers. For example, `score:range[1_000, 5_000_000]` for [`range` filter](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter).
|
||||||
|
|
|
@ -1051,6 +1051,7 @@ LogsQL supports the following pipes:
|
||||||
- [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
|
- [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
|
||||||
- [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
|
- [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
|
||||||
- [`stats`](#stats-pipe) calculates various stats over the selected logs.
|
- [`stats`](#stats-pipe) calculates various stats over the selected logs.
|
||||||
|
- [`uniq`](#uniq-pipe) returns unique log entires.
|
||||||
|
|
||||||
### copy pipe
|
### copy pipe
|
||||||
|
|
||||||
|
@ -1206,6 +1207,31 @@ See also:
|
||||||
- [`limit` pipe](#limit-pipe)
|
- [`limit` pipe](#limit-pipe)
|
||||||
- [`offset` pipe](#offset-pipe)
|
- [`offset` pipe](#offset-pipe)
|
||||||
|
|
||||||
|
### uniq pipe
|
||||||
|
|
||||||
|
`| uniq ...` pipe allows returning only unique results over the selected logs. For example, the following LogsQL query
|
||||||
|
returns uniq values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||||
|
over logs for the last 5 minutes:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | uniq by (ip)
|
||||||
|
```
|
||||||
|
|
||||||
|
It is possible to specify multiple fields inside `by(...)` clause. In this case all the unique sets for the given fields
|
||||||
|
are returned. For example, the following query returns all the unique `(host, path)` pairs for the logs over the last 5 minutes:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | uniq by (host, path)
|
||||||
|
```
|
||||||
|
|
||||||
|
Unique entries are stored in memory during query execution. Big number of unique selected entries may require a lot of memory.
|
||||||
|
Sometimes it is enough to return up to `N` unique entries. This can be done by adding `limit N` after `by (...)` clause.
|
||||||
|
This allows limiting memory usage. For example, the following query returns up to 100 unique `(host, path)` pairs for the logs over the last 5 minutes:
|
||||||
|
|
||||||
|
```logsql
|
||||||
|
_time:5m | uniq by (host, path) limit 100
|
||||||
|
```
|
||||||
|
|
||||||
### stats pipe
|
### stats pipe
|
||||||
|
|
||||||
`| stats ...` pipe allows calculating various stats over the selected logs. For example, the following LogsQL query
|
`| stats ...` pipe allows calculating various stats over the selected logs. For example, the following LogsQL query
|
||||||
|
|
|
@ -954,6 +954,15 @@ func TestParseQuerySuccess(t *testing.T) {
|
||||||
f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`)
|
f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`)
|
||||||
f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`)
|
f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`)
|
||||||
|
|
||||||
|
// uniq pipe
|
||||||
|
f(`* | uniq`, `* | uniq`)
|
||||||
|
f(`* | uniq by()`, `* | uniq`)
|
||||||
|
f(`* | uniq by(*)`, `* | uniq`)
|
||||||
|
f(`* | uniq by(foo,*,bar)`, `* | uniq`)
|
||||||
|
f(`* | uniq by(f1,f2)`, `* | uniq by (f1, f2)`)
|
||||||
|
f(`* | uniq by(f1,f2) limit 10`, `* | uniq by (f1, f2) limit 10`)
|
||||||
|
f(`* | uniq limit 10`, `* | uniq limit 10`)
|
||||||
|
|
||||||
// multiple different pipes
|
// multiple different pipes
|
||||||
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
|
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
|
||||||
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
|
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
|
||||||
|
@ -1288,6 +1297,16 @@ func TestParseQueryFailure(t *testing.T) {
|
||||||
f(`foo | sort by(baz`)
|
f(`foo | sort by(baz`)
|
||||||
f(`foo | sort by(baz,`)
|
f(`foo | sort by(baz,`)
|
||||||
f(`foo | sort by(bar) foo`)
|
f(`foo | sort by(bar) foo`)
|
||||||
|
|
||||||
|
// invalid uniq pipe
|
||||||
|
f(`foo | uniq bar`)
|
||||||
|
f(`foo | uniq limit`)
|
||||||
|
f(`foo | uniq by(`)
|
||||||
|
f(`foo | uniq by(a`)
|
||||||
|
f(`foo | uniq by(a,`)
|
||||||
|
f(`foo | uniq by(a) bar`)
|
||||||
|
f(`foo | uniq by(a) limit -10`)
|
||||||
|
f(`foo | uniq by(a) limit foo`)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestQueryGetNeededColumns(t *testing.T) {
|
func TestQueryGetNeededColumns(t *testing.T) {
|
||||||
|
@ -1398,6 +1417,12 @@ func TestQueryGetNeededColumns(t *testing.T) {
|
||||||
f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f2) r1, count(r1) r2 | fields r2`, `f1,f2,f3,f4`, ``)
|
f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f2) r1, count(r1) r2 | fields r2`, `f1,f2,f3,f4`, ``)
|
||||||
f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f3) r1, count(r1) r2 | fields r1`, `f3,f4`, ``)
|
f(`* | stats by(f3,f4) count(f1,f2) r1 | stats count(f3) r1, count(r1) r2 | fields r1`, `f3,f4`, ``)
|
||||||
|
|
||||||
|
f(`* | uniq`, `*`, ``)
|
||||||
|
f(`* | uniq by (f1,f2)`, `f1,f2`, ``)
|
||||||
|
f(`* | uniq by (f1,f2) | fields f1,f3`, `f1,f2`, ``)
|
||||||
|
f(`* | uniq by (f1,f2) | rm f1,f3`, `f1,f2`, ``)
|
||||||
|
f(`* | uniq by (f1,f2) | fields f3`, `f1,f2`, ``)
|
||||||
|
|
||||||
f(`* | rm f1, f2`, `*`, `f1,f2`)
|
f(`* | rm f1, f2`, `*`, `f1,f2`)
|
||||||
f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
|
f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
|
||||||
f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)
|
f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)
|
||||||
|
|
|
@ -83,6 +83,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
|
||||||
return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
|
return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
|
||||||
}
|
}
|
||||||
pipes = append(pipes, ps)
|
pipes = append(pipes, ps)
|
||||||
|
case lex.isKeyword("uniq"):
|
||||||
|
pu, err := parsePipeUniq(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'uniq' pipe: %w", err)
|
||||||
|
}
|
||||||
|
pipes = append(pipes, pu)
|
||||||
case lex.isKeyword("limit", "head"):
|
case lex.isKeyword("limit", "head"):
|
||||||
pl, err := parsePipeLimit(lex)
|
pl, err := parsePipeLimit(lex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -30,8 +30,8 @@ func (ps *pipeSort) String() string {
|
||||||
s := "sort"
|
s := "sort"
|
||||||
if len(ps.byFields) > 0 {
|
if len(ps.byFields) > 0 {
|
||||||
a := make([]string, len(ps.byFields))
|
a := make([]string, len(ps.byFields))
|
||||||
for i := range ps.byFields {
|
for i, bf := range ps.byFields {
|
||||||
a[i] = ps.byFields[i].String()
|
a[i] = bf.String()
|
||||||
}
|
}
|
||||||
s += " by (" + strings.Join(a, ", ") + ")"
|
s += " by (" + strings.Join(a, ", ") + ")"
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ type pipeSortProcessorShard struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type pipeSortProcessorShardNopad struct {
|
type pipeSortProcessorShardNopad struct {
|
||||||
// ps point to the parent pipeSort.
|
// ps points to the parent pipeSort.
|
||||||
ps *pipeSort
|
ps *pipeSort
|
||||||
|
|
||||||
// blocks holds all the blocks with logs written to the shard.
|
// blocks holds all the blocks with logs written to the shard.
|
||||||
|
@ -165,7 +165,7 @@ func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 {
|
||||||
return c.f64Values[rowIdx]
|
return c.f64Values[rowIdx]
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeBlock writes br with the given byFields to shard.
|
// writeBlock writes br to shard.
|
||||||
func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
|
func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
|
||||||
// clone br, so it could be owned by shard
|
// clone br, so it could be owned by shard
|
||||||
br = br.clone()
|
br = br.clone()
|
||||||
|
|
391
lib/logstorage/pipe_uniq.go
Normal file
391
lib/logstorage/pipe_uniq.go
Normal file
|
@ -0,0 +1,391 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"slices"
|
||||||
|
"sync/atomic"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||||
|
)
|
||||||
|
|
||||||
|
// pipeUniq processes '| uniq ...' queries.
|
||||||
|
//
|
||||||
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe
|
||||||
|
type pipeUniq struct {
|
||||||
|
// fields contains field names for returning unique values
|
||||||
|
byFields []string
|
||||||
|
|
||||||
|
limit uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pu *pipeUniq) String() string {
|
||||||
|
s := "uniq"
|
||||||
|
if len(pu.byFields) > 0 {
|
||||||
|
s += " by (" + fieldNamesString(pu.byFields) + ")"
|
||||||
|
}
|
||||||
|
if pu.limit > 0 {
|
||||||
|
s += fmt.Sprintf(" limit %d", pu.limit)
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pu *pipeUniq) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||||
|
neededFields.reset()
|
||||||
|
unneededFields.reset()
|
||||||
|
|
||||||
|
if len(pu.byFields) == 0 {
|
||||||
|
neededFields.add("*")
|
||||||
|
} else {
|
||||||
|
neededFields.addAll(pu.byFields)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
|
||||||
|
maxStateSize := int64(float64(memory.Allowed()) * 0.2)
|
||||||
|
|
||||||
|
shards := make([]pipeUniqProcessorShard, workersCount)
|
||||||
|
for i := range shards {
|
||||||
|
shard := &shards[i]
|
||||||
|
shard.pu = pu
|
||||||
|
shard.m = make(map[string]struct{})
|
||||||
|
shard.stateSizeBudget = stateSizeBudgetChunk
|
||||||
|
maxStateSize -= stateSizeBudgetChunk
|
||||||
|
}
|
||||||
|
|
||||||
|
pup := &pipeUniqProcessor{
|
||||||
|
pu: pu,
|
||||||
|
stopCh: stopCh,
|
||||||
|
cancel: cancel,
|
||||||
|
ppBase: ppBase,
|
||||||
|
|
||||||
|
shards: shards,
|
||||||
|
|
||||||
|
maxStateSize: maxStateSize,
|
||||||
|
}
|
||||||
|
pup.stateSizeBudget.Store(maxStateSize)
|
||||||
|
|
||||||
|
return pup
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipeUniqProcessor struct {
|
||||||
|
pu *pipeUniq
|
||||||
|
stopCh <-chan struct{}
|
||||||
|
cancel func()
|
||||||
|
ppBase pipeProcessor
|
||||||
|
|
||||||
|
shards []pipeUniqProcessorShard
|
||||||
|
|
||||||
|
maxStateSize int64
|
||||||
|
stateSizeBudget atomic.Int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipeUniqProcessorShard struct {
|
||||||
|
pipeUniqProcessorShardNopad
|
||||||
|
|
||||||
|
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
||||||
|
_ [128 - unsafe.Sizeof(pipeUniqProcessorShardNopad{})%128]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipeUniqProcessorShardNopad struct {
|
||||||
|
// pu points to the parent pipeUniq.
|
||||||
|
pu *pipeUniq
|
||||||
|
|
||||||
|
// m holds unique rows.
|
||||||
|
m map[string]struct{}
|
||||||
|
|
||||||
|
// keyBuf is a temporary buffer for building keys for m.
|
||||||
|
keyBuf []byte
|
||||||
|
|
||||||
|
// columnValues is a temporary buffer for the processed column values.
|
||||||
|
columnValues [][]string
|
||||||
|
|
||||||
|
// stateSizeBudget is the remaining budget for the whole state size for the shard.
|
||||||
|
// The per-shard budget is provided in chunks from the parent pipeUniqProcessor.
|
||||||
|
stateSizeBudget int
|
||||||
|
}
|
||||||
|
|
||||||
|
// writeBlock writes br to shard.
|
||||||
|
//
|
||||||
|
// It returns false if the block cannot be written because of the exceeded limit.
|
||||||
|
func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
|
||||||
|
if limit := shard.pu.limit; limit > 0 && uint64(len(shard.m)) >= limit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
m := shard.m
|
||||||
|
byFields := shard.pu.byFields
|
||||||
|
if len(byFields) == 0 {
|
||||||
|
// Take into account all the columns in br.
|
||||||
|
keyBuf := shard.keyBuf
|
||||||
|
for i := range br.timestamps {
|
||||||
|
keyBuf = keyBuf[:0]
|
||||||
|
for _, c := range br.getColumns() {
|
||||||
|
v := c.getValueAtRow(br, i)
|
||||||
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
|
||||||
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
|
||||||
|
}
|
||||||
|
if _, ok := m[string(keyBuf)]; !ok {
|
||||||
|
m[string(keyBuf)] = struct{}{}
|
||||||
|
shard.stateSizeBudget -= len(keyBuf) + int(unsafe.Sizeof(""))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
shard.keyBuf = keyBuf
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take into account only the selected columns.
|
||||||
|
columnValues := shard.columnValues[:0]
|
||||||
|
for _, f := range byFields {
|
||||||
|
c := br.getColumnByName(f)
|
||||||
|
columnValues = append(columnValues, c.getValues(br))
|
||||||
|
}
|
||||||
|
shard.columnValues = columnValues
|
||||||
|
|
||||||
|
keyBuf := shard.keyBuf
|
||||||
|
for i := range br.timestamps {
|
||||||
|
seenValue := true
|
||||||
|
for _, values := range columnValues {
|
||||||
|
if i == 0 || values[i-1] != values[i] {
|
||||||
|
seenValue = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if seenValue {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
keyBuf = keyBuf[:0]
|
||||||
|
for _, values := range columnValues {
|
||||||
|
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
|
||||||
|
}
|
||||||
|
if _, ok := m[string(keyBuf)]; !ok {
|
||||||
|
m[string(keyBuf)] = struct{}{}
|
||||||
|
shard.stateSizeBudget -= len(keyBuf) + int(unsafe.Sizeof(""))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
shard.keyBuf = keyBuf
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||||
|
if len(br.timestamps) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
shard := &pup.shards[workerID]
|
||||||
|
|
||||||
|
for shard.stateSizeBudget < 0 {
|
||||||
|
// steal some budget for the state size from the global budget.
|
||||||
|
remaining := pup.stateSizeBudget.Add(-stateSizeBudgetChunk)
|
||||||
|
if remaining < 0 {
|
||||||
|
// The state size is too big. Stop processing data in order to avoid OOM crash.
|
||||||
|
if remaining+stateSizeBudgetChunk >= 0 {
|
||||||
|
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
|
||||||
|
pup.cancel()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
shard.stateSizeBudget += stateSizeBudgetChunk
|
||||||
|
}
|
||||||
|
|
||||||
|
if !shard.writeBlock(br) {
|
||||||
|
pup.cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pup *pipeUniqProcessor) flush() error {
|
||||||
|
if n := pup.stateSizeBudget.Load(); n <= 0 {
|
||||||
|
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pup.pu.String(), pup.maxStateSize/(1<<20))
|
||||||
|
}
|
||||||
|
|
||||||
|
// merge state across shards
|
||||||
|
shards := pup.shards
|
||||||
|
m := shards[0].m
|
||||||
|
shards = shards[1:]
|
||||||
|
for i := range shards {
|
||||||
|
select {
|
||||||
|
case <-pup.stopCh:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
for k := range shards[i].m {
|
||||||
|
m[k] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// write result
|
||||||
|
wctx := &pipeUniqWriteContext{
|
||||||
|
pup: pup,
|
||||||
|
}
|
||||||
|
byFields := pup.pu.byFields
|
||||||
|
var rowFields []Field
|
||||||
|
|
||||||
|
if len(byFields) == 0 {
|
||||||
|
for k := range m {
|
||||||
|
select {
|
||||||
|
case <-pup.stopCh:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
rowFields = rowFields[:0]
|
||||||
|
keyBuf := bytesutil.ToUnsafeBytes(k)
|
||||||
|
for len(keyBuf) > 0 {
|
||||||
|
tail, name, err := encoding.UnmarshalBytes(keyBuf)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("BUG: cannot unmarshal field name: %s", err)
|
||||||
|
}
|
||||||
|
keyBuf = tail
|
||||||
|
|
||||||
|
tail, value, err := encoding.UnmarshalBytes(keyBuf)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("BUG: cannot unmarshal field value: %s", err)
|
||||||
|
}
|
||||||
|
keyBuf = tail
|
||||||
|
|
||||||
|
rowFields = append(rowFields, Field{
|
||||||
|
Name: bytesutil.ToUnsafeString(name),
|
||||||
|
Value: bytesutil.ToUnsafeString(value),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wctx.writeRow(rowFields)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for k := range m {
|
||||||
|
select {
|
||||||
|
case <-pup.stopCh:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
rowFields = rowFields[:0]
|
||||||
|
keyBuf := bytesutil.ToUnsafeBytes(k)
|
||||||
|
fieldIdx := 0
|
||||||
|
for len(keyBuf) > 0 {
|
||||||
|
tail, value, err := encoding.UnmarshalBytes(keyBuf)
|
||||||
|
if err != nil {
|
||||||
|
logger.Panicf("BUG: cannot unmarshal field value: %s", err)
|
||||||
|
}
|
||||||
|
keyBuf = tail
|
||||||
|
|
||||||
|
rowFields = append(rowFields, Field{
|
||||||
|
Name: byFields[fieldIdx],
|
||||||
|
Value: bytesutil.ToUnsafeString(value),
|
||||||
|
})
|
||||||
|
fieldIdx++
|
||||||
|
}
|
||||||
|
wctx.writeRow(rowFields)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wctx.flush()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type pipeUniqWriteContext struct {
|
||||||
|
pup *pipeUniqProcessor
|
||||||
|
rcs []resultColumn
|
||||||
|
br blockResult
|
||||||
|
|
||||||
|
rowsWritten uint64
|
||||||
|
|
||||||
|
valuesLen int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) {
|
||||||
|
if limit := wctx.pup.pu.limit; limit > 0 && wctx.rowsWritten >= limit {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
wctx.rowsWritten++
|
||||||
|
|
||||||
|
rcs := wctx.rcs
|
||||||
|
|
||||||
|
areEqualColumns := len(rcs) == len(rowFields)
|
||||||
|
if areEqualColumns {
|
||||||
|
for i, f := range rowFields {
|
||||||
|
if rcs[i].name != f.Name {
|
||||||
|
areEqualColumns = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !areEqualColumns {
|
||||||
|
// send the current block to bbBase and construct a block with new set of columns
|
||||||
|
wctx.flush()
|
||||||
|
|
||||||
|
rcs = wctx.rcs[:0]
|
||||||
|
for _, f := range rowFields {
|
||||||
|
rcs = append(rcs, resultColumn{
|
||||||
|
name: f.Name,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wctx.rcs = rcs
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, f := range rowFields {
|
||||||
|
v := f.Value
|
||||||
|
rcs[i].addValue(v)
|
||||||
|
wctx.valuesLen += len(v)
|
||||||
|
}
|
||||||
|
if wctx.valuesLen >= 1_000_000 {
|
||||||
|
wctx.flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wctx *pipeUniqWriteContext) flush() {
|
||||||
|
rcs := wctx.rcs
|
||||||
|
br := &wctx.br
|
||||||
|
|
||||||
|
wctx.valuesLen = 0
|
||||||
|
|
||||||
|
if len(rcs) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush rcs to ppBase
|
||||||
|
br.setResultColumns(rcs)
|
||||||
|
wctx.pup.ppBase.writeBlock(0, br)
|
||||||
|
br.reset()
|
||||||
|
for i := range rcs {
|
||||||
|
rcs[i].resetKeepName()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parsePipeUniq(lex *lexer) (*pipeUniq, error) {
|
||||||
|
if !lex.isKeyword("uniq") {
|
||||||
|
return nil, fmt.Errorf("expecting 'uniq'; got %q", lex.token)
|
||||||
|
}
|
||||||
|
lex.nextToken()
|
||||||
|
|
||||||
|
var pu pipeUniq
|
||||||
|
if lex.isKeyword("by") {
|
||||||
|
lex.nextToken()
|
||||||
|
bfs, err := parseFieldNamesInParens(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
|
||||||
|
}
|
||||||
|
if slices.Contains(bfs, "*") {
|
||||||
|
bfs = nil
|
||||||
|
}
|
||||||
|
pu.byFields = bfs
|
||||||
|
}
|
||||||
|
|
||||||
|
if lex.isKeyword("limit") {
|
||||||
|
lex.nextToken()
|
||||||
|
n, ok := tryParseUint64(lex.token)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token)
|
||||||
|
}
|
||||||
|
lex.nextToken()
|
||||||
|
pu.limit = n
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pu, nil
|
||||||
|
}
|
45
lib/logstorage/pipe_uniq_test.go
Normal file
45
lib/logstorage/pipe_uniq_test.go
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPipeUniqUpdateNeededFields(t *testing.T) {
|
||||||
|
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
nfs := newTestFieldsSet(neededFields)
|
||||||
|
unfs := newTestFieldsSet(unneededFields)
|
||||||
|
|
||||||
|
lex := newLexer(s)
|
||||||
|
p, err := parsePipeUniq(lex)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("cannot parse %s: %s", s, err)
|
||||||
|
}
|
||||||
|
p.updateNeededFields(nfs, unfs)
|
||||||
|
|
||||||
|
assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// all the needed fields
|
||||||
|
f("uniq", "*", "", "*", "")
|
||||||
|
f("uniq by()", "*", "", "*", "")
|
||||||
|
f("uniq by(*)", "*", "", "*", "")
|
||||||
|
f("uniq by(f1,f2)", "*", "", "f1,f2", "")
|
||||||
|
|
||||||
|
// all the needed fields, unneeded fields do not intersect with src
|
||||||
|
f("uniq by(s1, s2)", "*", "f1,f2", "s1,s2", "")
|
||||||
|
f("uniq", "*", "f1,f2", "*", "")
|
||||||
|
|
||||||
|
// all the needed fields, unneeded fields intersect with src
|
||||||
|
f("uniq by(s1, s2)", "*", "s1,f1,f2", "s1,s2", "")
|
||||||
|
f("uniq by(*)", "*", "s1,f1,f2", "*", "")
|
||||||
|
f("uniq by(s1, s2)", "*", "s1,s2,f1", "s1,s2", "")
|
||||||
|
|
||||||
|
// needed fields do not intersect with src
|
||||||
|
f("uniq by (s1, s2)", "f1,f2", "", "s1,s2", "")
|
||||||
|
|
||||||
|
// needed fields intersect with src
|
||||||
|
f("uniq by (s1, s2)", "s1,f1,f2", "", "s1,s2", "")
|
||||||
|
f("uniq by (*)", "s1,f1,f2", "", "*", "")
|
||||||
|
}
|
Loading…
Reference in a new issue