This commit is contained in:
Aliaksandr Valialkin 2024-05-17 16:08:44 +02:00
parent c5ced867dc
commit 0d51cad96f
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 30 additions and 37 deletions

View file

@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
## tip
* FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases.
* FEATURE: allow applying individual [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) per each [stats function](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe-functions). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters).
* FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them.

View file

@ -69,9 +69,9 @@ func (br *blockResult) clone() *blockResult {
cs := br.getColumns()
// Pre-populate valuesEncoded in every column in order to properly calculate the needed backing buffer size below.
// Pre-populate values in every column in order to properly calculate the needed backing buffer size below.
for _, c := range cs {
_ = c.getValuesEncoded(br)
_ = c.getValues(br)
}
// Calculate the backing buffer size needed for cloning column values.

View file

@ -75,11 +75,11 @@ type pipeTopkProcessorShardNopad struct {
tmpRow pipeTopkRow
// these are aux fields for determining whether the next row must be stored in rows.
byColumnValues [][]string
otherColumnValues []pipeTopkOtherColumn
byColumns []string
byColumnsIsTime []bool
otherColumns []Field
byColumnValues [][]string
csOther []*blockResultColumn
byColumns []string
byColumnsIsTime []bool
otherColumns []Field
// stateSizeBudget is the remaining budget for the whole state size for the shard.
// The per-shard budget is provided in chunks from the parent pipeTopkProcessor.
@ -186,7 +186,6 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
byColumns := shard.byColumns[:0]
byColumnsIsTime := shard.byColumnsIsTime[:0]
otherColumns := shard.otherColumns[:0]
bb := bbPool.Get()
for rowIdx, timestamp := range br.timestamps {
byColumns = byColumns[:0]
@ -199,20 +198,11 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
byColumns = append(byColumns, bytesutil.ToUnsafeString(bb.B))
byColumnsIsTime = append(byColumnsIsTime, false)
otherColumns = otherColumns[:0]
for i, values := range byColumnValues {
otherColumns = append(otherColumns, Field{
Name: cs[i].name,
Value: values[rowIdx],
})
}
shard.addRow(byColumns, byColumnsIsTime, otherColumns, timestamp)
shard.addRow(br, byColumns, byColumnsIsTime, cs, rowIdx, timestamp)
}
bbPool.Put(bb)
shard.byColumns = byColumns
shard.byColumnsIsTime = byColumnsIsTime
shard.otherColumns = otherColumns
} else {
// Sort by byFields
@ -232,7 +222,7 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
shard.byColumnValues = byColumnValues
shard.byColumnsIsTime = byColumnsIsTime
otherColumnValues := shard.otherColumnValues[:0]
csOther := shard.csOther[:0]
for _, c := range cs {
isByField := false
for _, bf := range byFields {
@ -242,17 +232,13 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
}
}
if !isByField {
otherColumnValues = append(otherColumnValues, pipeTopkOtherColumn{
name: c.name,
values: c.getValues(br),
})
csOther = append(csOther, c)
}
}
shard.otherColumnValues = otherColumnValues
shard.csOther = csOther
// add rows to shard
byColumns := shard.byColumns[:0]
otherColumns := shard.otherColumns[:0]
for rowIdx, timestamp := range br.timestamps {
byColumns = byColumns[:0]
@ -264,26 +250,16 @@ func (shard *pipeTopkProcessorShard) writeBlock(br *blockResult) {
byColumns = append(byColumns, v)
}
otherColumns = otherColumns[:0]
for _, ocv := range otherColumnValues {
otherColumns = append(otherColumns, Field{
Name: ocv.name,
Value: ocv.values[rowIdx],
})
}
shard.addRow(byColumns, byColumnsIsTime, otherColumns, timestamp)
shard.addRow(br, byColumns, byColumnsIsTime, csOther, rowIdx, timestamp)
}
shard.byColumns = byColumns
shard.otherColumns = otherColumns
}
}
func (shard *pipeTopkProcessorShard) addRow(byColumns []string, byColumnsIsTime []bool, otherColumns []Field, timestamp int64) {
func (shard *pipeTopkProcessorShard) addRow(br *blockResult, byColumns []string, byColumnsIsTime []bool, csOther []*blockResultColumn, rowIdx int, timestamp int64) {
r := &shard.tmpRow
r.byColumns = byColumns
r.byColumnsIsTime = byColumnsIsTime
r.otherColumns = otherColumns
r.timestamp = timestamp
rows := shard.rows
@ -293,8 +269,24 @@ func (shard *pipeTopkProcessorShard) addRow(byColumns []string, byColumnsIsTime
}
// Slow path - add r to shard.rows.
// Populate r.otherColumns
otherColumns := shard.otherColumns[:0]
for _, c := range csOther {
v := c.getValueAtRow(br, rowIdx)
otherColumns = append(otherColumns, Field{
Name: c.name,
Value: v,
})
}
shard.otherColumns = otherColumns
r.otherColumns = otherColumns
// Clone r, so it doesn't refer the original data.
r = r.clone()
shard.stateSizeBudget -= r.sizeBytes()
// Push r to shard.rows.
if uint64(len(rows)) < shard.ps.offset+shard.ps.limit {
heap.Push(shard, r)
shard.stateSizeBudget -= int(unsafe.Sizeof(r))