From cb35e62e047f4878ffef17702076c80f7938f4c8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 14 May 2024 01:49:20 +0200 Subject: [PATCH 1/6] lib/logstorage: work-in-progress Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6258 --- docs/VictoriaLogs/CHANGELOG.md | 5 ++ docs/VictoriaLogs/LogsQL.md | 9 ++- lib/logstorage/block.go | 12 ++-- lib/logstorage/block_data.go | 14 ++--- lib/logstorage/block_result.go | 41 +++++++++---- lib/logstorage/block_search.go | 58 +++++++++++++++++-- lib/logstorage/filter_and.go | 64 ++++++++++++++++----- lib/logstorage/parser_test.go | 1 + lib/logstorage/pipe_sort.go | 5 +- lib/logstorage/stats_count_uniq.go | 4 +- lib/logstorage/stats_max.go | 4 +- lib/logstorage/stats_min.go | 4 +- lib/logstorage/stats_uniq_values.go | 23 -------- lib/logstorage/storage_search.go | 89 +++++++++-------------------- lib/logstorage/values_encoder.go | 8 +++ 15 files changed, 203 insertions(+), 138 deletions(-) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 1318af4a3..468bf09a7 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,11 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +* FEATURE: use [natural sort order](https://en.wikipedia.org/wiki/Natural_sort_order) when sorting logs via [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). + +* BUGFIX: properly return matching logs in [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with small number of entries. Previously they could be skipped. The issue has been introduced in [the release v0.6.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.0-victorialogs). +* BUGFIX: fix `runtime error: index out of range` panic when using [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) like `_time:1h | sort by (_time)`. 
See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6258). + ## [v0.6.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.0-victorialogs) Released at 2024-05-12 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 05ee16bb9..40f6ce03d 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1175,7 +1175,10 @@ See also: ### sort pipe -By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) must be used. +By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) can be used. +The returned logs are sorted by the given [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) +using [natural sorting](https://en.wikipedia.org/wiki/Natural_sort_order). + For example, the following query returns logs for the last 5 minutes sorted by [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) and then by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field): @@ -1210,7 +1213,7 @@ See also: ### uniq pipe `| uniq ...` pipe allows returning only unique results over the selected logs. For example, the following LogsQL query -returns uniq values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) over logs for the last 5 minutes: ```logsql @@ -1536,7 +1539,7 @@ See also: `uniq_values(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the unique non-empty values across the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). -The returned values are sorted and encoded in JSON array. 
+The returned values are encoded in JSON array. The order of the returned values is arbitrary. For example, the following query returns unique non-empty values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) over logs for the last 5 minutes: diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go index 3b6cd073f..0d1f9026a 100644 --- a/lib/logstorage/block.go +++ b/lib/logstorage/block.go @@ -149,8 +149,8 @@ func (c *column) resizeValues(valuesLen int) []string { // mustWriteTo writes c to sw and updates ch accordingly. // -// ch is valid until a.reset() is called. -func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) { +// ch is valid until c is changed. +func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { ch.reset() valuesWriter := &sw.fieldValuesWriter @@ -160,7 +160,7 @@ func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) { bloomFilterWriter = &sw.messageBloomFilterWriter } - ch.name = a.copyString(c.name) + ch.name = c.name // encode values ve := getValuesEncoder() @@ -454,20 +454,18 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) { // Marshal columns cs := b.columns - a := getArena() csh := getColumnsHeader() chs := csh.resizeColumnHeaders(len(cs)) for i := range cs { - cs[i].mustWriteTo(a, &chs[i], sw) + cs[i].mustWriteToNoArena(&chs[i], sw) } - csh.constColumns = appendFields(a, csh.constColumns[:0], b.constColumns) + csh.constColumns = append(csh.constColumns[:0], b.constColumns...) 
bb := longTermBufPool.Get() bb.B = csh.marshal(bb.B) putColumnsHeader(csh) - putArena(a) bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten bh.columnsHeaderSize = uint64(len(bb.B)) diff --git a/lib/logstorage/block_data.go b/lib/logstorage/block_data.go index 308e4d109..9f009c098 100644 --- a/lib/logstorage/block_data.go +++ b/lib/logstorage/block_data.go @@ -110,20 +110,18 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) { // Marshal columns cds := bd.columnsData - a := getArena() csh := getColumnsHeader() chs := csh.resizeColumnHeaders(len(cds)) for i := range cds { - cds[i].mustWriteTo(a, &chs[i], sw) + cds[i].mustWriteToNoArena(&chs[i], sw) } - csh.constColumns = appendFields(a, csh.constColumns[:0], bd.constColumns) + csh.constColumns = append(csh.constColumns[:0], bd.constColumns...) bb := longTermBufPool.Get() bb.B = csh.marshal(bb.B) putColumnsHeader(csh) - putArena(a) bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten bh.columnsHeaderSize = uint64(len(bb.B)) @@ -310,8 +308,8 @@ func (cd *columnData) copyFrom(a *arena, src *columnData) { // mustWriteTo writes cd to sw and updates ch accordingly. // -// ch is valid until a.reset() is called. -func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) { +// ch is valid until cd is changed. 
+func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { ch.reset() valuesWriter := &sw.fieldValuesWriter @@ -321,12 +319,12 @@ func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) bloomFilterWriter = &sw.messageBloomFilterWriter } - ch.name = a.copyString(cd.name) + ch.name = cd.name ch.valueType = cd.valueType ch.minValue = cd.minValue ch.maxValue = cd.maxValue - ch.valuesDict.copyFrom(a, &cd.valuesDict) + ch.valuesDict.copyFromNoArena(&cd.valuesDict) // marshal values ch.valuesSize = uint64(len(cd.valuesData)) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index de6de0721..e80a3516b 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "math" "slices" + "sync/atomic" "time" "unsafe" @@ -12,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // blockResult holds results for a single block of log entries. @@ -107,7 +109,7 @@ func (br *blockResult) cloneValues(values []string) []string { for _, v := range values { if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] { - valuesBuf = append(valuesBuf, v) + valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1]) } else { bufLen := len(buf) buf = append(buf, v...) @@ -259,7 +261,14 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { } // Initialize timestamps, since they are required for all the further work with br. + if !slices.Contains(bs.bsw.so.neededColumnNames, "_time") || slices.Contains(bs.bsw.so.unneededColumnNames, "_time") { + // The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes. 
+ rowsLen := bm.onesCount() + br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen) + return + } + // Slow path - the _time column is requested, so we need to initialize br.timestamps with real timestamps. srcTimestamps := bs.getTimestamps() if bm.areAllBitsSet() { // Fast path - all the rows in the block are selected, so copy all the timestamps without any filtering. @@ -285,7 +294,7 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap) appendValue := func(v string) { if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] { - valuesBuf = append(valuesBuf, v) + valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1]) } else { bufLen := len(buf) buf = append(buf, v...) @@ -1512,7 +1521,7 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 { } } -func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 { +func (c *blockResultColumn) getMaxValue() float64 { if c.isConst { v := c.encodedValues[0] f, ok := tryParseFloat64(v) @@ -1620,7 +1629,7 @@ func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 { } } -func (c *blockResultColumn) getMinValue(_ *blockResult) float64 { +func (c *blockResultColumn) getMinValue() float64 { if c.isConst { v := c.encodedValues[0] f, ok := tryParseFloat64(v) @@ -1851,13 +1860,12 @@ func (rc *resultColumn) resetKeepName() { func (rc *resultColumn) addValue(v string) { values := rc.values if len(values) > 0 && string(v) == values[len(values)-1] { - rc.values = append(rc.values, values[len(values)-1]) - return + rc.values = append(values, values[len(values)-1]) + } else { + bufLen := len(rc.buf) + rc.buf = append(rc.buf, v...) + rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:])) } - - bufLen := len(rc.buf) - rc.buf = append(rc.buf, v...) 
- rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:])) } func truncateTimestampToMonth(timestamp int64) int64 { @@ -1870,5 +1878,18 @@ func truncateTimestampToYear(timestamp int64) int64 { return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano() } +func getEmptyStrings(rowsCount int) []string { + p := emptyStrings.Load() + if p == nil { + values := make([]string, rowsCount) + emptyStrings.Store(&values) + return values + } + values := *p + return slicesutil.SetLength(values, rowsCount) +} + +var emptyStrings atomic.Pointer[[]string] + var nan = math.NaN() var inf = math.Inf(1) diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index 502e51941..6845f9c33 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -8,6 +8,12 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) +// The number of blocks to search at once by a single worker +// +// This number must be increased on systems with many CPU cores in order to amortize +// the overhead for passing the blockSearchWork to worker goroutines. +const blockSearchWorksPerBatch = 64 + type blockSearchWork struct { // p is the part where the block belongs to. 
p *part @@ -19,12 +25,54 @@ type blockSearchWork struct { bh blockHeader } -func newBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) *blockSearchWork { - var bsw blockSearchWork - bsw.p = p - bsw.so = so +func (bsw *blockSearchWork) reset() { + bsw.p = nil + bsw.so = nil + bsw.bh.reset() +} + +type blockSearchWorkBatch struct { + bsws []blockSearchWork +} + +func (bswb *blockSearchWorkBatch) reset() { + bsws := bswb.bsws + for i := range bsws { + bsws[i].reset() + } + bswb.bsws = bsws[:0] +} + +func getBlockSearchWorkBatch() *blockSearchWorkBatch { + v := blockSearchWorkBatchPool.Get() + if v == nil { + return &blockSearchWorkBatch{ + bsws: make([]blockSearchWork, 0, blockSearchWorksPerBatch), + } + } + return v.(*blockSearchWorkBatch) +} + +func putBlockSearchWorkBatch(bswb *blockSearchWorkBatch) { + bswb.reset() + blockSearchWorkBatchPool.Put(bswb) +} + +var blockSearchWorkBatchPool sync.Pool + +func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool { + bsws := bswb.bsws + + bsws = append(bsws, blockSearchWork{ + p: p, + so: so, + }) + bsw := &bsws[len(bsws)-1] bsw.bh.copyFrom(bh) - return &bsw + + bswb.bsws = bsws + + return len(bsws) < cap(bsws) } func getBlockSearch() *blockSearch { diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go index 7b4a46e02..fd5765004 100644 --- a/lib/logstorage/filter_and.go +++ b/lib/logstorage/filter_and.go @@ -3,6 +3,8 @@ package logstorage import ( "strings" "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // filterAnd contains filters joined by AND opertor. @@ -30,19 +32,10 @@ func (fa *filterAnd) String() string { } func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) { - if tokens := fa.getMsgTokens(); len(tokens) > 0 { - // Verify whether fa tokens for the _msg field match bloom filter. - ch := bs.csh.getColumnHeader("_msg") - if ch == nil { - // Fast path - there is no _msg field in the block. 
- bm.resetBits() - return - } - if !matchBloomFilterAllTokens(bs, ch, tokens) { - // Fast path - fa tokens for the _msg field do not match bloom filter. - bm.resetBits() - return - } + if !fa.matchMessageBloomFilter(bs) { + // Fast path - fa doesn't match _msg bloom filter. + bm.resetBits() + return } // Slow path - verify every filter separately. @@ -56,7 +49,29 @@ func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) { } } -func (fa *filterAnd) getMsgTokens() []string { +func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool { + tokens := fa.getMessageTokens() + if len(tokens) == 0 { + return true + } + + v := bs.csh.getConstColumnValue("_msg") + if v != "" { + return matchStringByAllTokens(v, tokens) + } + + ch := bs.csh.getColumnHeader("_msg") + if ch == nil { + return false + } + + if ch.valueType == valueTypeDict { + return matchDictValuesByAllTokens(ch.valuesDict.values, tokens) + } + return matchBloomFilterAllTokens(bs, ch, tokens) +} + +func (fa *filterAnd) getMessageTokens() []string { fa.msgTokensOnce.Do(fa.initMsgTokens) return fa.msgTokens } @@ -89,3 +104,24 @@ func (fa *filterAnd) initMsgTokens() { } fa.msgTokens = a } + +func matchStringByAllTokens(v string, tokens []string) bool { + for _, token := range tokens { + if !matchPhrase(v, token) { + return false + } + } + return true +} + +func matchDictValuesByAllTokens(dictValues, tokens []string) bool { + bb := bbPool.Get() + for _, v := range dictValues { + bb.B = append(bb.B, v...) 
+ bb.B = append(bb.B, ',') + } + v := bytesutil.ToUnsafeString(bb.B) + ok := matchStringByAllTokens(v, tokens) + bbPool.Put(bb) + return ok +} diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 97a06ee1d..ee9498a0b 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -1395,6 +1395,7 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | sort by (f1)`, `*`, ``) f(`* | sort by (f1) | fields f2`, `f1,f2`, ``) + f(`_time:5m | sort by (_time) | fields foo`, `_time,foo`, ``) f(`* | sort by (f1) | fields *`, `*`, ``) f(`* | sort by (f1) | sort by (f2,f3 desc) desc`, `*`, ``) f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4`, `f1,f2,f3,f4`, ``) diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 1d406e081..783c638e8 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" ) // pipeSort processes '| sort ...' queries. 
@@ -639,9 +640,9 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort continue } if isDesc { - return sB < sA + return stringsutil.LessNatural(sB, sA) } - return sA < sB + return stringsutil.LessNatural(sA, sB) } return false } diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index 3999bab48..354e25813 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -175,6 +175,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } } + sup.keyBuf = keyBuf return stateSizeIncrease } @@ -307,7 +308,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in m[string(keyBuf)] = struct{}{} stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } - //sup.keyBuf = keyBuf + sup.keyBuf = keyBuf return stateSizeIncrease } @@ -324,6 +325,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in m[string(keyBuf)] = struct{}{} stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } + sup.keyBuf = keyBuf return stateSizeIncrease } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index bfbd959c2..62e215d5f 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -38,7 +38,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { if smp.sm.containsStar { // Find the maximum value across all the columns for _, c := range br.getColumns() { - f := c.getMaxValue(br) + f := c.getMaxValue() if f > smp.max || math.IsNaN(smp.max) { smp.max = f } @@ -47,7 +47,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { // Find the maximum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getMaxValue(br) + f := c.getMaxValue() if f > smp.max || math.IsNaN(smp.max) { smp.max = f } diff --git 
a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 5aa20a6d0..bf157cb20 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -38,7 +38,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { if smp.sm.containsStar { // Find the minimum value across all the columns for _, c := range br.getColumns() { - f := c.getMinValue(br) + f := c.getMinValue() if f < smp.min || math.IsNaN(smp.min) { smp.min = f } @@ -47,7 +47,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { // Find the minimum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getMinValue(br) + f := c.getMinValue() if f < smp.min || math.IsNaN(smp.min) { smp.min = f } diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index df0e561ae..d11e61387 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -202,12 +202,10 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string { return "[]" } - // Sort unique items items := make([]string, 0, len(sup.m)) for k := range sup.m { items = append(items, k) } - slices.SortFunc(items, compareValues) if limit := sup.su.limit; limit > 0 && uint64(len(items)) > limit { items = items[:limit] @@ -242,27 +240,6 @@ func marshalJSONArray(items []string) string { return bytesutil.ToUnsafeString(b) } -func compareValues(a, b string) int { - fA, okA := tryParseFloat64(a) - fB, okB := tryParseFloat64(b) - if okA && okB { - if fA == fB { - return 0 - } - if fA < fB { - return -1 - } - return 1 - } - if okA { - return -1 - } - if okB { - return 1 - } - return strings.Compare(a, b) -} - func parseStatsUniqValues(lex *lexer) (*statsUniqValues, error) { fields, err := parseFieldNamesForStatsFunc(lex, "uniq_values") if err != nil { diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 10e924697..2b1ea3ba4 100644 --- 
a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -6,10 +6,8 @@ import ( "slices" "sort" "sync" - "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // genericSearchOptions contain options used for search. @@ -164,25 +162,6 @@ func (c *BlockColumn) reset() { c.Values = nil } -func getEmptyStrings(rowsCount int) []string { - p := emptyStrings.Load() - if p == nil { - values := make([]string, rowsCount) - emptyStrings.Store(&values) - return values - } - values := *p - return slicesutil.SetLength(values, rowsCount) -} - -var emptyStrings atomic.Pointer[[]string] - -// The number of blocks to search at once by a single worker -// -// This number must be increased on systems with many CPU cores in order to amortize -// the overhead for passing the blockSearchWork to worker goroutines. -const blockSearchWorksPerBatch = 64 - // searchResultFunc must process sr. // // The callback is called at the worker with the given workerID. @@ -194,16 +173,19 @@ type searchResultFunc func(workerID uint, br *blockResult) func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) { // Spin up workers var wgWorkers sync.WaitGroup - workCh := make(chan []*blockSearchWork, workersCount) + workCh := make(chan *blockSearchWorkBatch, workersCount) wgWorkers.Add(workersCount) for i := 0; i < workersCount; i++ { go func(workerID uint) { bs := getBlockSearch() - for bsws := range workCh { - for _, bsw := range bsws { + for bswb := range workCh { + bsws := bswb.bsws + for i := range bsws { + bsw := &bsws[i] select { case <-stopCh: // The search has been canceled. Just skip all the scheduled work in order to save CPU time. 
+ bsw.reset() continue default: } @@ -212,7 +194,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch if len(bs.br.timestamps) > 0 { processBlockResult(workerID, &bs.br) } + bsw.reset() } + bswb.bsws = bswb.bsws[:0] + putBlockSearchWorkBatch(bswb) } putBlockSearch(bs) wgWorkers.Done() @@ -280,7 +265,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs type partitionSearchFinalizer func() -func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { +func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { select { case <-stopCh: // Do not spend CPU time on search, since it is already stopped. @@ -367,7 +352,7 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter) return result } -func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { +func (ddb *datadb) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { // Select parts with data for the given time range ddb.partsLock.Lock() pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp) @@ -393,7 +378,7 @@ func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, s } } -func (p *part) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { bhss := getBlockHeaders() if len(so.tenantIDs) > 0 { p.searchByTenantIDs(so, bhss, workCh, stopCh) @@ -430,27 +415,20 @@ func (bhss *blockHeaders) reset() { bhss.bhs = bhs[:0] } -func (p *part) searchByTenantIDs(so *searchOptions, 
bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { // it is assumed that tenantIDs are sorted tenantIDs := so.tenantIDs - bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + bswb := getBlockSearchWorkBatch() scheduleBlockSearch := func(bh *blockHeader) bool { - // Do not use pool for blockSearchWork, since it is returned back to the pool - // at another goroutine, which may run on another CPU core. - // This means that it will be put into another per-CPU pool, which may result - // in slowdown related to memory synchronization between CPU cores. - // This slowdown is increased on systems with bigger number of CPU cores. - bsw := newBlockSearchWork(p, so, bh) - bsws = append(bsws, bsw) - if len(bsws) < cap(bsws) { + if bswb.appendBlockSearchWork(p, so, bh) { return true } select { case <-stopCh: return false - case workCh <- bsws: - bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + case workCh <- bswb: + bswb = getBlockSearchWorkBatch() return true } } @@ -535,35 +513,26 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c } // Flush the remaining work - if len(bsws) > 0 { - select { - case <-stopCh: - case workCh <- bsws: - } + select { + case <-stopCh: + case workCh <- bswb: } } -func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { // it is assumed that streamIDs are sorted streamIDs := so.streamIDs - bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + bswb := getBlockSearchWorkBatch() scheduleBlockSearch := func(bh *blockHeader) bool { - // Do not use pool for blockSearchWork, since it is returned back to the pool - // at 
another goroutine, which may run on another CPU core. - // This means that it will be put into another per-CPU pool, which may result - // in slowdown related to memory synchronization between CPU cores. - // This slowdown is increased on systems with bigger number of CPU cores. - bsw := newBlockSearchWork(p, so, bh) - bsws = append(bsws, bsw) - if len(bsws) < cap(bsws) { + if bswb.appendBlockSearchWork(p, so, bh) { return true } select { case <-stopCh: return false - case workCh <- bsws: - bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + case workCh <- bswb: + bswb = getBlockSearchWorkBatch() return true } } @@ -649,11 +618,9 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c } // Flush the remaining work - if len(bsws) > 0 { - select { - case <-stopCh: - case workCh <- bsws: - } + select { + case <-stopCh: + case workCh <- bswb: } } diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 9b7da6069..8d323aae3 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -70,6 +70,8 @@ func (ve *valuesEncoder) reset() { } // encode encodes values to ve.values and returns the encoded value type with min/max encoded values. +// +// ve.values and dict is valid until values are changed. func (ve *valuesEncoder) encode(values []string, dict *valuesDict) (valueType, uint64, uint64) { ve.reset() @@ -1091,6 +1093,12 @@ func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) { vd.values = dstValues } +func (vd *valuesDict) copyFromNoArena(src *valuesDict) { + vd.reset() + + vd.values = append(vd.values[:0], src.values...) 
+} + func (vd *valuesDict) getOrAdd(k string) (byte, bool) { if len(k) > maxDictSizeBytes { return 0, false From da3af090c6b583c30e4377821d91a3237ee5c830 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 14 May 2024 03:05:03 +0200 Subject: [PATCH 2/6] lib/logstorage: work-in-progress --- app/vlselect/logsql/buffered_writer.go | 47 ++++ app/vlselect/logsql/logsql.go | 28 +-- app/vlselect/logsql/sort_writer.go | 290 ------------------------ app/vlselect/logsql/sort_writer_test.go | 46 ---- docs/VictoriaLogs/LogsQL.md | 6 +- docs/VictoriaLogs/README.md | 3 - docs/VictoriaLogs/querying/README.md | 6 +- lib/logstorage/block_result.go | 3 +- lib/logstorage/parser.go | 9 + lib/logstorage/pipe_sort.go | 5 +- 10 files changed, 74 insertions(+), 369 deletions(-) create mode 100644 app/vlselect/logsql/buffered_writer.go delete mode 100644 app/vlselect/logsql/sort_writer.go delete mode 100644 app/vlselect/logsql/sort_writer_test.go diff --git a/app/vlselect/logsql/buffered_writer.go b/app/vlselect/logsql/buffered_writer.go new file mode 100644 index 000000000..57ad56199 --- /dev/null +++ b/app/vlselect/logsql/buffered_writer.go @@ -0,0 +1,47 @@ +package logsql + +import ( + "bufio" + "io" + "sync" +) + +func getBufferedWriter(w io.Writer) *bufferedWriter { + v := bufferedWriterPool.Get() + if v == nil { + return &bufferedWriter{ + bw: bufio.NewWriter(w), + } + } + bw := v.(*bufferedWriter) + bw.bw.Reset(w) + return bw +} + +func putBufferedWriter(bw *bufferedWriter) { + bw.reset() + bufferedWriterPool.Put(bw) +} + +var bufferedWriterPool sync.Pool + +type bufferedWriter struct { + mu sync.Mutex + bw *bufio.Writer +} + +func (bw *bufferedWriter) reset() { + // nothing to do +} + +func (bw *bufferedWriter) WriteIgnoreErrors(p []byte) { + bw.mu.Lock() + _, _ = bw.bw.Write(p) + bw.mu.Unlock() +} + +func (bw *bufferedWriter) FlushIgnoreErrors() { + bw.mu.Lock() + _ = bw.bw.Flush() + bw.mu.Unlock() +} diff --git a/app/vlselect/logsql/logsql.go 
b/app/vlselect/logsql/logsql.go index 0b861ccda..6cf2b266f 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -6,18 +6,11 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" ) -var ( - maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+ - "if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; "+ - "too big value for this flag may result in high memory usage since the sorting is performed in memory") -) - // ProcessQueryRequest handles /select/logsql/query request. func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { // Extract tenantID @@ -40,12 +33,13 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req } w.Header().Set("Content-Type", "application/stream+json; charset=utf-8") - sw := getSortWriter() - sw.Init(w, maxSortBufferSize.IntN(), limit) + if limit > 0 { + q.AddPipeLimit(uint64(limit)) + } + tenantIDs := []logstorage.TenantID{tenantID} - ctxWithCancel, cancel := context.WithCancel(ctx) - defer cancel() + bw := getBufferedWriter(w) writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { if len(columns) == 0 { @@ -56,18 +50,14 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req for i := range timestamps { WriteJSONRow(bb, columns, i) } - - if !sw.TryWrite(bb.B) { - cancel() - } - + bw.WriteIgnoreErrors(bb.B) blockResultPool.Put(bb) } - err = vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock) + err = 
vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock) - sw.FinalFlush() - putSortWriter(sw) + bw.FlushIgnoreErrors() + putBufferedWriter(bw) if err != nil { httpserver.Errorf(w, r, "cannot execute query [%s]: %s", qStr, err) diff --git a/app/vlselect/logsql/sort_writer.go b/app/vlselect/logsql/sort_writer.go deleted file mode 100644 index 3ad0fbf70..000000000 --- a/app/vlselect/logsql/sort_writer.go +++ /dev/null @@ -1,290 +0,0 @@ -package logsql - -import ( - "bytes" - "io" - "sort" - "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" -) - -func getSortWriter() *sortWriter { - v := sortWriterPool.Get() - if v == nil { - return &sortWriter{} - } - return v.(*sortWriter) -} - -func putSortWriter(sw *sortWriter) { - sw.reset() - sortWriterPool.Put(sw) -} - -var sortWriterPool sync.Pool - -// sortWriter expects JSON line stream to be written to it. -// -// It buffers the incoming data until its size reaches maxBufLen. -// Then it streams the buffered data and all the incoming data to w. -// -// The FinalFlush() must be called when all the data is written. -// If the buf isn't empty at FinalFlush() call, then the buffered data -// is sorted by _time field. -type sortWriter struct { - mu sync.Mutex - w io.Writer - - maxLines int - linesWritten int - - maxBufLen int - buf []byte - bufFlushed bool - - hasErr bool -} - -func (sw *sortWriter) reset() { - sw.w = nil - - sw.maxLines = 0 - sw.linesWritten = 0 - - sw.maxBufLen = 0 - sw.buf = sw.buf[:0] - sw.bufFlushed = false - sw.hasErr = false -} - -// Init initializes sw. -// -// If maxLines is set to positive value, then sw accepts up to maxLines -// and then rejects all the other lines by returning false from TryWrite. 
-func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) { - sw.reset() - - sw.w = w - sw.maxBufLen = maxBufLen - sw.maxLines = maxLines -} - -// TryWrite writes p to sw. -// -// True is returned on successful write, false otherwise. -// -// Unsuccessful write may occur on underlying write error or when maxLines lines are already written to sw. -func (sw *sortWriter) TryWrite(p []byte) bool { - sw.mu.Lock() - defer sw.mu.Unlock() - - if sw.hasErr { - return false - } - - if sw.bufFlushed { - if !sw.writeToUnderlyingWriterLocked(p) { - sw.hasErr = true - return false - } - return true - } - - if len(sw.buf)+len(p) < sw.maxBufLen { - sw.buf = append(sw.buf, p...) - return true - } - - sw.bufFlushed = true - if !sw.writeToUnderlyingWriterLocked(sw.buf) { - sw.hasErr = true - return false - } - sw.buf = sw.buf[:0] - - if !sw.writeToUnderlyingWriterLocked(p) { - sw.hasErr = true - return false - } - return true -} - -func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool { - if len(p) == 0 { - return true - } - if sw.maxLines > 0 { - if sw.linesWritten >= sw.maxLines { - return false - } - var linesLeft int - p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten) - sw.linesWritten += linesLeft - } - if _, err := sw.w.Write(p); err != nil { - return false - } - return true -} - -func trimLines(p []byte, maxLines int) ([]byte, int) { - if maxLines <= 0 { - return nil, 0 - } - n := bytes.Count(p, newline) - if n < maxLines { - return p, n - } - for n >= maxLines { - idx := bytes.LastIndexByte(p, '\n') - p = p[:idx] - n-- - } - return p[:len(p)+1], maxLines -} - -var newline = []byte("\n") - -func (sw *sortWriter) FinalFlush() { - if sw.hasErr || sw.bufFlushed { - return - } - - rs := getRowsSorter() - rs.parseRows(sw.buf) - rs.sort() - - rows := rs.rows - if sw.maxLines > 0 && len(rows) > sw.maxLines { - rows = rows[:sw.maxLines] - } - WriteJSONRows(sw.w, rows) - - putRowsSorter(rs) -} - -func getRowsSorter() *rowsSorter { - v := rowsSorterPool.Get() 
- if v == nil { - return &rowsSorter{} - } - return v.(*rowsSorter) -} - -func putRowsSorter(rs *rowsSorter) { - rs.reset() - rowsSorterPool.Put(rs) -} - -var rowsSorterPool sync.Pool - -type rowsSorter struct { - buf []byte - fieldsBuf []logstorage.Field - rows [][]logstorage.Field - times []string -} - -func (rs *rowsSorter) reset() { - rs.buf = rs.buf[:0] - - fieldsBuf := rs.fieldsBuf - for i := range fieldsBuf { - fieldsBuf[i].Reset() - } - rs.fieldsBuf = fieldsBuf[:0] - - rows := rs.rows - for i := range rows { - rows[i] = nil - } - rs.rows = rows[:0] - - times := rs.times - for i := range times { - times[i] = "" - } - rs.times = times[:0] -} - -func (rs *rowsSorter) parseRows(src []byte) { - rs.reset() - - buf := rs.buf - fieldsBuf := rs.fieldsBuf - rows := rs.rows - times := rs.times - - p := logjson.GetParser() - for len(src) > 0 { - var line []byte - n := bytes.IndexByte(src, '\n') - if n < 0 { - line = src - src = nil - } else { - line = src[:n] - src = src[n+1:] - } - if len(line) == 0 { - continue - } - - if err := p.ParseLogMessage(line); err != nil { - logger.Panicf("BUG: unexpected invalid JSON line: %s", err) - } - - timeValue := "" - fieldsBufLen := len(fieldsBuf) - for _, f := range p.Fields { - bufLen := len(buf) - buf = append(buf, f.Name...) - name := bytesutil.ToUnsafeString(buf[bufLen:]) - - bufLen = len(buf) - buf = append(buf, f.Value...) 
- value := bytesutil.ToUnsafeString(buf[bufLen:]) - - fieldsBuf = append(fieldsBuf, logstorage.Field{ - Name: name, - Value: value, - }) - - if name == "_time" { - timeValue = value - } - } - rows = append(rows, fieldsBuf[fieldsBufLen:]) - times = append(times, timeValue) - } - logjson.PutParser(p) - - rs.buf = buf - rs.fieldsBuf = fieldsBuf - rs.rows = rows - rs.times = times -} - -func (rs *rowsSorter) Len() int { - return len(rs.rows) -} - -func (rs *rowsSorter) Less(i, j int) bool { - times := rs.times - return times[i] < times[j] -} - -func (rs *rowsSorter) Swap(i, j int) { - times := rs.times - rows := rs.rows - times[i], times[j] = times[j], times[i] - rows[i], rows[j] = rows[j], rows[i] -} - -func (rs *rowsSorter) sort() { - sort.Sort(rs) -} diff --git a/app/vlselect/logsql/sort_writer_test.go b/app/vlselect/logsql/sort_writer_test.go deleted file mode 100644 index 3c3325726..000000000 --- a/app/vlselect/logsql/sort_writer_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package logsql - -import ( - "bytes" - "strings" - "testing" -) - -func TestSortWriter(t *testing.T) { - f := func(maxBufLen, maxLines int, data string, expectedResult string) { - t.Helper() - - var bb bytes.Buffer - sw := getSortWriter() - sw.Init(&bb, maxBufLen, maxLines) - for _, s := range strings.Split(data, "\n") { - if !sw.TryWrite([]byte(s + "\n")) { - break - } - } - sw.FinalFlush() - putSortWriter(sw) - - result := bb.String() - if result != expectedResult { - t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult) - } - } - - f(100, 0, "", "") - f(100, 0, "{}", "{}\n") - - data := `{"_time":"def","_msg":"xxx"} -{"_time":"abc","_msg":"foo"}` - resultExpected := `{"_time":"abc","_msg":"foo"} -{"_time":"def","_msg":"xxx"} -` - f(100, 0, data, resultExpected) - f(10, 0, data, data+"\n") - - // Test with the maxLines - f(100, 1, data, `{"_time":"abc","_msg":"foo"}`+"\n") - f(10, 1, data, `{"_time":"def","_msg":"xxx"}`+"\n") - f(10, 2, data, data+"\n") - f(100, 2, data, 
resultExpected) -} diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 40f6ce03d..44291699c 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1641,11 +1641,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying ## Sorting -By default VictoriaLogs sorts the returned results by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) -if their total size doesn't exceed `-select.maxSortBufferSize` command-line value (by default it is set to 1MB). -Otherwise sorting is skipped because of performance reasons. - -Use [`sort` pipe](#sort-pipe) for sorting the results. +By default VictoriaLogs doesn't sort the returned results because of performance reasons. Use [`sort` pipe](#sort-pipe) for sorting the results. ## Limiters diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md index 0f92053f1..3406d5525 100644 --- a/docs/VictoriaLogs/README.md +++ b/docs/VictoriaLogs/README.md @@ -252,9 +252,6 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line The maximum duration for query execution (default 30s) -search.maxQueueDuration duration The maximum time the search request waits for execution when -search.maxConcurrentRequests limit is reached; see also -search.maxQueryDuration (default 10s) - -select.maxSortBufferSize size - Query results from /select/logsql/query are automatically sorted by _time if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; too big value for this flag may result in high memory usage since the sorting is performed in memory - Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1048576) -storageDataPath string Path to directory with the VictoriaLogs data; see https://docs.victoriametrics.com/VictoriaLogs/#storage (default "victoria-logs-data") -storage.minFreeDiskSpaceBytes 
size diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index 77ef0004f..2755dac0a 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -66,10 +66,8 @@ The response can be interrupted at any time by closing the connection to Victori This allows post-processing the returned lines at the client side with the usual Unix commands such as `grep`, `jq`, `less`, `head`, etc. See [these docs](#command-line) for more details. -The returned lines are sorted by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) -if their total size doesn't exceed `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte). -Otherwise the returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found. -Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting) +The returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found. +Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sort-pipe) or at client side with the usual `sort` command according to [these docs](#command-line). By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy) is queried. diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index e80a3516b..a873a1c0f 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -261,7 +261,8 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { } // Initialize timestamps, since they are required for all the further work with br. 
- if !slices.Contains(bs.bsw.so.neededColumnNames, "_time") || slices.Contains(bs.bsw.so.unneededColumnNames, "_time") { + so := bs.bsw.so + if !so.needAllColumns && !slices.Contains(so.neededColumnNames, "_time") || so.needAllColumns && slices.Contains(so.unneededColumnNames, "_time") { // The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes. rowsLen := bm.onesCount() br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen) diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 67faad890..b93a9eaed 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -206,6 +206,15 @@ func (q *Query) String() string { return s } +// AddPipeLimit adds `| limit n` pipe to q. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe +func (q *Query) AddPipeLimit(n uint64) { + q.pipes = append(q.pipes, &pipeLimit{ + n: n, + }) +} + func (q *Query) getNeededColumns() ([]string, []string) { neededFields := newFieldsSet() neededFields.add("*") diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 783c638e8..aee2028cf 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -583,7 +583,10 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort if ccA == ccB { continue } - return cA.c.encodedValues[0] < cB.c.encodedValues[0] + if isDesc { + return ccB < ccA + } + return ccA < ccB } if cA.c.isTime && cB.c.isTime { From c90e6de13b36b081ba48af4b4c062423203a9508 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 14 May 2024 03:06:53 +0200 Subject: [PATCH 3/6] docs/VictoriaLogs/CHANGELOG.md: cut v0.6.1-victorialogs --- docs/VictoriaLogs/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 468bf09a7..7f5788449 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,11 +19,16 @@ 
according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +## [v0.6.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.1-victorialogs) + +Released at 2024-05-14 + * FEATURE: use [natural sort order](https://en.wikipedia.org/wiki/Natural_sort_order) when sorting logs via [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). * BUGFIX: properly return matching logs in [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with small number of entries. Previously they could be skipped. The issue has been introduced in [the release v0.6.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.0-victorialogs). * BUGFIX: fix `runtime error: index out of range` panic when using [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) like `_time:1h | sort by (_time)`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6258). + ## [v0.6.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.0-victorialogs) Released at 2024-05-12 From 6a6e34ab8ec752ae9dfd03e0101625ffb242b9b5 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Tue, 14 May 2024 09:26:50 +0200 Subject: [PATCH 4/6] app/vmauth: explicitly unregister metrics set for auth config (#6252) it's needed to remove Summary metric type from the global state of metrics package. metrics package tracks each bucket of summary and periodically swaps old buckets with new. 
Simple set unregister is not enough to release memory used by Set https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6247 --- app/vmauth/auth_config.go | 8 ++++++++ docs/CHANGELOG.md | 1 + 2 files changed, 9 insertions(+) diff --git a/app/vmauth/auth_config.go b/app/vmauth/auth_config.go index e2e38edf0..b4afeee31 100644 --- a/app/vmauth/auth_config.go +++ b/app/vmauth/auth_config.go @@ -693,6 +693,14 @@ func loadAuthConfig() (bool, error) { authConfig.Store(ac) authConfigData.Store(&data) authUsers.Store(&m) + if prevAc != nil { + // explicitly unregister metrics, since all summary type metrics + // are registered at the global state of the metrics package + // and must be removed from it to release memory. + // Metrics must be unregistered only after the atomic.Value.Store calls above. + // Otherwise it may lead to metric gaps, since UnregisterAllMetrics is a slow operation + prevAc.ms.UnregisterAllMetrics() + } return true, nil } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index d14ab38ec..eea50f1b0 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -50,6 +50,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/). * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/stream-aggregation/): set correct suffix `_prometheus` for aggregation outputs [increase_prometheus](https://docs.victoriametrics.com/stream-aggregation/#increase_prometheus) and [total_prometheus](https://docs.victoriametrics.com/stream-aggregation/#total_prometheus). Before, outputs `total` and `total_prometheus` or `increase` and `increase_prometheus` had the same suffix. * BUGFIX: properly estimate the needed memory for query execution if it has the format [`aggr_func`](https://docs.victoriametrics.com/metricsql/#aggregate-functions)([`rollup_func[d]`](https://docs.victoriametrics.com/metricsql/#rollup-functions) (for example, `sum(rate(request_duration_seconds_bucket[5m]))`).
This should allow performing aggregations over bigger number of time series when VictoriaMetrics runs in environments with small amounts of available memory. The issue has been introduced in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/5138eaeea0791caa34bcfab410e0ca9cd253cd8f) in [v1.83.0](https://docs.victoriametrics.com/changelog_2022/#v1830). * BUGFIX: [Single-node VictoriaMetrics](https://docs.victoriametrics.com/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): correctly apply `-inmemoryDataFlushInterval` when it's set to minimum supported value 1s. +* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly release memory used for metrics during config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6247). * DEPRECATION: [vmagent](https://docs.victoriametrics.com/vmagent/): removed deprecated `-remoteWrite.multitenantURL` flag from vmagent. This flag was deprecated since [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0). Use `-enableMultitenantHandlers` instead, as it is easier to use and combine with [multitenant URL at vminsert](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels). See these [docs for details](https://docs.victoriametrics.com/vmagent.html#multitenancy). From b0c1f3d819401841cebc06f560d09607cf5ece93 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Tue, 14 May 2024 14:43:39 +0200 Subject: [PATCH 5/6] app/vmalert/rule: reduce number of allocations for getStaleSeries fn (#6269) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Allocations are reduced by re-using the byte buffer when converting labels to string keys. 
``` name old allocs/op new allocs/op delta GetStaleSeries-10 703 ± 0% 203 ± 0% ~ (p=1.000 n=1+1) ``` Signed-off-by: hagen1778 --- app/vmalert/rule/group.go | 34 ++++++++++++++----------- app/vmalert/rule/group_timing_test.go | 36 +++++++++++++++++++++++++++ app/vmalert/rule/test_helpers.go | 2 +- 3 files changed, 57 insertions(+), 15 deletions(-) create mode 100644 app/vmalert/rule/group_timing_test.go diff --git a/app/vmalert/rule/group.go b/app/vmalert/rule/group.go index f4e6cd0dd..520039d71 100644 --- a/app/vmalert/rule/group.go +++ b/app/vmalert/rule/group.go @@ -9,10 +9,11 @@ import ( "hash/fnv" "net/url" "strconv" - "strings" "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/cheggaaa/pb/v3" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmalert/config" @@ -724,13 +725,19 @@ func (e *executor) exec(ctx context.Context, r Rule, ts time.Time, resolveDurati return errGr.Err() } +var bbPool bytesutil.ByteBufferPool + // getStaleSeries checks whether there are stale series from previously sent ones. 
func (e *executor) getStaleSeries(r Rule, tss []prompbmarshal.TimeSeries, timestamp time.Time) []prompbmarshal.TimeSeries { + bb := bbPool.Get() + defer bbPool.Put(bb) + ruleLabels := make(map[string][]prompbmarshal.Label, len(tss)) for _, ts := range tss { - // convert labels to strings so we can compare with previously sent series - key := labelsToString(ts.Labels) - ruleLabels[key] = ts.Labels + // convert labels to strings, so we can compare with previously sent series + bb.B = labelsToString(bb.B, ts.Labels) + ruleLabels[string(bb.B)] = ts.Labels + bb.Reset() } rID := r.ID() @@ -776,21 +783,20 @@ func (e *executor) purgeStaleSeries(activeRules []Rule) { e.previouslySentSeriesToRWMu.Unlock() } -func labelsToString(labels []prompbmarshal.Label) string { - var b strings.Builder - b.WriteRune('{') +func labelsToString(dst []byte, labels []prompbmarshal.Label) []byte { + dst = append(dst, '{') for i, label := range labels { if len(label.Name) == 0 { - b.WriteString("__name__") + dst = append(dst, "__name__"...) } else { - b.WriteString(label.Name) + dst = append(dst, label.Name...) 
} - b.WriteRune('=') - b.WriteString(strconv.Quote(label.Value)) + dst = append(dst, '=') + dst = strconv.AppendQuote(dst, label.Value) if i < len(labels)-1 { - b.WriteRune(',') + dst = append(dst, ',') } } - b.WriteRune('}') - return b.String() + dst = append(dst, '}') + return dst } diff --git a/app/vmalert/rule/group_timing_test.go b/app/vmalert/rule/group_timing_test.go new file mode 100644 index 000000000..c233ed34d --- /dev/null +++ b/app/vmalert/rule/group_timing_test.go @@ -0,0 +1,36 @@ +package rule + +import ( + "fmt" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" +) + +func BenchmarkGetStaleSeries(b *testing.B) { + ts := time.Now() + n := 100 + payload := make([]prompbmarshal.TimeSeries, n) + for i := 0; i < n; i++ { + s := fmt.Sprintf("%d", i) + labels := toPromLabels(b, + "__name__", "foo", ""+ + "instance", s, + "job", s, + "state", s, + ) + payload = append(payload, newTimeSeriesPB([]float64{1}, []int64{ts.Unix()}, labels)) + } + + e := &executor{ + previouslySentSeriesToRW: make(map[uint64]map[string][]prompbmarshal.Label), + } + ar := &AlertingRule{RuleID: 1} + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + e.getStaleSeries(ar, payload, ts) + } +} diff --git a/app/vmalert/rule/test_helpers.go b/app/vmalert/rule/test_helpers.go index dcf23dc76..9373e2b81 100644 --- a/app/vmalert/rule/test_helpers.go +++ b/app/vmalert/rule/test_helpers.go @@ -95,7 +95,7 @@ func metricWithLabels(t *testing.T, labels ...string) datasource.Metric { return m } -func toPromLabels(t *testing.T, labels ...string) []prompbmarshal.Label { +func toPromLabels(t testing.TB, labels ...string) []prompbmarshal.Label { t.Helper() if len(labels) == 0 || len(labels)%2 != 0 { t.Fatalf("expected to get even number of labels") From b617dc9c0b32f2833b57e38c1d9c3349fbc891f7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 14 May 2024 17:47:19 +0200 Subject: [PATCH 6/6] lib/streamaggr: properly return output key from 
getOutputKey The bug has been introduced in cc2647d212b4124466e23da69a7e19a76f1f67d5 --- lib/streamaggr/streamaggr.go | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index f7110290e..f93c78515 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -871,6 +871,7 @@ func getOutputKey(key string) string { if nSize <= 0 { logger.Panicf("BUG: cannot unmarshal inputKeyLen from uvarint") } + src = src[nSize:] outputKey := src[inputKeyLen:] return bytesutil.ToUnsafeString(outputKey) }