From 87174246e5861360a29f7cd81b435e9b8d65fc78 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 5 Jun 2024 01:50:59 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 2 ++ lib/logstorage/filter_range.go | 6 +++++- lib/logstorage/pipe_limit.go | 10 +++++++--- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index b7e14bfda..e1a7bba5f 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -25,6 +25,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta * FEATURE: add ability to format numeric fields into string representation of time, duration and IPv4 with [`format` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe). * FEATURE: set `format` field to `rfc3164` or `rfc5424` depending on the [Syslog format](https://en.wikipedia.org/wiki/Syslog) parsed via [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe). +* BUGFIX: always respect the limit set in [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe). Previously the limit could be exceeded in some cases. + ## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs) Released at 2024-06-04 diff --git a/lib/logstorage/filter_range.go b/lib/logstorage/filter_range.go index e5c96d399..3ad8233fb 100644 --- a/lib/logstorage/filter_range.go +++ b/lib/logstorage/filter_range.go @@ -44,7 +44,11 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) { return } if c.isTime { - bm.resetBits() + minValueInt, maxValueInt := toInt64Range(minValue, maxValue) + bm.forEachSetBit(func(idx int) bool { + timestamp := br.timestamps[idx] + return timestamp >= minValueInt && timestamp <= maxValueInt + }) return } diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go index 41f98fded..c45df3325 100644 --- a/lib/logstorage/pipe_limit.go +++ b/lib/logstorage/pipe_limit.go @@ -58,21 +58,25 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { } rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps))) - if rowsProcessed <= plp.pl.limit { + limit := plp.pl.limit + if rowsProcessed <= limit { // Fast path - write all the rows to ppNext. plp.ppNext.writeBlock(workerID, br) + if rowsProcessed == limit { + plp.cancel() + } return } // Slow path - overflow. Write the remaining rows if needed. rowsProcessed -= uint64(len(br.timestamps)) - if rowsProcessed >= plp.pl.limit { + if rowsProcessed >= limit { // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. return } // Write remaining rows. - keepRows := plp.pl.limit - rowsProcessed + keepRows := limit - rowsProcessed br.truncateRows(int(keepRows)) plp.ppNext.writeBlock(workerID, br)