diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index d2b8fc9c5..d31bc6087 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1182,7 +1182,7 @@ and then by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/
 _time:5m | sort by (_stream, _time)
 ```
 
-Sorting in reverse order is supported - just add `desc` after the given log field. For example, the folliwng query sorts log fields in reverse order of `request_duration_seconds` field:
+Add `desc` after the given log field to sort in reverse order. For example, the following query sorts logs in reverse order of the `request_duration_seconds` field:
 
 ```logsql
 _time:5m | sort by (request_duration_seconds desc)
diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go
index 5dfcfc986..d1396c6ca 100644
--- a/lib/logstorage/block_result.go
+++ b/lib/logstorage/block_result.go
@@ -62,7 +62,8 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
 	if len(rcs) == 0 {
 		return
 	}
-	fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values))
+
+	br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values))
 
 	cs := br.cs
 	for _, rc := range rcs {
@@ -1302,7 +1303,7 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
 		encoding.PutFloat64s(a)
 		return max
 	case valueTypeUint8:
-		max := math.Inf(-1)
+		max := -inf
 		for _, v := range c.encodedValues {
 			f := float64(v[0])
 			if f > max {
@@ -1311,7 +1312,7 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
 		}
 		return max
 	case valueTypeUint16:
-		max := math.Inf(-1)
+		max := -inf
 		for _, v := range c.encodedValues {
 			b := bytesutil.ToUnsafeBytes(v)
 			f := float64(encoding.UnmarshalUint16(b))
@@ -1321,7 +1322,7 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
 		}
 		return max
 	case valueTypeUint32:
-		max := math.Inf(-1)
+		max := -inf
 		for _, v := range c.encodedValues {
 			b := bytesutil.ToUnsafeBytes(v)
 			f := float64(encoding.UnmarshalUint32(b))
@@ -1331,7 +1332,7 @@ func (c *blockResultColumn) getMaxValue(br *blockResult) float64 {
 		}
 		return max
 	case valueTypeUint64:
-		max := math.Inf(-1)
+		max := -inf
 		for _, v := range c.encodedValues {
 			b := bytesutil.ToUnsafeBytes(v)
 			f := float64(encoding.UnmarshalUint64(b))
@@ -1399,7 +1400,7 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
 		encoding.PutFloat64s(a)
 		return min
 	case valueTypeUint8:
-		min := math.Inf(1)
+		min := inf
 		for _, v := range c.encodedValues {
 			f := float64(v[0])
 			if f < min {
@@ -1408,7 +1409,7 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
 		}
 		return min
 	case valueTypeUint16:
-		min := math.Inf(1)
+		min := inf
 		for _, v := range c.encodedValues {
 			b := bytesutil.ToUnsafeBytes(v)
 			f := float64(encoding.UnmarshalUint16(b))
@@ -1418,7 +1419,7 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
 		}
 		return min
 	case valueTypeUint32:
-		min := math.Inf(1)
+		min := inf
 		for _, v := range c.encodedValues {
 			b := bytesutil.ToUnsafeBytes(v)
 			f := float64(encoding.UnmarshalUint32(b))
@@ -1428,7 +1429,7 @@ func (c *blockResultColumn) getMinValue(br *blockResult) float64 {
 		}
 		return min
 	case valueTypeUint64:
-		min := math.Inf(1)
+		min := inf
 		for _, v := range c.encodedValues {
 			b := bytesutil.ToUnsafeBytes(v)
 			f := float64(encoding.UnmarshalUint64(b))
@@ -1575,3 +1576,4 @@ func (rc *resultColumn) addValue(v string) {
 }
 
 var nan = math.NaN()
+var inf = math.Inf(1)
diff --git a/lib/logstorage/filter_range_test.go b/lib/logstorage/filter_range_test.go
index 5b06009a4..0bda561fb 100644
--- a/lib/logstorage/filter_range_test.go
+++ b/lib/logstorage/filter_range_test.go
@@ -1,7 +1,6 @@
 package logstorage
 
 import (
-	"math"
 	"testing"
 )
 
@@ -429,7 +428,7 @@ func TestFilterRange(t *testing.T) {
 		// match
 		fr := &filterRange{
 			fieldName: "foo",
-			minValue:  math.Inf(-1),
+			minValue:  -inf,
 			maxValue:  3,
 		}
 		testFilterMatchForColumns(t, columns, fr, "foo", []int{3, 4, 6, 7, 8})
@@ -451,7 +450,7 @@ func TestFilterRange(t *testing.T) {
 		fr = &filterRange{
 			fieldName: "foo",
 			minValue:  1000,
-			maxValue:  math.Inf(1),
+			maxValue:  inf,
 		}
 		testFilterMatchForColumns(t, columns, fr, "foo", []int{5})
 
@@ -501,7 +500,7 @@ func TestFilterRange(t *testing.T) {
 		// match
 		fr := &filterRange{
 			fieldName: "foo",
-			minValue:  math.Inf(-1),
+			minValue:  -inf,
 			maxValue:  3,
 		}
 		testFilterMatchForColumns(t, columns, fr, "foo", []int{3, 4, 6, 7, 8})
@@ -523,7 +522,7 @@ func TestFilterRange(t *testing.T) {
 		fr = &filterRange{
 			fieldName: "foo",
 			minValue:  1000,
-			maxValue:  math.Inf(1),
+			maxValue:  inf,
 		}
 		testFilterMatchForColumns(t, columns, fr, "foo", []int{5})
 
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index 7d4d16f8d..e1b1fb819 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -820,14 +820,14 @@ func parseFilterRange(lex *lexer, fieldName string) (filter, error) {
 		stringRepr += "["
 	} else {
 		stringRepr += "("
-		minValue = math.Nextafter(minValue, math.Inf(1))
+		minValue = math.Nextafter(minValue, inf)
 	}
 	stringRepr += minValueStr + ", " + maxValueStr
 	if includeMaxValue {
 		stringRepr += "]"
 	} else {
 		stringRepr += ")"
-		maxValue = math.Nextafter(maxValue, math.Inf(-1))
+		maxValue = math.Nextafter(maxValue, -inf)
 	}
 
 	fr := &filterRange{
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 917b77b19..9e5d72764 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -538,13 +538,13 @@ func TestParseRangeFilter(t *testing.T) {
 	f(`range:range["-1.234e5", "-2e-5"]`, `range`, -1.234e5, -2e-5)
 
 	f(`_msg:range[1, 2]`, `_msg`, 1, 2)
-	f(`:range(1, 2)`, ``, math.Nextafter(1, math.Inf(1)), math.Nextafter(2, math.Inf(-1)))
-	f(`range[1, 2)`, ``, 1, math.Nextafter(2, math.Inf(-1)))
-	f(`range("1", 2]`, ``, math.Nextafter(1, math.Inf(1)), 2)
+	f(`:range(1, 2)`, ``, math.Nextafter(1, inf), math.Nextafter(2, -inf))
+	f(`range[1, 2)`, ``, 1, math.Nextafter(2, -inf))
+	f(`range("1", 2]`, ``, math.Nextafter(1, inf), 2)
 
 	f(`response_size:range[1KB, 10MiB]`, `response_size`, 1_000, 10*(1<<20))
 	f(`response_size:range[1G, 10Ti]`, `response_size`, 1_000_000_000, 10*(1<<40))
-	f(`response_size:range[10, inf]`, `response_size`, 10, math.Inf(1))
+	f(`response_size:range[10, inf]`, `response_size`, 10, inf)
 
 	f(`duration:range[100ns, 1y2w2.5m3s5ms]`, `duration`, 100, 1*nsecsPerYear+2*nsecsPerWeek+2.5*nsecsPerMinute+3*nsecsPerSecond+5*nsecsPerMillisecond)
 }
@@ -864,8 +864,8 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`foo | offset 10 | offset 100`, `foo | offset 10 | offset 100`)
 
 	// stats pipe count
-	f(`* | STATS bY (foo, b.a/r, "b az") count(*) XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`)
-	f(`* | stats by() COUNT(x, 'a).b,c|d') as qwert`, `* | stats count(x, "a).b,c|d") as qwert`)
+	f(`* | STATS bY (foo, b.a/r, "b az",) count(*) XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`)
+	f(`* | stats by() COUNT(x, 'a).b,c|d',) as qwert`, `* | stats count(x, "a).b,c|d") as qwert`)
 	f(`* | stats count() x`, `* | stats count(*) as x`)
 	f(`* | stats count(*) x`, `* | stats count(*) as x`)
 	f(`* | stats count(foo,*,bar) x`, `* | stats count(*) as x`)
@@ -928,6 +928,11 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats by(_time:1d offset 2h) count() as foo`, `* | stats by (_time:1d offset 2h) count(*) as foo`)
 	f(`* | stats by(_time:1d offset -2.5h5m) count() as foo`, `* | stats by (_time:1d offset -2.5h5m) count(*) as foo`)
 
+	// sort pipe
+	f(`* | sort bY (foo)`, `* | sort by (foo)`)
+	f(`* | sORt bY (_time, _stream DEsc, host)`, `* | sort by (_time, _stream desc, host)`)
+	f(`* | sort bY (foo, bar,)`, `* | sort by (foo, bar)`)
+
 	// multiple different pipes
 	f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
 	f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@@ -1224,20 +1229,29 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | stats uniq_array`)
 	f(`foo | stats uniq_array()`)
 
-	// invalid grouping fields
+	// invalid stats grouping fields
 	f(`foo | stats by(foo:bar) count() baz`)
 	f(`foo | stats by(foo:/bar) count() baz`)
 	f(`foo | stats by(foo:-1h) count() baz`)
 	f(`foo | stats by (foo:1h offset) count() baz`)
 	f(`foo | stats by (foo:1h offset bar) count() baz`)
 
-	// invalid by clause
+	// invalid stats by clause
 	f(`foo | stats by`)
 	f(`foo | stats by bar`)
 	f(`foo | stats by(`)
 	f(`foo | stats by(bar`)
 	f(`foo | stats by(bar,`)
 	f(`foo | stats by(bar)`)
+
+	// invalid sort pipe
+	f(`foo | sort`)
+	f(`foo | sort bar`)
+	f(`foo | sort by`)
+	f(`foo | sort by(`)
+	f(`foo | sort by()`)
+	f(`foo | sort by(baz`)
+	f(`foo | sort by(baz,`)
 }
 
 func TestNormalizeFields(t *testing.T) {
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index 881611284..09503cfce 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -80,6 +80,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
 				return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
 			}
 			pipes = append(pipes, ps)
+		case lex.isKeyword("sort"):
+			ps, err := parsePipeSort(lex)
+			if err != nil {
+				return nil, fmt.Errorf("cannot parse 'sort' pipe: %w", err)
+			}
+			pipes = append(pipes, ps)
 		case lex.isKeyword("limit", "head"):
 			pl, err := parsePipeLimit(lex)
 			if err != nil {
diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go
index cc0ae3d79..6d0c4c459 100644
--- a/lib/logstorage/pipe_copy.go
+++ b/lib/logstorage/pipe_copy.go
@@ -52,6 +52,10 @@ type pipeCopyProcessor struct {
 }
 
 func (pcp *pipeCopyProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
 	br.copyColumns(pcp.pc.srcFields, pcp.pc.dstFields)
 	pcp.ppBase.writeBlock(workerID, br)
 }
diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go
index 29d2074c5..5bdf603fa 100644
--- a/lib/logstorage/pipe_delete.go
+++ b/lib/logstorage/pipe_delete.go
@@ -43,6 +43,10 @@ type pipeDeleteProcessor struct {
 }
 
 func (pdp *pipeDeleteProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
 	br.deleteColumns(pdp.pd.fields)
 	pdp.ppBase.writeBlock(workerID, br)
 }
diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go
index c05e348d4..a1c9b3065 100644
--- a/lib/logstorage/pipe_fields.go
+++ b/lib/logstorage/pipe_fields.go
@@ -45,6 +45,10 @@ type pipeFieldsProcessor struct {
 }
 
 func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
 	if !pfp.pf.containsStar {
 		br.setColumns(pfp.pf.fields)
 	}
diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go
index 6f53975bb..f35af350a 100644
--- a/lib/logstorage/pipe_limit.go
+++ b/lib/logstorage/pipe_limit.go
@@ -41,6 +41,10 @@ type pipeLimitProcessor struct {
 }
 
 func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
 	rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps)))
 	if rowsProcessed <= plp.pl.n {
 		// Fast path - write all the rows to ppBase.
diff --git a/lib/logstorage/pipe_offset.go b/lib/logstorage/pipe_offset.go
index 6b37a340a..a16620ccc 100644
--- a/lib/logstorage/pipe_offset.go
+++ b/lib/logstorage/pipe_offset.go
@@ -35,6 +35,10 @@ type pipeOffsetProcessor struct {
 }
 
 func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
 	rowsProcessed := pop.rowsProcessed.Add(uint64(len(br.timestamps)))
 	if rowsProcessed <= pop.po.n {
 		return
diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go
index f98356d74..4b62bd21b 100644
--- a/lib/logstorage/pipe_rename.go
+++ b/lib/logstorage/pipe_rename.go
@@ -57,6 +57,10 @@ type pipeRenameProcessor struct {
 }
 
 func (prp *pipeRenameProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
 	br.renameColumns(prp.pr.srcFields, prp.pr.dstFields)
 	prp.ppBase.writeBlock(workerID, br)
 }
diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go
new file mode 100644
index 000000000..d02ed6763
--- /dev/null
+++ b/lib/logstorage/pipe_sort.go
@@ -0,0 +1,703 @@
+package logstorage
+
+import (
+	"container/heap"
+	"fmt"
+	"math"
+	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
+)
+
+// pipeSort processes '| sort ...' queries.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe
+type pipeSort struct {
+	// byFields contains field names for sorting from 'by(...)' clause.
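+	//
+	// sortBlockLess compares byFields in the given order and moves to the
+	// next field only when the current values are equal, so earlier fields
+	// have higher sort priority. For example, `sort by (foo, bar desc)`
+	// orders by foo first and breaks ties by bar in reverse order.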
+	byFields []*bySortField
+}
+
+func (ps *pipeSort) String() string {
+	if len(ps.byFields) == 0 {
+		logger.Panicf("BUG: pipeSort must contain at least a single byField")
+	}
+
+	a := make([]string, len(ps.byFields))
+	for i := range ps.byFields {
+		a[i] = ps.byFields[i].String()
+	}
+	s := "sort by (" + strings.Join(a, ", ") + ")"
+
+	return s
+}
+
+func (ps *pipeSort) getNeededFields() ([]string, map[string][]string) {
+	byFields := ps.byFields
+	neededFields := make([]string, len(byFields))
+	for i := range byFields {
+		neededFields[i] = byFields[i].name
+	}
+	return neededFields, nil
+}
+
+func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+	maxStateSize := int64(float64(memory.Allowed()) * 0.3)
+
+	shards := make([]pipeSortProcessorShard, workersCount)
+	for i := range shards {
+		shard := &shards[i]
+		shard.ps = ps
+		shard.stateSizeBudget = stateSizeBudgetChunk
+		maxStateSize -= stateSizeBudgetChunk
+	}
+
+	psp := &pipeSortProcessor{
+		ps:     ps,
+		stopCh: stopCh,
+		cancel: cancel,
+		ppBase: ppBase,
+
+		shards: shards,
+
+		maxStateSize: maxStateSize,
+	}
+	psp.stateSizeBudget.Store(maxStateSize)
+
+	return psp
+}
+
+type pipeSortProcessor struct {
+	ps     *pipeSort
+	stopCh <-chan struct{}
+	cancel func()
+	ppBase pipeProcessor
+
+	shards []pipeSortProcessorShard
+
+	maxStateSize    int64
+	stateSizeBudget atomic.Int64
+}
+
+type pipeSortProcessorShard struct {
+	pipeSortProcessorShardNopad
+
+	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(pipeSortProcessorShardNopad{})%128]byte
+}
+
+type pipeSortProcessorShardNopad struct {
+	// ps points to the parent pipeSort.
+	ps *pipeSort
+
+	// buf holds all the logs data written to the given shard.
+	buf []byte
+
+	// valuesBuf holds all the string values written to the given shard.
+	// The actual strings are stored in buf.
+	valuesBuf []string
+
+	// u64ValuesBuf holds uint64 values parsed from valuesBuf for speeding up the sorting.
+	u64ValuesBuf []uint64
+
+	// f64ValuesBuf holds float64 values parsed from valuesBuf for speeding up the sorting.
+	f64ValuesBuf []float64
+
+	// timestampsBuf holds timestamps if _time columns are used for sorting.
+	// This speeds up sorting by _time.
+	timestampsBuf []int64
+
+	// byColumnsBuf holds `by(...)` columns written to the shard.
+	byColumnsBuf []sortBlockByColumn
+
+	// otherColumnsBuf holds columns other than `by(...)` columns written to the shard.
+	otherColumnsBuf []sortBlockOtherColumn
+
+	// blocks holds all the blocks with logs written to the shard.
+	blocks []sortBlock
+
+	// rowRefs holds references to all the rows stored in blocks.
+	//
+	// Sorting sorts rowRefs, while blocks remain unchanged.
+	// This should speed up sorting.
+	rowRefs []sortRowRef
+
+	// rowRefNext points to the next index at rowRefs during the merge-shards phase.
+	rowRefNext uint
+
+	// stateSizeBudget is the remaining budget for the whole state size for the shard.
+	// The per-shard budget is provided in chunks from the parent pipeSortProcessor.
+	stateSizeBudget int
+}
+
+// sortBlock represents a block of logs for sorting.
+//
+// It doesn't own the data it refers to - all the data belongs to pipeSortProcessorShard.
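+//
+// Sorting never moves rows between blocks - it only permutes the
+// pipeSortProcessorShard.rowRefs entries, which point into these blocks.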
+type sortBlock struct {
+	// byColumns refers to block data for 'by(...)' columns
+	byColumns []sortBlockByColumn
+
+	// otherColumns refers to block data for columns other than 'by(...)' columns
+	otherColumns []sortBlockOtherColumn
+}
+
+// sortBlockByColumn represents data for a single column from 'sort by(...)' clause.
+//
+// It doesn't own the data it refers to - all the data belongs to pipeSortProcessorShard.
+type sortBlockByColumn struct {
+	// values contains column values
+	values []string
+
+	// u64Values contains uint64 numbers parsed from values
+	u64Values []uint64
+
+	// f64Values contains float64 numbers parsed from values
+	f64Values []float64
+
+	// timestamps contains timestamps for blockResultColumn.isTime column
+	timestamps []int64
+}
+
+// sortBlockOtherColumn represents data for a single column outside 'sort by(...)' clause.
+//
+// It doesn't own the data it refers to - all the data belongs to pipeSortProcessorShard.
+type sortBlockOtherColumn struct {
+	// name is the column name
+	name string
+
+	// values contains column values
+	values []string
+}
+
+// sortRowRef is the reference to a single log entry written to `sort` pipe.
+type sortRowRef struct {
+	// blockIdx is the index of the block at pipeSortProcessorShard.blocks.
+	blockIdx uint
+
+	// rowIdx is the index of the log entry inside the block referenced by blockIdx.
+	rowIdx uint
+}
+
+// writeBlock writes br with the given byFields to shard.
+func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
+	byFields := shard.ps.byFields
+	cs := br.getColumns()
+
+	// Collect values for columns from byFields.
+	byColumnsBuf := shard.byColumnsBuf
+	byColumnsBufLen := len(byColumnsBuf)
+	for _, bf := range byFields {
+		c := br.getColumnByName(bf.name)
+		values := c.getValues(br)
+		values = shard.copyValues(values)
+		u64Values := shard.createUint64Values(values)
+		f64Values := shard.createFloat64Values(values)
+		timestamps := shard.createTimestampsIfNeeded(br.timestamps, c.isTime)
+		byColumnsBuf = append(byColumnsBuf, sortBlockByColumn{
+			values:     values,
+			u64Values:  u64Values,
+			f64Values:  f64Values,
+			timestamps: timestamps,
+		})
+	}
+	shard.byColumnsBuf = byColumnsBuf
+	byColumns := byColumnsBuf[byColumnsBufLen:]
+	shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0]))
+
+	// Collect values for other columns.
+	otherColumnsBuf := shard.otherColumnsBuf
+	otherColumnsBufLen := len(otherColumnsBuf)
+	for _, c := range cs {
+		isByField := false
+		for _, bf := range byFields {
+			if bf.name == c.name {
+				isByField = true
+				break
+			}
+		}
+		if isByField {
+			continue
+		}
+
+		values := c.getValues(br)
+		values = shard.copyValues(values)
+		otherColumnsBuf = append(otherColumnsBuf, sortBlockOtherColumn{
+			name:   c.name,
+			values: values,
+		})
+	}
+	shard.otherColumnsBuf = otherColumnsBuf
+	otherColumns := otherColumnsBuf[otherColumnsBufLen:]
+	shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0]))
+
+	// Add row references to rowRefs.
+	blockIdx := uint(len(shard.blocks))
+	rowRefs := shard.rowRefs
+	rowRefsLen := len(rowRefs)
+	for i := range br.timestamps {
+		rowRefs = append(rowRefs, sortRowRef{
+			blockIdx: blockIdx,
+			rowIdx:   uint(i),
+		})
+	}
+	shard.rowRefs = rowRefs
+	shard.stateSizeBudget -= (len(rowRefs) - rowRefsLen) * int(unsafe.Sizeof(rowRefs[0]))
+
+	// Add byColumns and otherColumns to blocks.
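+	// The rowRefs added above use blockIdx, which equals the index of the
+	// block appended here, so every rowRef resolves to this block's columns.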
+	shard.blocks = append(shard.blocks, sortBlock{
+		byColumns:    byColumns,
+		otherColumns: otherColumns,
+	})
+	shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0]))
+}
+
+// copyValues copies values to the shard and returns the copied values.
+func (shard *pipeSortProcessorShard) copyValues(values []string) []string {
+	buf := shard.buf
+	bufLenOriginal := len(buf)
+
+	valuesBuf := shard.valuesBuf
+	valuesBufLen := len(valuesBuf)
+
+	for _, v := range values {
+		bufLen := len(buf)
+		buf = append(buf, v...)
+		valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:]))
+	}
+
+	shard.valuesBuf = valuesBuf
+	shard.buf = buf
+
+	shard.stateSizeBudget -= len(buf) - bufLenOriginal
+	shard.stateSizeBudget -= (len(valuesBuf) - valuesBufLen) * int(unsafe.Sizeof(valuesBuf[0]))
+
+	return valuesBuf[valuesBufLen:]
+}
+
+func (shard *pipeSortProcessorShard) createUint64Values(values []string) []uint64 {
+	u64ValuesBuf := shard.u64ValuesBuf
+	u64ValuesBufLen := len(u64ValuesBuf)
+	for _, v := range values {
+		u64, ok := tryParseUint64(v)
+		if ok {
+			u64ValuesBuf = append(u64ValuesBuf, u64)
+			continue
+		}
+		u32, ok := tryParseIPv4(v)
+		if ok {
+			u64ValuesBuf = append(u64ValuesBuf, uint64(u32))
+			continue
+		}
+		i64, ok := tryParseTimestampRFC3339Nano(v)
+		if ok {
+			u64ValuesBuf = append(u64ValuesBuf, uint64(i64))
+			continue
+		}
+		i64, ok = tryParseDuration(v)
+		if ok {
+			u64ValuesBuf = append(u64ValuesBuf, uint64(i64))
+			continue
+		}
+		// The value isn't numeric - append 0 in order to keep u64ValuesBuf
+		// aligned with values; sortBlockLess treats 0 as 'no uint64 value'.
+		u64ValuesBuf = append(u64ValuesBuf, 0)
+	}
+	shard.u64ValuesBuf = u64ValuesBuf
+
+	shard.stateSizeBudget -= (len(u64ValuesBuf) - u64ValuesBufLen) * int(unsafe.Sizeof(u64ValuesBuf[0]))
+
+	return u64ValuesBuf[u64ValuesBufLen:]
+}
+
+func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []float64 {
+	f64ValuesBuf := shard.f64ValuesBuf
+	f64ValuesBufLen := len(f64ValuesBuf)
+	for _, v := range values {
+		f, ok := tryParseFloat64(v)
+		if !ok {
+			f = nan
+		}
+		f64ValuesBuf = append(f64ValuesBuf, f)
+	}
+	shard.f64ValuesBuf = f64ValuesBuf
+
+	shard.stateSizeBudget -= (len(f64ValuesBuf) - f64ValuesBufLen) * int(unsafe.Sizeof(f64ValuesBuf[0]))
+
+	return f64ValuesBuf[f64ValuesBufLen:]
+}
+
+func (shard *pipeSortProcessorShard) createTimestampsIfNeeded(timestamps []int64, isTime bool) []int64 {
+	if !isTime {
+		return nil
+	}
+
+	timestampsBufLen := len(shard.timestampsBuf)
+	shard.timestampsBuf = append(shard.timestampsBuf, timestamps...)
+	shard.stateSizeBudget -= (len(shard.timestampsBuf) - timestampsBufLen) * int(unsafe.Sizeof(timestamps[0]))
+
+	return shard.timestampsBuf[timestampsBufLen:]
+}
+
+func (shard *pipeSortProcessorShard) Len() int {
+	return len(shard.rowRefs)
+}
+
+func (shard *pipeSortProcessorShard) Swap(i, j int) {
+	rowRefs := shard.rowRefs
+	rowRefs[i], rowRefs[j] = rowRefs[j], rowRefs[i]
+}
+
+func (shard *pipeSortProcessorShard) Less(i, j int) bool {
+	return sortBlockLess(shard, uint(i), shard, uint(j))
+}
+
+func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
+	shard := &psp.shards[workerID]
+
+	for shard.stateSizeBudget < 0 {
+		// steal some budget for the state size from the global budget.
+		remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
+		if remaining < 0 {
+			// The state size is too big. Stop processing data in order to avoid OOM crash.
+			if remaining+stateSizeBudgetChunk >= 0 {
+				// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
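+				// The whole query fails in this case - flush() returns the
+				// memory limit error once the global budget goes negative.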
+				psp.cancel()
+			}
+			return
+		}
+		shard.stateSizeBudget += stateSizeBudgetChunk
+	}
+
+	shard.writeBlock(br)
+}
+
+func (psp *pipeSortProcessor) flush() error {
+	if n := psp.stateSizeBudget.Load(); n <= 0 {
+		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
+	}
+
+	select {
+	case <-psp.stopCh:
+		return nil
+	default:
+	}
+
+	// Sort every shard in parallel
+	var wg sync.WaitGroup
+	shards := psp.shards
+	for i := range shards {
+		wg.Add(1)
+		go func(shard *pipeSortProcessorShard) {
+			// TODO: interrupt long sorting when psp.stopCh is closed.
+			sort.Sort(shard)
+			wg.Done()
+		}(&shards[i])
+	}
+	wg.Wait()
+
+	select {
+	case <-psp.stopCh:
+		return nil
+	default:
+	}
+
+	// Merge sorted results across shards
+	sh := pipeSortProcessorShardsHeap(make([]*pipeSortProcessorShard, 0, len(shards)))
+	for i := range shards {
+		shard := &shards[i]
+		if shard.Len() > 0 {
+			sh = append(sh, shard)
+		}
+	}
+	heap.Init(&sh)
+
+	wctx := &pipeSortWriteContext{
+		psp: psp,
+	}
+	var shardNext *pipeSortProcessorShard
+	for len(sh) > 1 {
+		shard := sh[0]
+		wctx.writeRow(shard, shard.rowRefNext)
+		shard.rowRefNext++
+
+		if shard.rowRefNext >= uint(len(shard.rowRefs)) {
+			_ = heap.Pop(&sh)
+			shardNext = nil
+			continue
+		}
+
+		if shardNext == nil {
+			shardNext = sh[1]
+			if len(sh) > 2 && sortBlockLess(sh[2], sh[2].rowRefNext, shardNext, shardNext.rowRefNext) {
+				shardNext = sh[2]
+			}
+		}
+
+		if sortBlockLess(shardNext, shardNext.rowRefNext, shard, shard.rowRefNext) {
+			heap.Fix(&sh, 0)
+			shardNext = nil
+
+			select {
+			case <-psp.stopCh:
+				return nil
+			default:
+			}
+		}
+	}
+	if len(sh) == 1 {
+		shard := sh[0]
+		for shard.rowRefNext < uint(len(shard.rowRefs)) {
+			wctx.writeRow(shard, shard.rowRefNext)
+			shard.rowRefNext++
+		}
+	}
+	wctx.flush()
+
+	return nil
+}
+
+type pipeSortWriteContext struct {
+	psp       *pipeSortProcessor
+	rcs       []resultColumn
+	br        blockResult
+	valuesLen int
+}
+
+func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx uint) {
+	rowRef := shard.rowRefs[rowIdx]
+	block := &shard.blocks[rowRef.blockIdx]
+
+	byFields := shard.ps.byFields
+	rcs := wctx.rcs
+
+	areEqualColumns := len(rcs) == len(byFields)+len(block.otherColumns)
+	if areEqualColumns {
+		for i, c := range block.otherColumns {
+			if rcs[len(byFields)+i].name != c.name {
+				areEqualColumns = false
+				break
+			}
+		}
+	}
+	if !areEqualColumns {
+		// send the current block to ppBase and construct new columns
+		wctx.flush()
+
+		rcs = rcs[:0]
+		for _, bf := range byFields {
+			rcs = append(rcs, resultColumn{
+				name: bf.name,
+			})
+		}
+		for _, c := range block.otherColumns {
+			rcs = append(rcs, resultColumn{
+				name: c.name,
+			})
+		}
+		wctx.rcs = rcs
+	}
+
+	for i, c := range block.byColumns {
+		v := c.values[rowRef.rowIdx]
+		rcs[i].addValue(v)
+		wctx.valuesLen += len(v)
+	}
+	for i, c := range block.otherColumns {
+		v := c.values[rowRef.rowIdx]
+		rcs[len(byFields)+i].addValue(v)
+		wctx.valuesLen += len(v)
+	}
+
+	if wctx.valuesLen >= 1_000_000 {
+		wctx.flush()
+	}
+}
+
+func (wctx *pipeSortWriteContext) flush() {
+	rcs := wctx.rcs
+	br := &wctx.br
+
+	wctx.valuesLen = 0
+
+	if len(rcs) == 0 {
+		return
+	}
+
+	// Flush rcs to ppBase
+	br.setResultColumns(rcs)
+	wctx.psp.ppBase.writeBlock(0, br)
+	br.reset()
+	for i := range rcs {
+		rcs[i].reset()
+	}
+}
+
+type pipeSortProcessorShardsHeap []*pipeSortProcessorShard
+
+func (sh *pipeSortProcessorShardsHeap) Len() int {
+	return len(*sh)
+}
+
+func (sh *pipeSortProcessorShardsHeap) Swap(i, j int) {
+	a := *sh
+	a[i], a[j] = a[j], a[i]
+}
+
+func (sh *pipeSortProcessorShardsHeap) Less(i, j int) bool {
+	a := *sh
+	shardA := a[i]
+	shardB := a[j]
+	return sortBlockLess(shardA, shardA.rowRefNext, shardB, shardB.rowRefNext)
+}
+
+func (sh *pipeSortProcessorShardsHeap) Push(x any) {
+	shard := x.(*pipeSortProcessorShard)
+	*sh = append(*sh, shard)
+}
+
+func (sh *pipeSortProcessorShardsHeap) Pop() any {
+	a := *sh
+	x := a[len(a)-1]
+	a[len(a)-1] = nil
+	*sh = a[:len(a)-1]
+	return x
+}
+
+func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSortProcessorShard, rowIdxB uint) bool {
+	byFields := shardA.ps.byFields
+
+	rowRefA := shardA.rowRefs[rowIdxA]
+	rowRefB := shardB.rowRefs[rowIdxB]
+	csA := shardA.blocks[rowRefA.blockIdx].byColumns
+	csB := shardB.blocks[rowRefB.blockIdx].byColumns
+	for idx := range csA {
+		cA := &csA[idx]
+		cB := &csB[idx]
+		bf := byFields[idx]
+
+		if len(cA.timestamps) > 0 && len(cB.timestamps) > 0 {
+			// Fast path - sort by _time
+			tA := cA.timestamps[rowRefA.rowIdx]
+			tB := cB.timestamps[rowRefB.rowIdx]
+			if tA == tB {
+				continue
+			}
+			if bf.isDesc {
+				return tB < tA
+			}
+			return tA < tB
+		}
+
+		// Try sorting by uint64 values at first
+		uA := cA.u64Values[rowRefA.rowIdx]
+		uB := cB.u64Values[rowRefB.rowIdx]
+		if uA != 0 && uB != 0 {
+			if uA == uB {
+				continue
+			}
+			if bf.isDesc {
+				return uB < uA
+			}
+			return uA < uB
+		}
+
+		// Then try sorting by float64 values
+		fA := cA.f64Values[rowRefA.rowIdx]
+		fB := cB.f64Values[rowRefB.rowIdx]
+		if !math.IsNaN(fA) && !math.IsNaN(fB) {
+			if fA == fB {
+				continue
+			}
+			if bf.isDesc {
+				return fB < fA
+			}
+			return fA < fB
+		}
+
+		// Fall back to string sorting
+		sA := cA.values[rowRefA.rowIdx]
+		sB := cB.values[rowRefB.rowIdx]
+		if sA == sB {
+			continue
+		}
+		if bf.isDesc {
+			return sB < sA
+		}
+		return sA < sB
+	}
+	return false
+}
+
+func parsePipeSort(lex *lexer) (*pipeSort, error) {
+	if !lex.isKeyword("sort") {
+		return nil, fmt.Errorf("expecting 'sort'; got %q", lex.token)
+	}
+	lex.nextToken()
+	if !lex.isKeyword("by") {
+		return nil, fmt.Errorf("expecting 'by'; got %q", lex.token)
+	}
+	lex.nextToken()
+	bfs, err := parseBySortFields(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse 'by' clause: %w", err)
+	}
+
+	ps := &pipeSort{
+		byFields: bfs,
+	}
+	return ps, nil
+}
+
+// bySortField represents 'by (...)' part of the pipeSort.
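+//
+// For example, `sort by (_time desc)` is represented by a single bySortField
+// with name set to "_time" and isDesc set to true.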
+type bySortField struct {
+	// the name of the field to sort
+	name string
+
+	// whether sorting for the given field is performed in descending order
+	isDesc bool
+}
+
+func (bf *bySortField) String() string {
+	s := quoteTokenIfNeeded(bf.name)
+	if bf.isDesc {
+		s += " desc"
+	}
+	return s
+}
+
+func parseBySortFields(lex *lexer) ([]*bySortField, error) {
+	if !lex.isKeyword("(") {
+		return nil, fmt.Errorf("missing `(`")
+	}
+	var bfs []*bySortField
+	for {
+		lex.nextToken()
+		if lex.isKeyword(")") {
+			lex.nextToken()
+			if len(bfs) == 0 {
+				return nil, fmt.Errorf("sort fields list cannot be empty")
+			}
+			return bfs, nil
+		}
+		fieldName, err := parseFieldName(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse field name: %w", err)
+		}
+		bf := &bySortField{
+			name: fieldName,
+		}
+		if lex.isKeyword("desc") {
+			lex.nextToken()
+			bf.isDesc = true
+		}
+		bfs = append(bfs, bf)
+		switch {
+		case lex.isKeyword(")"):
+			lex.nextToken()
+			return bfs, nil
+		case lex.isKeyword(","):
+		default:
+			return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
+		}
+	}
+}
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index 6334469fd..4b8c78e34 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -162,46 +162,9 @@ type pipeStatsProcessorShardNopad struct {
 	stateSizeBudget int
 }
 
-func (shard *pipeStatsProcessorShard) getStatsProcessors(key []byte) []statsProcessor {
-	spg := shard.m[string(key)]
-	if spg == nil {
-		sfps := make([]statsProcessor, len(shard.ps.funcs))
-		for i, f := range shard.ps.funcs {
-			sfp, stateSize := f.newStatsProcessor()
-			sfps[i] = sfp
-			shard.stateSizeBudget -= stateSize
-		}
-		spg = &pipeStatsGroup{
-			sfps: sfps,
-		}
-		shard.m[string(key)] = spg
-		shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
-	}
-	return spg.sfps
-}
+func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
+	byFields := shard.ps.byFields
 
-type pipeStatsGroup struct {
-	sfps []statsProcessor
-}
-
-func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
-	shard := &psp.shards[workerID]
-
-	for shard.stateSizeBudget < 0 {
-		// steal some budget for the state size from the global budget.
-		remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
-		if remaining < 0 {
-			// The state size is too big. Stop processing data in order to avoid OOM crash.
-			if remaining+stateSizeBudgetChunk >= 0 {
-				// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
-				psp.cancel()
-			}
-			return
-		}
-		shard.stateSizeBudget += stateSizeBudgetChunk
-	}
-
-	byFields := psp.ps.byFields
 	if len(byFields) == 0 {
 		// Fast path - pass all the rows to a single group with empty key.
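+		// All the rows share the same empty key, so they are accumulated
+		// in a single pipeStatsGroup.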
 		for _, sfp := range shard.getStatsProcessors(nil) {
@@ -306,6 +269,52 @@ func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
 	shard.keyBuf = keyBuf
 }
 
+func (shard *pipeStatsProcessorShard) getStatsProcessors(key []byte) []statsProcessor {
+	spg := shard.m[string(key)]
+	if spg == nil {
+		sfps := make([]statsProcessor, len(shard.ps.funcs))
+		for i, f := range shard.ps.funcs {
+			sfp, stateSize := f.newStatsProcessor()
+			sfps[i] = sfp
+			shard.stateSizeBudget -= stateSize
+		}
+		spg = &pipeStatsGroup{
+			sfps: sfps,
+		}
+		shard.m[string(key)] = spg
+		shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
+	}
+	return spg.sfps
+}
+
+type pipeStatsGroup struct {
+	sfps []statsProcessor
+}
+
+func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
+	shard := &psp.shards[workerID]
+
+	for shard.stateSizeBudget < 0 {
+		// steal some budget for the state size from the global budget.
+		remaining := psp.stateSizeBudget.Add(-stateSizeBudgetChunk)
+		if remaining < 0 {
+			// The state size is too big. Stop processing data in order to avoid OOM crash.
+			if remaining+stateSizeBudgetChunk >= 0 {
+				// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
+				psp.cancel()
+			}
+			return
+		}
+		shard.stateSizeBudget += stateSizeBudgetChunk
+	}
+
+	shard.writeBlock(br)
+}
+
 func (psp *pipeStatsProcessor) flush() error {
 	if n := psp.stateSizeBudget.Load(); n <= 0 {
 		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", psp.ps.String(), psp.maxStateSize/(1<<20))
@@ -407,6 +416,7 @@ func (psp *pipeStatsProcessor) flush() error {
 			valuesLen = 0
 		}
 	}
+	br.setResultColumns(rcs)
 	psp.ppBase.writeBlock(0, &br)
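Below is a minimal, hypothetical round-trip sketch of the new `sort` pipe syntax (not part of this diff). It assumes the package-level `ParseQuery` helper and `Query.String()` method already used throughout parser_test.go; the test name is illustrative only.

```go
package logstorage

import "testing"

// TestPipeSortRoundTrip parses a query containing the new `sort` pipe and
// checks the canonical form produced by pipeSort.String(), mirroring the
// TestParseQuerySuccess cases added in this diff.
func TestPipeSortRoundTrip(t *testing.T) {
	q, err := ParseQuery(`* | sORt bY (_time, _stream DEsc, host)`)
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	result := q.String()
	resultExpected := `* | sort by (_time, _stream desc, host)`
	if result != resultExpected {
		t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
	}
}
```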