diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index fa9fbe236..f706718ef 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +* FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases. * FEATURE: allow applying individual [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) per each [stats function](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters). * FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them. diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 6c0c1cac7..f2921ea10 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -69,9 +69,9 @@ func (br *blockResult) clone() *blockResult { cs := br.getColumns() - // Pre-populate valuesEncoded in every column in order to properly calculate the needed backing buffer size below. + // Pre-populate values in every column in order to properly calculate the needed backing buffer size below. for _, c := range cs { - _ = c.getValuesEncoded(br) + _ = c.getValues(br) } // Calculate the backing buffer size needed for cloning column values. diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index f92ea86a1..de4e70339 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -75,11 +75,11 @@ type pipeTopkProcessorShardNopad struct { tmpRow pipeTopkRow // these are aux fields for determining whether the next row must be stored in rows. 
- byColumnValues [][]string - otherColumnValues []pipeTopkOtherColumn - byColumns []string - byColumnsIsTime []bool - otherColumns []Field + byColumnValues [][]string + csOther []*blockResultColumn + byColumns []string + byColumnsIsTime []bool + otherColumns []Field // stateSizeBudget is the remaining budget for the whole state size for the shard. // The per-shard budget is provided in chunks from the parent pipeTopkProcessor. @@ -186,7 +186,6 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { byColumns := shard.byColumns[:0] byColumnsIsTime := shard.byColumnsIsTime[:0] - otherColumns := shard.otherColumns[:0] bb := bbPool.Get() for rowIdx, timestamp := range br.timestamps { byColumns = byColumns[:0] @@ -199,20 +198,11 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { byColumns = append(byColumns, bytesutil.ToUnsafeString(bb.B)) byColumnsIsTime = append(byColumnsIsTime, false) - otherColumns = otherColumns[:0] - for i, values := range byColumnValues { - otherColumns = append(otherColumns, Field{ - Name: cs[i].name, - Value: values[rowIdx], - }) - } - - shard.addRow(byColumns, byColumnsIsTime, otherColumns, timestamp) + shard.addRow(br, byColumns, byColumnsIsTime, cs, rowIdx, timestamp) } bbPool.Put(bb) shard.byColumns = byColumns shard.byColumnsIsTime = byColumnsIsTime - shard.otherColumns = otherColumns } else { // Sort by byFields @@ -232,7 +222,7 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { shard.byColumnValues = byColumnValues shard.byColumnsIsTime = byColumnsIsTime - otherColumnValues := shard.otherColumnValues[:0] + csOther := shard.csOther[:0] for _, c := range cs { isByField := false for _, bf := range byFields { @@ -242,17 +232,13 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { } } if !isByField { - otherColumnValues = append(otherColumnValues, pipeTopkOtherColumn{ - name: c.name, - values: c.getValues(br), - }) + csOther = append(csOther, c) } } - shard.otherColumnValues 
= otherColumnValues + shard.csOther = csOther // add rows to shard byColumns := shard.byColumns[:0] - otherColumns := shard.otherColumns[:0] for rowIdx, timestamp := range br.timestamps { byColumns = byColumns[:0] @@ -264,26 +250,16 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) { byColumns = append(byColumns, v) } - otherColumns = otherColumns[:0] - for _, ocv := range otherColumnValues { - otherColumns = append(otherColumns, Field{ - Name: ocv.name, - Value: ocv.values[rowIdx], - }) - } - - shard.addRow(byColumns, byColumnsIsTime, otherColumns, timestamp) + shard.addRow(br, byColumns, byColumnsIsTime, csOther, rowIdx, timestamp) } shard.byColumns = byColumns - shard.otherColumns = otherColumns } } -func (shard *pipeTopkProcessorShard) addRow(byColumns []string, byColumnsIsTime []bool, otherColumns []Field, timestamp int64) { +func (shard *pipeTopkProcessorShard) addRow(br *blockResult, byColumns []string, byColumnsIsTime []bool, csOther []*blockResultColumn, rowIdx int, timestamp int64) { r := &shard.tmpRow r.byColumns = byColumns r.byColumnsIsTime = byColumnsIsTime - r.otherColumns = otherColumns r.timestamp = timestamp rows := shard.rows @@ -293,8 +269,24 @@ func (shard *pipeTopkProcessorShard) addRow(byColumns []string, byColumnsIsTime } // Slow path - add r to shard.rows. + + // Populate r.otherColumns + otherColumns := shard.otherColumns[:0] + for _, c := range csOther { + v := c.getValueAtRow(br, rowIdx) + otherColumns = append(otherColumns, Field{ + Name: c.name, + Value: v, + }) + } + shard.otherColumns = otherColumns + r.otherColumns = otherColumns + + // Clone r, so it doesn't refer to the original data. r = r.clone() shard.stateSizeBudget -= r.sizeBytes() + + // Push r to shard.rows. if uint64(len(rows)) < shard.ps.offset+shard.ps.limit { heap.Push(shard, r) shard.stateSizeBudget -= int(unsafe.Sizeof(r))