This commit is contained in:
Aliaksandr Valialkin 2024-05-07 23:35:31 +02:00
parent 4e9790bc6a
commit 6b63f65baf
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 233 additions and 138 deletions

View file

@ -1182,12 +1182,18 @@ and then by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/
_time:5m | sort by (_stream, _time)
```
Add `desc` after the given log field in order to sort in reverse order. For example, the following query sorts log fields in reverse order of `request_duration_seconds` field:
Add `desc` after the given log field in order to sort in reverse order of this field. For example, the following query sorts logs in reverse order of the `request_duration_seconds` field:
```logsql
_time:5m | sort by (request_duration_seconds desc)
```
The reverse order can be applied globally via the `desc` keyword after the `by(...)` clause:
```logsql
_time:5m | sort by (foo, bar) desc
```
Note that sorting a large number of logs can be slow and can consume a lot of additional memory.
It is recommended to limit the number of logs before sorting with the following approaches:

View file

@ -94,9 +94,13 @@ func (br *blockResult) cloneValues(values []string) []string {
valuesBufLen := len(valuesBuf)
for _, v := range values {
bufLen := len(buf)
buf = append(buf, v...)
valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] {
valuesBuf = append(valuesBuf, v)
} else {
bufLen := len(buf)
buf = append(buf, v...)
valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
}
}
br.valuesBuf = valuesBuf
@ -149,14 +153,16 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
}
func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
// Add _time column
br.addTimeColumn()
// Add _stream column
if !br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
br.reset()
return
}
br.addTimeColumn()
// Add _msg column
v := bs.csh.getConstColumnValue("_msg")
if v != "" {
@ -246,10 +252,13 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap)
var dictValues []string
appendValue := func(v string) {
bufLen := len(buf)
buf = append(buf, v...)
s := bytesutil.ToUnsafeString(buf[bufLen:])
valuesBuf = append(valuesBuf, s)
if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] {
valuesBuf = append(valuesBuf, v)
} else {
bufLen := len(buf)
buf = append(buf, v...)
valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
}
}
switch ch.valueType {
@ -1679,9 +1688,15 @@ func (rc *resultColumn) resetKeepName() {
// addValue adds the given value v to rc.
func (rc *resultColumn) addValue(v string) {
values := rc.values
if len(values) > 0 && string(v) == values[len(values)-1] {
rc.values = append(rc.values, values[len(values)-1])
return
}
bufLen := len(rc.buf)
rc.buf = append(rc.buf, v...)
rc.values = append(rc.values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
}
var nan = math.NaN()

View file

@ -207,18 +207,19 @@ func (q *Query) getNeededColumns() []string {
pipes := q.pipes
for i := len(pipes) - 1; i >= 0; i-- {
neededFields, m := pipes[i].getNeededFields()
neededFields, mapping := pipes[i].getNeededFields()
neededFields = normalizeFields(neededFields)
referredFields := make(map[string]int)
for _, a := range m {
for _, f := range a {
for _, inFields := range mapping {
for _, f := range inFields {
referredFields[f]++
}
}
for k := range dropFields {
for _, f := range m[k] {
inFields := mapping[k]
for _, f := range inFields {
referredFields[f]--
}
}
@ -228,7 +229,7 @@ func (q *Query) getNeededColumns() []string {
}
}
dropFieldsNext := make(map[string]struct{})
for k := range m {
for k := range mapping {
if k != "*" && referredFields[k] == 0 {
dropFieldsNext[k] = struct{}{}
}
@ -252,33 +253,27 @@ func (q *Query) getNeededColumns() []string {
if len(neededFields) == 0 {
input = nil
}
if len(input) == 0 {
break
}
// transform upper input fields to the current input fields according to the given mapping.
if input[0] != "*" {
if len(input) == 0 || input[0] != "*" {
var dst []string
for _, f := range input {
if a, ok := m[f]; ok {
if a, ok := mapping[f]; ok {
dst = append(dst, a...)
} else {
dst = append(dst, f)
}
}
if a, ok := m["*"]; ok {
if a, ok := mapping["*"]; ok {
dst = append(dst, a...)
}
input = normalizeFields(dst)
if len(input) == 0 {
break
}
}
// intersect neededFields with input
if neededFields[0] != "*" {
if len(neededFields) == 0 || neededFields[0] != "*" {
clear(dropFields)
if input[0] == "*" {
if len(input) > 0 && input[0] == "*" {
input = neededFields
continue
}
@ -336,7 +331,7 @@ func ParseQuery(s string) (*Query, error) {
q.pipes = pipes
if !lex.isEnd() {
return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]", lex.context())
return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]; tail: [%s]", lex.context(), lex.s)
}
return q, nil

View file

@ -929,9 +929,12 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`)
// sort pipe
f(`* | sort`, `* | sort`)
f(`* | sort desc`, `* | sort desc`)
f(`* | sort by()`, `* | sort`)
f(`* | sort bY (foo)`, `* | sort by (foo)`)
f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`)
f(`* | sort bY (foo, bar,)`, `* | sort by (foo, bar)`)
f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`)
// multiple different pipes
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
@ -1245,13 +1248,12 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats by(bar)`)
// invalid sort pipe
f(`foo | sort`)
f(`foo | sort bar`)
f(`foo | sort by`)
f(`foo | sort by(`)
f(`foo | sort by()`)
f(`foo | sort by(baz`)
f(`foo | sort by(baz,`)
f(`foo | sort by(bar) foo`)
}
func TestNormalizeFields(t *testing.T) {

View file

@ -5,12 +5,13 @@ import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
@ -20,25 +21,37 @@ import (
type pipeSort struct {
// byFields contains field names for sorting from 'by(...)' clause.
//
// It may be empty; in this case the rows are sorted by all the columns.
byFields []*bySortField
// isDesc is set when the `desc` keyword follows the optional 'by(...)' clause.
//
// When set, it inverts the sort order of every field,
// including the fields with their own `desc` suffix.
isDesc bool
}
func (ps *pipeSort) String() string {
if len(ps.byFields) == 0 {
logger.Panicf("BUG: pipeSort must contain at least a single byField")
s := "sort"
if len(ps.byFields) > 0 {
a := make([]string, len(ps.byFields))
for i := range ps.byFields {
a[i] = ps.byFields[i].String()
}
s += " by (" + strings.Join(a, ", ") + ")"
}
a := make([]string, len(ps.byFields))
for i := range ps.byFields {
a[i] = ps.byFields[i].String()
if ps.isDesc {
s += " desc"
}
s := "sort by (" + strings.Join(a, ", ") + ")"
return s
}
func (ps *pipeSort) getNeededFields() ([]string, map[string][]string) {
fields := make([]string, len(ps.byFields))
for i, bf := range ps.byFields {
byFields := ps.byFields
if len(byFields) == 0 {
return []string{"*"}, map[string][]string{
"*": {"*"},
}
}
fields := make([]string, len(byFields))
for i, bf := range byFields {
fields[i] = bf.name
}
m := map[string][]string{
@ -96,12 +109,6 @@ type pipeSortProcessorShardNopad struct {
// ps point to the parent pipeSort.
ps *pipeSort
// u64ValuesBuf holds uint64 values parsed from values for speeding up the sorting.
u64ValuesBuf []uint64
// f64ValuesBuf holds float64 values parsed from values for speeding up the sorting.
f64ValuesBuf []float64
// blocks holds all the blocks with logs written to the shard.
blocks []sortBlock
@ -135,8 +142,8 @@ type sortBlockByColumn struct {
// c contains column data
c blockResultColumn
// u64Values contains uint64 numbers parsed from values
u64Values []uint64
// i64Values contains int64 numbers parsed from values
i64Values []int64
// f64Values contains float64 numbers parsed from values
f64Values []float64
@ -151,11 +158,11 @@ type sortRowRef struct {
rowIdx int
}
func (c *sortBlockByColumn) getU64ValueAtRow(rowIdx int) uint64 {
func (c *sortBlockByColumn) getI64ValueAtRow(rowIdx int) int64 {
if c.c.isConst {
return c.u64Values[0]
return c.i64Values[0]
}
return c.u64Values[rowIdx]
return c.i64Values[rowIdx]
}
func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 {
@ -169,54 +176,101 @@ func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 {
func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
// clone br, so it could be owned by shard
br = br.clone()
cs := br.getColumns()
byFields := shard.ps.byFields
if len(byFields) == 0 {
// Sort by all the columns
// Collect values for columns from byFields.
byColumns := make([]sortBlockByColumn, len(byFields))
for i, bf := range byFields {
c := br.getColumnByName(bf.name)
bc := &byColumns[i]
bc.c = c
if c.isTime {
// Do not initialize bc.values, bc.u64Values and bc.f64Values, since they aren't used.
// This saves some memory.
continue
// Generate byColumns
var rc resultColumn
bb := bbPool.Get()
for i := range br.timestamps {
// JSON-encode all the columns per each row into a single string
// and sort rows by the resulting string.
bb.B = bb.B[:0]
for j := range cs {
c := &cs[j]
v := c.getValueAtRow(br, i)
bb.B = marshalJSONKeyValue(bb.B, c.name, v)
bb.B = append(bb.B, ',')
}
rc.addValue(bytesutil.ToUnsafeString(bb.B))
}
if c.isConst {
// Do not initialize bc.values in order to save some memory.
bc.u64Values = shard.createUint64Values(c.encodedValues)
bc.f64Values = shard.createFloat64Values(c.encodedValues)
continue
bbPool.Put(bb)
byColumns := []sortBlockByColumn{
{
c: blockResultColumn{
valueType: valueTypeString,
encodedValues: rc.values,
},
i64Values: make([]int64, len(br.timestamps)),
f64Values: make([]float64, len(br.timestamps)),
},
}
shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0]))
// pre-populate values in order to track better br memory usage
values := c.getValues(br)
bc.u64Values = shard.createUint64Values(values)
bc.f64Values = shard.createFloat64Values(values)
}
shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
// Append br to shard.blocks.
shard.blocks = append(shard.blocks, sortBlock{
br: br,
byColumns: byColumns,
otherColumns: cs,
})
} else {
// Collect values for columns from byFields.
byColumns := make([]sortBlockByColumn, len(byFields))
for i, bf := range byFields {
c := br.getColumnByName(bf.name)
bc := &byColumns[i]
bc.c = c
// Collect values for other columns.
cs := br.getColumns()
otherColumns := make([]blockResultColumn, 0, len(cs))
for _, c := range cs {
isByField := false
for _, bf := range byFields {
if bf.name == c.name {
isByField = true
break
if c.isTime {
// Do not initialize bc.i64Values and bc.f64Values, since they aren't used.
// This saves some memory.
continue
}
if c.isConst {
bc.i64Values = shard.createInt64Values(c.encodedValues)
bc.f64Values = shard.createFloat64Values(c.encodedValues)
continue
}
// pre-populate values in order to track better br memory usage
values := c.getValues(br)
bc.i64Values = shard.createInt64Values(values)
bc.f64Values = shard.createFloat64Values(values)
}
shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
// Collect values for other columns.
otherColumns := make([]blockResultColumn, 0, len(cs))
for _, c := range cs {
isByField := false
for _, bf := range byFields {
if bf.name == c.name {
isByField = true
break
}
}
if !isByField {
otherColumns = append(otherColumns, c)
}
}
if !isByField {
otherColumns = append(otherColumns, c)
}
shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0]))
// Append br to shard.blocks.
shard.blocks = append(shard.blocks, sortBlock{
br: br,
byColumns: byColumns,
otherColumns: otherColumns,
})
}
shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0]))
shard.stateSizeBudget -= br.sizeBytes()
shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0]))
// Add row references to rowRefs.
blockIdx := len(shard.blocks)
blockIdx := len(shard.blocks) - 1
rowRefs := shard.rowRefs
rowRefsLen := len(rowRefs)
for i := range br.timestamps {
@ -227,53 +281,40 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
}
shard.rowRefs = rowRefs
shard.stateSizeBudget -= (len(rowRefs) - rowRefsLen) * int(unsafe.Sizeof(rowRefs[0]))
// Append br to shard.blocks.
shard.blocks = append(shard.blocks, sortBlock{
br: br,
byColumns: byColumns,
otherColumns: otherColumns,
})
shard.stateSizeBudget -= br.sizeBytes()
shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0]))
}
func (shard *pipeSortProcessorShard) createUint64Values(values []string) []uint64 {
u64ValuesBuf := shard.u64ValuesBuf
u64ValuesBufLen := len(u64ValuesBuf)
for _, v := range values {
u64, ok := tryParseUint64(v)
func (shard *pipeSortProcessorShard) createInt64Values(values []string) []int64 {
a := make([]int64, len(values))
for i, v := range values {
i64, ok := tryParseInt64(v)
if ok {
u64ValuesBuf = append(u64ValuesBuf, u64)
a[i] = i64
continue
}
u32, _ := tryParseIPv4(v)
u64ValuesBuf = append(u64ValuesBuf, uint64(u32))
a[i] = int64(u32)
// Do not try parsing timestamp and duration, since they may be negative.
// This breaks sorting.
}
shard.u64ValuesBuf = u64ValuesBuf
shard.stateSizeBudget -= (len(u64ValuesBuf) - u64ValuesBufLen) * int(unsafe.Sizeof(u64ValuesBuf[0]))
shard.stateSizeBudget -= len(a) * int(unsafe.Sizeof(a[0]))
return u64ValuesBuf[u64ValuesBufLen:]
return a
}
func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []float64 {
f64ValuesBuf := shard.f64ValuesBuf
f64ValuesBufLen := len(f64ValuesBuf)
for _, v := range values {
a := make([]float64, len(values))
for i, v := range values {
f, ok := tryParseFloat64(v)
if !ok {
f = nan
}
f64ValuesBuf = append(f64ValuesBuf, f)
a[i] = f
}
shard.f64ValuesBuf = f64ValuesBuf
shard.stateSizeBudget -= (len(f64ValuesBuf) - f64ValuesBufLen) * int(unsafe.Sizeof(f64ValuesBuf[0]))
shard.stateSizeBudget -= len(a) * int(unsafe.Sizeof(a[0]))
return f64ValuesBuf[f64ValuesBufLen:]
return a
}
func (psp *pipeSortProcessorShard) Len() int {
@ -435,7 +476,7 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
}
}
if !areEqualColumns {
// send the current block to bbBase and construct new columns
// send the current block to bbBase and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
@ -454,7 +495,7 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
br := b.br
byColumns := b.byColumns
for i := range byColumns {
for i := range byFields {
v := byColumns[i].c.getValueAtRow(br, rr.rowIdx)
rcs[i].addValue(v)
wctx.valuesLen += len(v)
@ -532,7 +573,10 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
for idx := range bA.byColumns {
cA := &bA.byColumns[idx]
cB := &bB.byColumns[idx]
bf := byFields[idx]
isDesc := len(byFields) > 0 && byFields[idx].isDesc
if shardA.ps.isDesc {
isDesc = !isDesc
}
if cA.c.isConst && cB.c.isConst {
// Fast path - compare const values
@ -546,7 +590,7 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
if tA == tB {
continue
}
if bf.isDesc {
if isDesc {
return tB < tA
}
return tA < tB
@ -560,14 +604,14 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
return false
}
// Try sorting by uint64 values at first
uA := cA.getU64ValueAtRow(rrA.rowIdx)
uB := cB.getU64ValueAtRow(rrB.rowIdx)
// Try sorting by int64 values at first
uA := cA.getI64ValueAtRow(rrA.rowIdx)
uB := cB.getI64ValueAtRow(rrB.rowIdx)
if uA != 0 && uB != 0 {
if uA == uB {
continue
}
if bf.isDesc {
if isDesc {
return uB < uA
}
return uA < uB
@ -580,7 +624,7 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
if fA == fB {
continue
}
if bf.isDesc {
if isDesc {
return fB < fA
}
return fA < fB
@ -592,7 +636,7 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
if sA == sB {
continue
}
if bf.isDesc {
if isDesc {
return sB < sA
}
return sA < sB
@ -605,19 +649,23 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) {
return nil, fmt.Errorf("expecting 'sort'; got %q", lex.token)
}
lex.nextToken()
if !lex.isKeyword("by") {
return nil, fmt.Errorf("expecting 'by'; got %q", lex.token)
}
lex.nextToken()
bfs, err := parseBySortFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
var ps pipeSort
if lex.isKeyword("by") {
lex.nextToken()
bfs, err := parseBySortFields(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
}
ps.byFields = bfs
}
ps := &pipeSort{
byFields: bfs,
if lex.isKeyword("desc") {
lex.nextToken()
ps.isDesc = true
}
return ps, nil
return &ps, nil
}
// bySortField represents 'by (...)' part of the pipeSort.
@ -646,9 +694,6 @@ func parseBySortFields(lex *lexer) ([]*bySortField, error) {
lex.nextToken()
if lex.isKeyword(")") {
lex.nextToken()
if len(bfs) == 0 {
return nil, fmt.Errorf("sort fields list cannot be empty")
}
return bfs, nil
}
fieldName, err := parseFieldName(lex)
@ -673,3 +718,35 @@ func parseBySortFields(lex *lexer) ([]*bySortField, error) {
}
}
}
// marshalJSONKeyValue appends the `"k":"v"` pair to dst and returns the extended buffer.
//
// Both k and v are quoted with strconv.AppendQuote, e.g. they are escaped
// according to Go string literal rules.
func marshalJSONKeyValue(dst []byte, k, v string) []byte {
	out := strconv.AppendQuote(dst, k)
	out = append(out, ':')
	return strconv.AppendQuote(out, v)
}
// tryParseInt64 parses s as int64 with an optional leading '-'.
//
// The part after the optional sign is parsed by tryParseUint64.
// (0, false) is returned if s is empty, if tryParseUint64 rejects the remaining part
// or if the resulting value doesn't fit into int64.
func tryParseInt64(s string) (int64, bool) {
	if s == "" {
		return 0, false
	}

	digits := s
	negative := digits[0] == '-'
	if negative {
		digits = digits[1:]
	}

	n, ok := tryParseUint64(digits)
	if !ok {
		return 0, false
	}

	// The allowed magnitude is asymmetric: |math.MinInt64| == math.MaxInt64 + 1.
	limit := uint64(math.MaxInt64)
	if negative {
		limit++
	}
	if n > limit {
		return 0, false
	}

	if negative {
		// For n == 1<<63 the conversion wraps to math.MinInt64,
		// and its negation is math.MinInt64 again, which is the expected result.
		return -int64(n), true
	}
	return int64(n), true
}