This commit is contained in:
Aliaksandr Valialkin 2024-06-05 01:50:59 +02:00
parent c96f4c565d
commit 87174246e5
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 14 additions and 4 deletions

View file

@ -25,6 +25,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
* FEATURE: add ability to format numeric fields into string representation of time, duration and IPv4 with [`format` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe). * FEATURE: add ability to format numeric fields into string representation of time, duration and IPv4 with [`format` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#format-pipe).
* FEATURE: set `format` field to `rfc3164` or `rfc5424` depending on the [Syslog format](https://en.wikipedia.org/wiki/Syslog) parsed via [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe). * FEATURE: set `format` field to `rfc3164` or `rfc5424` depending on the [Syslog format](https://en.wikipedia.org/wiki/Syslog) parsed via [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe).
* BUGFIX: always respect the limit set in [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe). Previously the limit could be exceeded in some cases.
## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs) ## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs)
Released at 2024-06-04 Released at 2024-06-04

View file

@ -44,7 +44,11 @@ func (fr *filterRange) applyToBlockResult(br *blockResult, bm *bitmap) {
return return
} }
if c.isTime { if c.isTime {
bm.resetBits() minValueInt, maxValueInt := toInt64Range(minValue, maxValue)
bm.forEachSetBit(func(idx int) bool {
timestamp := br.timestamps[idx]
return timestamp >= minValueInt && timestamp <= maxValueInt
})
return return
} }

View file

@ -58,21 +58,25 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
} }
rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps))) rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps)))
if rowsProcessed <= plp.pl.limit { limit := plp.pl.limit
if rowsProcessed <= limit {
// Fast path - write all the rows to ppNext. // Fast path - write all the rows to ppNext.
plp.ppNext.writeBlock(workerID, br) plp.ppNext.writeBlock(workerID, br)
if rowsProcessed == limit {
plp.cancel()
}
return return
} }
// Slow path - overflow. Write the remaining rows if needed. // Slow path - overflow. Write the remaining rows if needed.
rowsProcessed -= uint64(len(br.timestamps)) rowsProcessed -= uint64(len(br.timestamps))
if rowsProcessed >= plp.pl.limit { if rowsProcessed >= limit {
// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine. // Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
return return
} }
// Write remaining rows. // Write remaining rows.
keepRows := plp.pl.limit - rowsProcessed keepRows := limit - rowsProcessed
br.truncateRows(int(keepRows)) br.truncateRows(int(keepRows))
plp.ppNext.writeBlock(workerID, br) plp.ppNext.writeBlock(workerID, br)