From d75b0df74758971536cd05ea50d9861b0b7ddc38 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 21 May 2024 00:06:58 +0200 Subject: [PATCH] wip --- lib/logstorage/pipe_topk.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index 25a53d156..ba971555b 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -258,7 +258,8 @@ func (shard *pipeTopkProcessorShard) addRow(br *blockResult, byColumns []string, r.timestamp = timestamp rows := shard.rows - if len(rows) > 0 && !topkLess(shard.ps, r, rows[0]) { + maxRows := shard.ps.offset + shard.ps.limit + if uint64(len(rows)) >= maxRows && !topkLess(shard.ps, r, rows[0]) { // Fast path - nothing to add. return } @@ -282,7 +283,7 @@ func (shard *pipeTopkProcessorShard) addRow(br *blockResult, byColumns []string, shard.stateSizeBudget -= r.sizeBytes() // Push r to shard.rows. - if uint64(len(rows)) < shard.ps.offset+shard.ps.limit { + if uint64(len(rows)) < maxRows { heap.Push(shard, r) shard.stateSizeBudget -= int(unsafe.Sizeof(r)) } else {