From 6b63f65bafcbef4b5e7c164c8ca72cf9dea97ec6 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 7 May 2024 23:35:31 +0200
Subject: [PATCH] wip

---
 docs/VictoriaLogs/LogsQL.md    |   8 +-
 lib/logstorage/block_result.go |  35 ++--
 lib/logstorage/parser.go       |  29 ++--
 lib/logstorage/parser_test.go  |   8 +-
 lib/logstorage/pipe_sort.go    | 291 +++++++++++++++++++++------------
 5 files changed, 233 insertions(+), 138 deletions(-)

diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index d31bc6087..bfd4b0c59 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1182,12 +1182,18 @@ and then by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/
 _time:5m | sort by (_stream, _time)
 ```
 
-Add `desc` after the given log field in order to sort in reverse order. For example, the folliwng query sorts log fields in reverse order of `request_duration_seconds` field:
+Add `desc` after the given log field in order to sort in reverse order of this field. For example, the following query sorts logs in reverse order of the `request_duration_seconds` field:
 
 ```logsql
 _time:5m | sort by (request_duration_seconds desc)
 ```
 
+The reverse order can be applied globally via the `desc` keyword after the `by(...)` clause:
+
+```logsql
+_time:5m | sort by (foo, bar) desc
+```
+
 Note that sorting of big number of logs can be slow and can consume a lot of additional memory.
 It is recommended limiting the number of logs before sorting with the following approaches:
 
diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go
index 656cac8f1..31693c8ed 100644
--- a/lib/logstorage/block_result.go
+++ b/lib/logstorage/block_result.go
@@ -94,9 +94,13 @@ func (br *blockResult) cloneValues(values []string) []string {
 	valuesBufLen := len(valuesBuf)
 
 	for _, v := range values {
-		bufLen := len(buf)
-		buf = append(buf, v...)
-		valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
+		if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] {
+			valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1])
+		} else {
+			bufLen := len(buf)
+			buf = append(buf, v...)
+			valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
+		}
 	}
 
 	br.valuesBuf = valuesBuf
@@ -149,14 +153,16 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
 }
 
 func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
+	// Add _time column
+	br.addTimeColumn()
+
+	// Add _stream column
 	if !br.addStreamColumn(bs) {
 		// Skip the current block, since the associated stream tags are missing.
 		br.reset()
 		return
 	}
 
-	br.addTimeColumn()
-
 	// Add _msg column
 	v := bs.csh.getConstColumnValue("_msg")
 	if v != "" {
@@ -246,10 +252,13 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap)
 	var dictValues []string
 
 	appendValue := func(v string) {
-		bufLen := len(buf)
-		buf = append(buf, v...)
-		s := bytesutil.ToUnsafeString(buf[bufLen:])
-		valuesBuf = append(valuesBuf, s)
+		if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] {
+			valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1])
+		} else {
+			bufLen := len(buf)
+			buf = append(buf, v...)
+			valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
+		}
 	}
 
 	switch ch.valueType {
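The adjacent-duplicate trick above (in `cloneValues`, `appendValue`, and `addValue` below) is easy to get subtly wrong, so here is a minimal, self-contained sketch of the idea with a hypothetical `column` type rather than the library's API: when the incoming value equals the last stored one, append the previously stored copy instead of the caller's string, so the column never retains memory it does not own and runs of identical values cost no extra bytes.

```go
package main

import "fmt"

// column mimics the buf/values layout used above: buf owns the bytes of
// all distinct stored values, values holds string views over them. The
// real code aliases buf via bytesutil.ToUnsafeString; this sketch makes
// a plain copy instead.
type column struct {
	buf    []byte
	values []string
}

func (c *column) addValue(v string) {
	if n := len(c.values); n > 0 && v == c.values[n-1] {
		// Reuse the previously stored copy instead of appending v itself,
		// so the column keeps no reference to the caller's memory.
		c.values = append(c.values, c.values[n-1])
		return
	}
	bufLen := len(c.buf)
	c.buf = append(c.buf, v...)
	c.values = append(c.values, string(c.buf[bufLen:]))
}

func main() {
	var c column
	for _, v := range []string{"error", "error", "error", "info"} {
		c.addValue(v)
	}
	// 4 values, but buf holds only "errorinfo": duplicates cost no bytes.
	fmt.Println(len(c.values), string(c.buf))
}
```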
@@ -1679,9 +1688,15 @@ func (rc *resultColumn) resetKeepName() {
 
 // addValue adds the given values v to rc.
 func (rc *resultColumn) addValue(v string) {
+	values := rc.values
+	if len(values) > 0 && v == values[len(values)-1] {
+		rc.values = append(rc.values, values[len(values)-1])
+		return
+	}
+
 	bufLen := len(rc.buf)
 	rc.buf = append(rc.buf, v...)
-	rc.values = append(rc.values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
+	rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
 }
 
 var nan = math.NaN()
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index e62074eac..9df8a7d2a 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -207,18 +207,19 @@ func (q *Query) getNeededColumns() []string {
 
 	pipes := q.pipes
 	for i := len(pipes) - 1; i >= 0; i-- {
-		neededFields, m := pipes[i].getNeededFields()
+		neededFields, mapping := pipes[i].getNeededFields()
 		neededFields = normalizeFields(neededFields)
 
 		referredFields := make(map[string]int)
-		for _, a := range m {
-			for _, f := range a {
+		for _, inFields := range mapping {
+			for _, f := range inFields {
 				referredFields[f]++
 			}
 		}
 		for k := range dropFields {
-			for _, f := range m[k] {
+			inFields := mapping[k]
+			for _, f := range inFields {
 				referredFields[f]--
 			}
 		}
@@ -228,7 +229,7 @@ func (q *Query) getNeededColumns() []string {
 			}
 		}
 		dropFieldsNext := make(map[string]struct{})
-		for k := range m {
+		for k := range mapping {
 			if k != "*" && referredFields[k] == 0 {
 				dropFieldsNext[k] = struct{}{}
 			}
@@ -252,33 +253,27 @@ func (q *Query) getNeededColumns() []string {
 		if len(neededFields) == 0 {
 			input = nil
 		}
-		if len(input) == 0 {
-			break
-		}
 
 		// transform upper input fields to the current input fields according to the given mapping.
-		if input[0] != "*" {
+		if len(input) == 0 || input[0] != "*" {
 			var dst []string
 			for _, f := range input {
-				if a, ok := m[f]; ok {
+				if a, ok := mapping[f]; ok {
 					dst = append(dst, a...)
 				} else {
 					dst = append(dst, f)
 				}
 			}
-			if a, ok := m["*"]; ok {
+			if a, ok := mapping["*"]; ok {
 				dst = append(dst, a...)
 			}
 			input = normalizeFields(dst)
-			if len(input) == 0 {
-				break
-			}
 		}
 
 		// intersect neededFields with input
-		if neededFields[0] != "*" {
+		if len(neededFields) == 0 || neededFields[0] != "*" {
 			clear(dropFields)
-			if input[0] == "*" {
+			if len(input) > 0 && input[0] == "*" {
 				input = neededFields
 				continue
 			}
@@ -336,7 +331,7 @@ func ParseQuery(s string) (*Query, error) {
 	q.pipes = pipes
 
 	if !lex.isEnd() {
-		return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]", lex.context())
+		return nil, fmt.Errorf("unexpected unparsed tail; context: [%s]; tail: [%s]", lex.context(), lex.s)
 	}
 
 	return q, nil
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 9e5d72764..e5eb544cd 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -929,9 +929,12 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`)
 
 	// sort pipe
+	f(`* | sort`, `* | sort`)
+	f(`* | sort desc`, `* | sort desc`)
+	f(`* | sort by()`, `* | sort`)
 	f(`* | sort bY (foo)`, `* | sort by (foo)`)
 	f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`)
-	f(`* | sort bY (foo, bar,)`, `* | sort by (foo, bar)`)
+	f(`* | sort bY (foo desc, bar,) desc`, `* | sort by (foo desc, bar) desc`)
 
 	// multiple different pipes
 	f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
@@ -1245,13 +1248,12 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | stats by(bar)`)
 
 	// invalid sort pipe
-	f(`foo | sort`)
 	f(`foo | sort bar`)
 	f(`foo | sort by`)
 	f(`foo | sort by(`)
-	f(`foo | sort by()`)
 	f(`foo | sort by(baz`)
 	f(`foo | sort by(baz,`)
+	f(`foo | sort by(bar) foo`)
 }
 
 func TestNormalizeFields(t *testing.T) {
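The backward pass in `getNeededColumns` above walks pipes from last to first, translating the fields needed downstream of each pipe into the fields that pipe must read. A hedged sketch of just the transform step, with a hypothetical `inputFields` helper simplified from the real loop:

```go
package main

import "fmt"

// inputFields translates the fields needed downstream of a pipe into the
// fields the pipe itself must read. mapping maps a pipe output field to
// the input fields it is computed from; fields absent from mapping pass
// through unchanged, and mapping["*"] lists inputs that every output
// depends on.
func inputFields(needed []string, mapping map[string][]string) []string {
	var dst []string
	for _, f := range needed {
		if in, ok := mapping[f]; ok {
			dst = append(dst, in...)
		} else {
			dst = append(dst, f)
		}
	}
	if in, ok := mapping["*"]; ok {
		dst = append(dst, in...)
	}
	return dst // the real code additionally normalizes and deduplicates
}

func main() {
	// A pipe computing qps from count and duration: needing qps after
	// the pipe means needing count and duration before it.
	m := map[string][]string{"qps": {"count", "duration"}}
	fmt.Println(inputFields([]string{"qps", "host"}, m)) // [count duration host]
}
```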
diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go
index c3e974ba7..8d380ad78 100644
--- a/lib/logstorage/pipe_sort.go
+++ b/lib/logstorage/pipe_sort.go
@@ -5,12 +5,13 @@ import (
 	"fmt"
 	"math"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"unsafe"
 
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 )
 
@@ -20,25 +21,37 @@ import (
 type pipeSort struct {
 	// byFields contains field names for sorting from 'by(...)' clause.
 	byFields []*bySortField
+
+	// whether to apply descending order
+	isDesc bool
 }
 
 func (ps *pipeSort) String() string {
-	if len(ps.byFields) == 0 {
-		logger.Panicf("BUG: pipeSort must contain at least a single byField")
+	s := "sort"
+	if len(ps.byFields) > 0 {
+		a := make([]string, len(ps.byFields))
+		for i := range ps.byFields {
+			a[i] = ps.byFields[i].String()
+		}
+		s += " by (" + strings.Join(a, ", ") + ")"
 	}
-
-	a := make([]string, len(ps.byFields))
-	for i := range ps.byFields {
-		a[i] = ps.byFields[i].String()
+	if ps.isDesc {
+		s += " desc"
 	}
-	s := "sort by (" + strings.Join(a, ", ") + ")"
-
 	return s
 }
 
 func (ps *pipeSort) getNeededFields() ([]string, map[string][]string) {
-	fields := make([]string, len(ps.byFields))
-	for i, bf := range ps.byFields {
+	byFields := ps.byFields
+
+	if len(byFields) == 0 {
+		return []string{"*"}, map[string][]string{
+			"*": {"*"},
+		}
+	}
+
+	fields := make([]string, len(byFields))
+	for i, bf := range byFields {
 		fields[i] = bf.name
 	}
 	m := map[string][]string{
@@ -96,12 +109,6 @@ type pipeSortProcessorShardNopad struct {
 	// ps point to the parent pipeSort.
 	ps *pipeSort
 
-	// u64ValuesBuf holds uint64 values parsed from values for speeding up the sorting.
-	u64ValuesBuf []uint64
-
-	// f64ValuesBuf holds float64 values parsed from values for speeding up the sorting.
-	f64ValuesBuf []float64
-
 	// blocks holds all the blocks with logs written to the shard.
 	blocks []sortBlock
 
@@ -135,8 +142,8 @@ type sortBlockByColumn struct {
 	// c contains column data
 	c blockResultColumn
 
-	// u64Values contains uint64 numbers parsed from values
-	u64Values []uint64
+	// i64Values contains int64 numbers parsed from values
+	i64Values []int64
 
 	// f64Values contains float64 numbers parsed from values
 	f64Values []float64
@@ -151,11 +158,11 @@ type sortRowRef struct {
 	rowIdx int
 }
 
-func (c *sortBlockByColumn) getU64ValueAtRow(rowIdx int) uint64 {
+func (c *sortBlockByColumn) getI64ValueAtRow(rowIdx int) int64 {
 	if c.c.isConst {
-		return c.u64Values[0]
+		return c.i64Values[0]
 	}
-	return c.u64Values[rowIdx]
+	return c.i64Values[rowIdx]
 }
 
 func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 {
@@ -169,54 +176,101 @@ func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 {
 func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
 	// clone br, so it could be owned by shard
 	br = br.clone()
+	cs := br.getColumns()
 
 	byFields := shard.ps.byFields
+	if len(byFields) == 0 {
+		// Sort by all the columns
 
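Two details in the pipe_sort.go changes above are worth isolating. Parsed sort keys switch from uint64 to int64 so negative integers keep their natural order, and a const column (one value for the whole block) stores a single parsed key at index 0, so accessors must branch rather than index by row. A minimal sketch with a hypothetical `byColumn` type:

```go
package main

import "fmt"

// byColumn mirrors sortBlockByColumn's layout: for a column whose value
// is identical across the whole block (isConst), the parsed keys hold a
// single element; otherwise one key per row. Signed int64 keys keep
// negative values ordered correctly, which uint64 keys could not.
type byColumn struct {
	isConst   bool
	i64Values []int64
}

func (c *byColumn) i64At(rowIdx int) int64 {
	if c.isConst {
		return c.i64Values[0] // one shared key serves every row
	}
	return c.i64Values[rowIdx]
}

func main() {
	constCol := byColumn{isConst: true, i64Values: []int64{42}}
	fmt.Println(constCol.i64At(0), constCol.i64At(99)) // 42 42
}
```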
-	// Collect values for columns from byFields.
-	byColumns := make([]sortBlockByColumn, len(byFields))
-	for i, bf := range byFields {
-		c := br.getColumnByName(bf.name)
-		bc := &byColumns[i]
-		bc.c = c
-
-		if c.isTime {
-			// Do not initialize bc.values, bc.u64Values and bc.f64Values, since they aren't used.
-			// This saves some memory.
-			continue
+		// Generate byColumns
+		var rc resultColumn
+		bb := bbPool.Get()
+		for i := range br.timestamps {
+			// JSON-encode all the columns for each row into a single string
+			// and sort rows by the resulting string.
+			bb.B = bb.B[:0]
+			for j := range cs {
+				c := &cs[j]
+				v := c.getValueAtRow(br, i)
+				bb.B = marshalJSONKeyValue(bb.B, c.name, v)
+				bb.B = append(bb.B, ',')
+			}
+			rc.addValue(bytesutil.ToUnsafeString(bb.B))
 		}
-		if c.isConst {
-			// Do not initialize bc.values in order to save some memory.
-			bc.u64Values = shard.createUint64Values(c.encodedValues)
-			bc.f64Values = shard.createFloat64Values(c.encodedValues)
-			continue
+		bbPool.Put(bb)
+		byColumns := []sortBlockByColumn{
+			{
+				c: blockResultColumn{
+					valueType:     valueTypeString,
+					encodedValues: rc.values,
+				},
+				i64Values: make([]int64, len(br.timestamps)),
+				f64Values: make([]float64, len(br.timestamps)),
+			},
 		}
+		shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0]))
 
-		// pre-populate values in order to track better br memory usage
-		values := c.getValues(br)
-		bc.u64Values = shard.createUint64Values(values)
-		bc.f64Values = shard.createFloat64Values(values)
-	}
-	shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
+		// Append br to shard.blocks.
+		shard.blocks = append(shard.blocks, sortBlock{
+			br:           br,
+			byColumns:    byColumns,
+			otherColumns: cs,
+		})
+	} else {
+		// Collect values for columns from byFields.
+		byColumns := make([]sortBlockByColumn, len(byFields))
+		for i, bf := range byFields {
+			c := br.getColumnByName(bf.name)
+			bc := &byColumns[i]
+			bc.c = c
 
-	// Collect values for other columns.
-	cs := br.getColumns()
-	otherColumns := make([]blockResultColumn, 0, len(cs))
-	for _, c := range cs {
-		isByField := false
-		for _, bf := range byFields {
-			if bf.name == c.name {
-				isByField = true
-				break
+			if c.isTime {
+				// Do not initialize bc.i64Values and bc.f64Values, since they aren't used.
+				// This saves some memory.
+				continue
+			}
+			if c.isConst {
+				bc.i64Values = shard.createInt64Values(c.encodedValues)
+				bc.f64Values = shard.createFloat64Values(c.encodedValues)
+				continue
+			}
+
+			// pre-populate values in order to track better br memory usage
+			values := c.getValues(br)
+			bc.i64Values = shard.createInt64Values(values)
+			bc.f64Values = shard.createFloat64Values(values)
+		}
+		shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
+
+		// Collect values for other columns.
+		otherColumns := make([]blockResultColumn, 0, len(cs))
+		for _, c := range cs {
+			isByField := false
+			for _, bf := range byFields {
+				if bf.name == c.name {
+					isByField = true
+					break
+				}
+			}
+			if !isByField {
+				otherColumns = append(otherColumns, c)
 			}
 		}
-		if !isByField {
-			otherColumns = append(otherColumns, c)
-		}
+		shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0]))
+
+		// Append br to shard.blocks.
+		shard.blocks = append(shard.blocks, sortBlock{
+			br:           br,
+			byColumns:    byColumns,
+			otherColumns: otherColumns,
+		})
 	}
-	shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0]))
+
+	shard.stateSizeBudget -= br.sizeBytes()
+	shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0]))
 
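The `len(byFields) == 0` branch above implements `sort` without a `by(...)` clause by serializing every row into one string key. A self-contained sketch of the same idea (simplified names; the real code pools the byte buffer and stores keys in a `resultColumn`): `strconv.AppendQuote` quotes both names and values, so a comma inside a value cannot collide with the field separator.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
)

// rowKey serializes one row into a `"name":"value",...` string, the
// single sort key used when no explicit by-fields are given.
func rowKey(names, values []string) string {
	var b []byte
	for i := range names {
		b = strconv.AppendQuote(b, names[i])
		b = append(b, ':')
		b = strconv.AppendQuote(b, values[i])
		b = append(b, ',')
	}
	return string(b)
}

func main() {
	names := []string{"level", "msg"}
	rows := [][]string{{"warn", "b"}, {"error", "a,b"}, {"error", "a"}}
	sort.Slice(rows, func(i, j int) bool {
		return rowKey(names, rows[i]) < rowKey(names, rows[j])
	})
	fmt.Println(rows) // [[error a] [error a,b] [warn b]]
}
```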
 	// Add row references to rowRefs.
-	blockIdx := len(shard.blocks)
+	blockIdx := len(shard.blocks) - 1
 	rowRefs := shard.rowRefs
 	rowRefsLen := len(rowRefs)
 	for i := range br.timestamps {
@@ -227,53 +281,40 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
 	}
 	shard.rowRefs = rowRefs
 	shard.stateSizeBudget -= (len(rowRefs) - rowRefsLen) * int(unsafe.Sizeof(rowRefs[0]))
-
-	// Append br to shard.blocks.
-	shard.blocks = append(shard.blocks, sortBlock{
-		br:           br,
-		byColumns:    byColumns,
-		otherColumns: otherColumns,
-	})
-	shard.stateSizeBudget -= br.sizeBytes()
-	shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0]))
 }
 
-func (shard *pipeSortProcessorShard) createUint64Values(values []string) []uint64 {
-	u64ValuesBuf := shard.u64ValuesBuf
-	u64ValuesBufLen := len(u64ValuesBuf)
-	for _, v := range values {
-		u64, ok := tryParseUint64(v)
+func (shard *pipeSortProcessorShard) createInt64Values(values []string) []int64 {
+	a := make([]int64, len(values))
+	for i, v := range values {
+		i64, ok := tryParseInt64(v)
 		if ok {
-			u64ValuesBuf = append(u64ValuesBuf, u64)
+			a[i] = i64
 			continue
 		}
 		u32, _ := tryParseIPv4(v)
-		u64ValuesBuf = append(u64ValuesBuf, uint64(u32))
+		a[i] = int64(u32)
 		// Do not try parsing timestamp and duration, since they may be negative.
 		// This breaks sorting.
 	}
-	shard.u64ValuesBuf = u64ValuesBuf
-	shard.stateSizeBudget -= (len(u64ValuesBuf) - u64ValuesBufLen) * int(unsafe.Sizeof(u64ValuesBuf[0]))
+	shard.stateSizeBudget -= len(a) * int(unsafe.Sizeof(a[0]))
 
-	return u64ValuesBuf[u64ValuesBufLen:]
+	return a
 }
 
 func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []float64 {
-	f64ValuesBuf := shard.f64ValuesBuf
-	f64ValuesBufLen := len(f64ValuesBuf)
-	for _, v := range values {
+	a := make([]float64, len(values))
+	for i, v := range values {
 		f, ok := tryParseFloat64(v)
 		if !ok {
 			f = nan
 		}
-		f64ValuesBuf = append(f64ValuesBuf, f)
+		a[i] = f
 	}
-	shard.f64ValuesBuf = f64ValuesBuf
-	shard.stateSizeBudget -= (len(f64ValuesBuf) - f64ValuesBufLen) * int(unsafe.Sizeof(f64ValuesBuf[0]))
+	shard.stateSizeBudget -= len(a) * int(unsafe.Sizeof(a[0]))
 
-	return f64ValuesBuf[f64ValuesBufLen:]
+	return a
 }
 
 func (psp *pipeSortProcessorShard) Len() int {
@@ -435,7 +476,7 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to bbBase and construct new columns
+		// send the current block to bbBase and construct a block with a new set of columns
 		wctx.flush()
 
 		rcs = wctx.rcs[:0]
@@ -454,7 +495,7 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx
 	br := b.br
 
 	byColumns := b.byColumns
-	for i := range byColumns {
+	for i := range byFields {
 		v := byColumns[i].c.getValueAtRow(br, rr.rowIdx)
 		rcs[i].addValue(v)
 		wctx.valuesLen += len(v)
@@ -532,7 +573,10 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
 	for idx := range bA.byColumns {
 		cA := &bA.byColumns[idx]
 		cB := &bB.byColumns[idx]
-		bf := byFields[idx]
+		isDesc := len(byFields) > 0 && byFields[idx].isDesc
+		if shardA.ps.isDesc {
+			isDesc = !isDesc
+		}
 
 		if cA.c.isConst && cB.c.isConst {
 			// Fast path - compare const values
@@ -546,7 +590,7 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
 			if tA == tB {
 				continue
 			}
-			if bf.isDesc {
+			if isDesc {
 				return tB < tA
 			}
 			return tA < tB
@@ -560,14 +604,14 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
 			return false
 		}
 
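The comparator changes above replace the per-field `bf.isDesc` lookup with a computed `isDesc` that XORs the per-field flag with the pipe-wide `desc` modifier, then apply it uniformly across the comparison tiers. A hedged sketch of the tier sequence for a single field (assumed pre-parsed keys, with 0 and NaN marking unparsed values as in `createInt64Values`/`createFloat64Values`):

```go
package main

import (
	"fmt"
	"math"
)

// less compares one sort field of two rows the way sortBlockLess does:
// pre-parsed int64 keys first, float64 keys next, raw strings last; a
// descending field just swaps the operands. The real loop `continue`s
// to the next by-field when the chosen keys are equal.
func less(aI, bI int64, aF, bF float64, aS, bS string, isDesc bool) bool {
	if isDesc {
		aI, bI = bI, aI
		aF, bF = bF, aF
		aS, bS = bS, aS
	}
	if aI != 0 && bI != 0 { // both parsed as integers; 0 means "unparsed"
		return aI < bI
	}
	if !math.IsNaN(aF) && !math.IsNaN(bF) { // both parsed as floats
		return aF < bF
	}
	return aS < bS // byte-wise string fallback
}

func main() {
	fmt.Println(less(-5, 3, -5, 3, "-5", "3", false))     // true: -5 < 3 as int64
	fmt.Println(less(0, 0, 1.5, 2.5, "1.5", "2.5", true)) // false: desc flips 1.5 < 2.5
}
```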
-		// Try sorting by uint64 values at first
-		uA := cA.getU64ValueAtRow(rrA.rowIdx)
-		uB := cB.getU64ValueAtRow(rrB.rowIdx)
+		// Try sorting by int64 values at first
+		uA := cA.getI64ValueAtRow(rrA.rowIdx)
+		uB := cB.getI64ValueAtRow(rrB.rowIdx)
 		if uA != 0 && uB != 0 {
 			if uA == uB {
 				continue
 			}
-			if bf.isDesc {
+			if isDesc {
 				return uB < uA
 			}
 			return uA < uB
@@ -580,7 +624,7 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
 			if fA == fB {
 				continue
 			}
-			if bf.isDesc {
+			if isDesc {
 				return fB < fA
 			}
 			return fA < fB
@@ -592,7 +636,7 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
 		if sA == sB {
 			continue
 		}
-		if bf.isDesc {
+		if isDesc {
 			return sB < sA
 		}
 		return sA < sB
@@ -605,19 +649,23 @@ func parsePipeSort(lex *lexer) (*pipeSort, error) {
 		return nil, fmt.Errorf("expecting 'sort'; got %q", lex.token)
 	}
 	lex.nextToken()
-	if !lex.isKeyword("by") {
-		return nil, fmt.Errorf("expecting 'by'; got %q", lex.token)
-	}
-	lex.nextToken()
 
-	bfs, err := parseBySortFields(lex)
-	if err != nil {
-		return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
+	var ps pipeSort
+	if lex.isKeyword("by") {
+		lex.nextToken()
+		bfs, err := parseBySortFields(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
+		}
+		ps.byFields = bfs
 	}
 
-	ps := &pipeSort{
-		byFields: bfs,
+	if lex.isKeyword("desc") {
+		lex.nextToken()
+		ps.isDesc = true
 	}
-	return ps, nil
+
+	return &ps, nil
 }
 
 // bySortField represents 'by (...)' part of the pipeSort.
@@ -646,9 +694,6 @@ func parseBySortFields(lex *lexer) ([]*bySortField, error) {
 		lex.nextToken()
 		if lex.isKeyword(")") {
 			lex.nextToken()
-			if len(bfs) == 0 {
-				return nil, fmt.Errorf("sort fields list cannot be empty")
-			}
 			return bfs, nil
 		}
 		fieldName, err := parseFieldName(lex)
@@ -673,3 +718,35 @@ func parseBySortFields(lex *lexer) ([]*bySortField, error) {
 		}
 	}
 }
+
+func marshalJSONKeyValue(dst []byte, k, v string) []byte {
+	dst = strconv.AppendQuote(dst, k)
+	dst = append(dst, ':')
+	dst = strconv.AppendQuote(dst, v)
+	return dst
+}
+
+func tryParseInt64(s string) (int64, bool) {
+	if len(s) == 0 {
+		return 0, false
+	}
+
+	isMinus := s[0] == '-'
+	if isMinus {
+		s = s[1:]
+	}
+	u64, ok := tryParseUint64(s)
+	if !ok {
+		return 0, false
+	}
+	if !isMinus {
+		if u64 > math.MaxInt64 {
+			return 0, false
+		}
+		return int64(u64), true
+	}
+	if u64 > -math.MinInt64 {
+		return 0, false
+	}
+	return -int64(u64), true
+}
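The boundary handling in `tryParseInt64` is subtle: the positive branch must reject magnitudes above `math.MaxInt64`, while the negative branch must still accept a magnitude of exactly `1<<63` so that `math.MinInt64` round-trips (the untyped constant `-math.MinInt64` equals `1<<63`, which is representable as uint64, and `-int64(u64)` wraps to `math.MinInt64` as intended). A quick check of those edges, using `strconv.ParseInt` as a reference oracle since `tryParseInt64` is package-private:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	for _, s := range []string{
		"9223372036854775807",  // math.MaxInt64: accepted
		"-9223372036854775808", // math.MinInt64: accepted (u64 == 1<<63 with minus)
		"9223372036854775808",  // math.MaxInt64 + 1: rejected
		"-9223372036854775809", // math.MinInt64 - 1: rejected
	} {
		v, err := strconv.ParseInt(s, 10, 64)
		fmt.Printf("%s -> %d, ok=%v\n", s, v, err == nil)
	}
}
```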