From cb35e62e047f4878ffef17702076c80f7938f4c8 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 14 May 2024 01:49:20 +0200
Subject: [PATCH] lib/logstorage: work-in-progress

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6258
---
 docs/VictoriaLogs/CHANGELOG.md      |  5 ++
 docs/VictoriaLogs/LogsQL.md         |  9 ++-
 lib/logstorage/block.go             | 12 ++--
 lib/logstorage/block_data.go        | 14 ++---
 lib/logstorage/block_result.go      | 41 +++++++++----
 lib/logstorage/block_search.go      | 58 +++++++++++++++++--
 lib/logstorage/filter_and.go        | 64 ++++++++++++++++-----
 lib/logstorage/parser_test.go       |  1 +
 lib/logstorage/pipe_sort.go         |  5 +-
 lib/logstorage/stats_count_uniq.go  |  4 +-
 lib/logstorage/stats_max.go         |  4 +-
 lib/logstorage/stats_min.go         |  4 +-
 lib/logstorage/stats_uniq_values.go | 23 --------
 lib/logstorage/storage_search.go    | 89 +++++++++--------------------
 lib/logstorage/values_encoder.go    |  8 +++
 15 files changed, 203 insertions(+), 138 deletions(-)

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index 1318af4a3..468bf09a7 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,11 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
 
 ## tip
 
+* FEATURE: use [natural sort order](https://en.wikipedia.org/wiki/Natural_sort_order) when sorting logs via [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe).
+
+* BUGFIX: properly return matching logs in [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with a small number of entries. Previously they could be skipped. The issue was introduced in [the release v0.6.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.0-victorialogs).
+* BUGFIX: fix `runtime error: index out of range` panic when using [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) like `_time:1h | sort by (_time)`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6258).
+
 ## [v0.6.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.0-victorialogs)
 
 Released at 2024-05-12
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 05ee16bb9..40f6ce03d 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1175,7 +1175,10 @@ See also:
 
 ### sort pipe
 
-By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) must be used.
+By default logs are selected in arbitrary order for performance reasons. If logs must be sorted, then the `| sort by (field1, ..., fieldN)` [pipe](#pipes) can be used.
+The returned logs are sorted by the given [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+using [natural sorting](https://en.wikipedia.org/wiki/Natural_sort_order).
+
 For example, the following query returns logs for the last 5 minutes sorted by [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
 and then by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
@@ -1210,7 +1213,7 @@ See also:
 
 ### uniq pipe
 
 `| uniq ...` pipe allows returning only unique results over the selected logs.
 For example, the following LogsQL query
-returns uniq values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 over logs for the last 5 minutes:
 
 ```logsql
 _time:5m | uniq by (ip)
 ```
 
@@ -1536,7 +1539,7 @@ See also:
 
 `uniq_values(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the unique non-empty values across
 the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
-The returned values are sorted and encoded in JSON array.
+The returned values are encoded in a JSON array. The order of the returned values is arbitrary.
 
 For example, the following query returns unique non-empty values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 over logs for the last 5 minutes:
diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go
index 3b6cd073f..0d1f9026a 100644
--- a/lib/logstorage/block.go
+++ b/lib/logstorage/block.go
@@ -149,8 +149,8 @@ func (c *column) resizeValues(valuesLen int) []string {
 
-// mustWriteTo writes c to sw and updates ch accordingly.
+// mustWriteToNoArena writes c to sw and updates ch accordingly.
 //
-// ch is valid until a.reset() is called.
-func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
+// ch is valid until c is changed.
+func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
 	ch.reset()
 
 	valuesWriter := &sw.fieldValuesWriter
@@ -160,7 +160,7 @@ func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
 		bloomFilterWriter = &sw.messageBloomFilterWriter
 	}
 
-	ch.name = a.copyString(c.name)
+	ch.name = c.name
 
 	// encode values
 	ve := getValuesEncoder()
@@ -454,20 +454,18 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
 	// Marshal columns
 	cs := b.columns
 
-	a := getArena()
 	csh := getColumnsHeader()
 
 	chs := csh.resizeColumnHeaders(len(cs))
 	for i := range cs {
-		cs[i].mustWriteTo(a, &chs[i], sw)
+		cs[i].mustWriteToNoArena(&chs[i], sw)
 	}
-	csh.constColumns = appendFields(a, csh.constColumns[:0], b.constColumns)
+	csh.constColumns = append(csh.constColumns[:0], b.constColumns...)
 
 	bb := longTermBufPool.Get()
 	bb.B = csh.marshal(bb.B)
 	putColumnsHeader(csh)
-	putArena(a)
 
 	bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
 	bh.columnsHeaderSize = uint64(len(bb.B))
diff --git a/lib/logstorage/block_data.go b/lib/logstorage/block_data.go
index 308e4d109..9f009c098 100644
--- a/lib/logstorage/block_data.go
+++ b/lib/logstorage/block_data.go
@@ -110,20 +110,18 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
 	// Marshal columns
 	cds := bd.columnsData
 
-	a := getArena()
 	csh := getColumnsHeader()
 
 	chs := csh.resizeColumnHeaders(len(cds))
 	for i := range cds {
-		cds[i].mustWriteTo(a, &chs[i], sw)
+		cds[i].mustWriteToNoArena(&chs[i], sw)
 	}
-	csh.constColumns = appendFields(a, csh.constColumns[:0], bd.constColumns)
+	csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)
 
 	bb := longTermBufPool.Get()
 	bb.B = csh.marshal(bb.B)
 	putColumnsHeader(csh)
-	putArena(a)
 
 	bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
 	bh.columnsHeaderSize = uint64(len(bb.B))
@@ -310,8 +308,8 @@ func (cd *columnData) copyFrom(a *arena, src *columnData) {
 
-// mustWriteTo writes cd to sw and updates ch accordingly.
+// mustWriteToNoArena writes cd to sw and updates ch accordingly.
 //
-// ch is valid until a.reset() is called.
-func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
+// ch is valid until cd is changed.
+func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) { ch.reset() valuesWriter := &sw.fieldValuesWriter @@ -321,12 +319,12 @@ func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) bloomFilterWriter = &sw.messageBloomFilterWriter } - ch.name = a.copyString(cd.name) + ch.name = cd.name ch.valueType = cd.valueType ch.minValue = cd.minValue ch.maxValue = cd.maxValue - ch.valuesDict.copyFrom(a, &cd.valuesDict) + ch.valuesDict.copyFromNoArena(&cd.valuesDict) // marshal values ch.valuesSize = uint64(len(cd.valuesData)) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index de6de0721..e80a3516b 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "math" "slices" + "sync/atomic" "time" "unsafe" @@ -12,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // blockResult holds results for a single block of log entries. @@ -107,7 +109,7 @@ func (br *blockResult) cloneValues(values []string) []string { for _, v := range values { if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] { - valuesBuf = append(valuesBuf, v) + valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1]) } else { bufLen := len(buf) buf = append(buf, v...) @@ -259,7 +261,14 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) { } // Initialize timestamps, since they are required for all the further work with br. + if !slices.Contains(bs.bsw.so.neededColumnNames, "_time") || slices.Contains(bs.bsw.so.unneededColumnNames, "_time") { + // The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes. + rowsLen := bm.onesCount() + br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen) + return + } + // Slow path - the _time column is requested, so we need to initialize br.timestamps with real timestamps. srcTimestamps := bs.getTimestamps() if bm.areAllBitsSet() { // Fast path - all the rows in the block are selected, so copy all the timestamps without any filtering. @@ -285,7 +294,7 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap) appendValue := func(v string) { if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] { - valuesBuf = append(valuesBuf, v) + valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1]) } else { bufLen := len(buf) buf = append(buf, v...) @@ -1512,7 +1521,7 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 { } } -func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 { +func (c *blockResultColumn) getMaxValue() float64 { if c.isConst { v := c.encodedValues[0] f, ok := tryParseFloat64(v) @@ -1620,7 +1629,7 @@ func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 { } } -func (c *blockResultColumn) getMinValue(_ *blockResult) float64 { +func (c *blockResultColumn) getMinValue() float64 { if c.isConst { v := c.encodedValues[0] f, ok := tryParseFloat64(v) @@ -1851,13 +1860,12 @@ func (rc *resultColumn) resetKeepName() { func (rc *resultColumn) addValue(v string) { values := rc.values if len(values) > 0 && string(v) == values[len(values)-1] { - rc.values = append(rc.values, values[len(values)-1]) - return + rc.values = append(values, values[len(values)-1]) + } else { + bufLen := len(rc.buf) + rc.buf = append(rc.buf, v...) 
+		rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
 	}
-
-	bufLen := len(rc.buf)
-	rc.buf = append(rc.buf, v...)
-	rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
 }
 
 func truncateTimestampToMonth(timestamp int64) int64 {
@@ -1870,5 +1878,18 @@ func truncateTimestampToYear(timestamp int64) int64 {
 	return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
 }
 
+func getEmptyStrings(rowsCount int) []string {
+	p := emptyStrings.Load()
+	if p == nil {
+		values := make([]string, rowsCount)
+		emptyStrings.Store(&values)
+		return values
+	}
+	values := *p
+	return slicesutil.SetLength(values, rowsCount)
+}
+
+var emptyStrings atomic.Pointer[[]string]
+
 var nan = math.NaN()
 var inf = math.Inf(1)
diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go
index 502e51941..6845f9c33 100644
--- a/lib/logstorage/block_search.go
+++ b/lib/logstorage/block_search.go
@@ -8,6 +8,12 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
 
+// The number of blocks to search at once by a single worker.
+//
+// This number must be increased on systems with many CPU cores in order to amortize
+// the overhead for passing the blockSearchWork to worker goroutines.
+const blockSearchWorksPerBatch = 64
+
 type blockSearchWork struct {
 	// p is the part where the block belongs to.
 	p *part
@@ -19,12 +25,54 @@ type blockSearchWork struct {
 	bh blockHeader
 }
 
-func newBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) *blockSearchWork {
-	var bsw blockSearchWork
-	bsw.p = p
-	bsw.so = so
+func (bsw *blockSearchWork) reset() {
+	bsw.p = nil
+	bsw.so = nil
+	bsw.bh.reset()
+}
+
+type blockSearchWorkBatch struct {
+	bsws []blockSearchWork
+}
+
+func (bswb *blockSearchWorkBatch) reset() {
+	bsws := bswb.bsws
+	for i := range bsws {
+		bsws[i].reset()
+	}
+	bswb.bsws = bsws[:0]
+}
+
+func getBlockSearchWorkBatch() *blockSearchWorkBatch {
+	v := blockSearchWorkBatchPool.Get()
+	if v == nil {
+		return &blockSearchWorkBatch{
+			bsws: make([]blockSearchWork, 0, blockSearchWorksPerBatch),
+		}
+	}
+	return v.(*blockSearchWorkBatch)
+}
+
+func putBlockSearchWorkBatch(bswb *blockSearchWorkBatch) {
+	bswb.reset()
+	blockSearchWorkBatchPool.Put(bswb)
+}
+
+var blockSearchWorkBatchPool sync.Pool
+
+func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool {
+	bsws := bswb.bsws
+
+	bsws = append(bsws, blockSearchWork{
+		p:  p,
+		so: so,
+	})
+	bsw := &bsws[len(bsws)-1]
 	bsw.bh.copyFrom(bh)
-	return &bsw
+
+	bswb.bsws = bsws
+
+	return len(bsws) < cap(bsws)
 }
 
 func getBlockSearch() *blockSearch {
diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go
index 7b4a46e02..fd5765004 100644
--- a/lib/logstorage/filter_and.go
+++ b/lib/logstorage/filter_and.go
@@ -3,6 +3,8 @@ package logstorage
 import (
 	"strings"
 	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
 
-// filterAnd contains filters joined by AND opertor.
+// filterAnd contains filters joined by AND operator.
@@ -30,19 +32,10 @@ func (fa *filterAnd) String() string {
 }
 
 func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) {
-	if tokens := fa.getMsgTokens(); len(tokens) > 0 {
-		// Verify whether fa tokens for the _msg field match bloom filter.
-		ch := bs.csh.getColumnHeader("_msg")
-		if ch == nil {
-			// Fast path - there is no _msg field in the block.
-			bm.resetBits()
-			return
-		}
-		if !matchBloomFilterAllTokens(bs, ch, tokens) {
-			// Fast path - fa tokens for the _msg field do not match bloom filter.
- bm.resetBits() - return - } + if !fa.matchMessageBloomFilter(bs) { + // Fast path - fa doesn't match _msg bloom filter. + bm.resetBits() + return } // Slow path - verify every filter separately. @@ -56,7 +49,29 @@ func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) { } } -func (fa *filterAnd) getMsgTokens() []string { +func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool { + tokens := fa.getMessageTokens() + if len(tokens) == 0 { + return true + } + + v := bs.csh.getConstColumnValue("_msg") + if v != "" { + return matchStringByAllTokens(v, tokens) + } + + ch := bs.csh.getColumnHeader("_msg") + if ch == nil { + return false + } + + if ch.valueType == valueTypeDict { + return matchDictValuesByAllTokens(ch.valuesDict.values, tokens) + } + return matchBloomFilterAllTokens(bs, ch, tokens) +} + +func (fa *filterAnd) getMessageTokens() []string { fa.msgTokensOnce.Do(fa.initMsgTokens) return fa.msgTokens } @@ -89,3 +104,24 @@ func (fa *filterAnd) initMsgTokens() { } fa.msgTokens = a } + +func matchStringByAllTokens(v string, tokens []string) bool { + for _, token := range tokens { + if !matchPhrase(v, token) { + return false + } + } + return true +} + +func matchDictValuesByAllTokens(dictValues, tokens []string) bool { + bb := bbPool.Get() + for _, v := range dictValues { + bb.B = append(bb.B, v...) + bb.B = append(bb.B, ',') + } + v := bytesutil.ToUnsafeString(bb.B) + ok := matchStringByAllTokens(v, tokens) + bbPool.Put(bb) + return ok +} diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 97a06ee1d..ee9498a0b 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -1395,6 +1395,7 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | sort by (f1)`, `*`, ``) f(`* | sort by (f1) | fields f2`, `f1,f2`, ``) + f(`_time:5m | sort by (_time) | fields foo`, `_time,foo`, ``) f(`* | sort by (f1) | fields *`, `*`, ``) f(`* | sort by (f1) | sort by (f2,f3 desc) desc`, `*`, ``) f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4`, `f1,f2,f3,f4`, ``) diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 1d406e081..783c638e8 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" ) // pipeSort processes '| sort ...' queries. 
@@ -639,9 +640,9 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort continue } if isDesc { - return sB < sA + return stringsutil.LessNatural(sB, sA) } - return sA < sB + return stringsutil.LessNatural(sA, sB) } return false } diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index 3999bab48..354e25813 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -175,6 +175,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } } + sup.keyBuf = keyBuf return stateSizeIncrease } @@ -307,7 +308,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in m[string(keyBuf)] = struct{}{} stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } - //sup.keyBuf = keyBuf + sup.keyBuf = keyBuf return stateSizeIncrease } @@ -324,6 +325,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in m[string(keyBuf)] = struct{}{} stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } + sup.keyBuf = keyBuf return stateSizeIncrease } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index bfbd959c2..62e215d5f 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -38,7 +38,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { if smp.sm.containsStar { // Find the maximum value across all the columns for _, c := range br.getColumns() { - f := c.getMaxValue(br) + f := c.getMaxValue() if f > smp.max || math.IsNaN(smp.max) { smp.max = f } @@ -47,7 +47,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { // Find the maximum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getMaxValue(br) + f := c.getMaxValue() if f > smp.max || math.IsNaN(smp.max) { smp.max = f } diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 5aa20a6d0..bf157cb20 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -38,7 +38,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { if smp.sm.containsStar { // Find the minimum value across all the columns for _, c := range br.getColumns() { - f := c.getMinValue(br) + f := c.getMinValue() if f < smp.min || math.IsNaN(smp.min) { smp.min = f } @@ -47,7 +47,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { // Find the minimum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getMinValue(br) + f := c.getMinValue() if f < smp.min || math.IsNaN(smp.min) { smp.min = f } diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index df0e561ae..d11e61387 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -202,12 +202,10 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string { return "[]" } - // Sort unique items items := make([]string, 0, len(sup.m)) for k := range sup.m { items = append(items, k) } - slices.SortFunc(items, compareValues) if limit := sup.su.limit; limit > 0 && uint64(len(items)) > limit { items = items[:limit] @@ -242,27 +240,6 @@ func marshalJSONArray(items []string) string { return bytesutil.ToUnsafeString(b) } -func compareValues(a, b string) int { - fA, okA := tryParseFloat64(a) - fB, okB := tryParseFloat64(b) - if okA && okB { - if fA == 
fB { - return 0 - } - if fA < fB { - return -1 - } - return 1 - } - if okA { - return -1 - } - if okB { - return 1 - } - return strings.Compare(a, b) -} - func parseStatsUniqValues(lex *lexer) (*statsUniqValues, error) { fields, err := parseFieldNamesForStatsFunc(lex, "uniq_values") if err != nil { diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 10e924697..2b1ea3ba4 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -6,10 +6,8 @@ import ( "slices" "sort" "sync" - "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // genericSearchOptions contain options used for search. @@ -164,25 +162,6 @@ func (c *BlockColumn) reset() { c.Values = nil } -func getEmptyStrings(rowsCount int) []string { - p := emptyStrings.Load() - if p == nil { - values := make([]string, rowsCount) - emptyStrings.Store(&values) - return values - } - values := *p - return slicesutil.SetLength(values, rowsCount) -} - -var emptyStrings atomic.Pointer[[]string] - -// The number of blocks to search at once by a single worker -// -// This number must be increased on systems with many CPU cores in order to amortize -// the overhead for passing the blockSearchWork to worker goroutines. -const blockSearchWorksPerBatch = 64 - // searchResultFunc must process sr. // // The callback is called at the worker with the given workerID. @@ -194,16 +173,19 @@ type searchResultFunc func(workerID uint, br *blockResult) func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) { // Spin up workers var wgWorkers sync.WaitGroup - workCh := make(chan []*blockSearchWork, workersCount) + workCh := make(chan *blockSearchWorkBatch, workersCount) wgWorkers.Add(workersCount) for i := 0; i < workersCount; i++ { go func(workerID uint) { bs := getBlockSearch() - for bsws := range workCh { - for _, bsw := range bsws { + for bswb := range workCh { + bsws := bswb.bsws + for i := range bsws { + bsw := &bsws[i] select { case <-stopCh: // The search has been canceled. Just skip all the scheduled work in order to save CPU time. + bsw.reset() continue default: } @@ -212,7 +194,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch if len(bs.br.timestamps) > 0 { processBlockResult(workerID, &bs.br) } + bsw.reset() } + bswb.bsws = bswb.bsws[:0] + putBlockSearchWorkBatch(bswb) } putBlockSearch(bs) wgWorkers.Done() @@ -280,7 +265,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs type partitionSearchFinalizer func() -func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { +func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { select { case <-stopCh: // Do not spend CPU time on search, since it is already stopped. 
@@ -367,7 +352,7 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter) return result } -func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { +func (ddb *datadb) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer { // Select parts with data for the given time range ddb.partsLock.Lock() pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp) @@ -393,7 +378,7 @@ func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, s } } -func (p *part) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { bhss := getBlockHeaders() if len(so.tenantIDs) > 0 { p.searchByTenantIDs(so, bhss, workCh, stopCh) @@ -430,27 +415,20 @@ func (bhss *blockHeaders) reset() { bhss.bhs = bhs[:0] } -func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { // it is assumed that tenantIDs are sorted tenantIDs := so.tenantIDs - bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + bswb := getBlockSearchWorkBatch() scheduleBlockSearch := func(bh *blockHeader) bool { - // Do not use pool for blockSearchWork, since it is returned back to the pool - // at another goroutine, which may run on another CPU core. - // This means that it will be put into another per-CPU pool, which may result - // in slowdown related to memory synchronization between CPU cores. - // This slowdown is increased on systems with bigger number of CPU cores. - bsw := newBlockSearchWork(p, so, bh) - bsws = append(bsws, bsw) - if len(bsws) < cap(bsws) { + if bswb.appendBlockSearchWork(p, so, bh) { return true } select { case <-stopCh: return false - case workCh <- bsws: - bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + case workCh <- bswb: + bswb = getBlockSearchWorkBatch() return true } } @@ -535,35 +513,26 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c } // Flush the remaining work - if len(bsws) > 0 { - select { - case <-stopCh: - case workCh <- bsws: - } + select { + case <-stopCh: + case workCh <- bswb: } } -func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) { +func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) { // it is assumed that streamIDs are sorted streamIDs := so.streamIDs - bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch) + bswb := getBlockSearchWorkBatch() scheduleBlockSearch := func(bh *blockHeader) bool { - // Do not use pool for blockSearchWork, since it is returned back to the pool - // at another goroutine, which may run on another CPU core. - // This means that it will be put into another per-CPU pool, which may result - // in slowdown related to memory synchronization between CPU cores. - // This slowdown is increased on systems with bigger number of CPU cores. 
-		bsw := newBlockSearchWork(p, so, bh)
-		bsws = append(bsws, bsw)
-		if len(bsws) < cap(bsws) {
+		if bswb.appendBlockSearchWork(p, so, bh) {
 			return true
 		}
 		select {
 		case <-stopCh:
 			return false
-		case workCh <- bsws:
-			bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch)
+		case workCh <- bswb:
+			bswb = getBlockSearchWorkBatch()
 			return true
 		}
 	}
@@ -649,11 +618,9 @@ func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh c
 	}
 
 	// Flush the remaining work
-	if len(bsws) > 0 {
-		select {
-		case <-stopCh:
-		case workCh <- bsws:
-		}
+	select {
+	case <-stopCh:
+	case workCh <- bswb:
 	}
 }
 
diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go
index 9b7da6069..8d323aae3 100644
--- a/lib/logstorage/values_encoder.go
+++ b/lib/logstorage/values_encoder.go
@@ -70,6 +70,8 @@ func (ve *valuesEncoder) reset() {
 }
 
 // encode encodes values to ve.values and returns the encoded value type with min/max encoded values.
+//
+// ve.values and dict are valid until values are changed.
 func (ve *valuesEncoder) encode(values []string, dict *valuesDict) (valueType, uint64, uint64) {
 	ve.reset()
 
@@ -1091,6 +1093,12 @@ func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) {
 	vd.values = dstValues
 }
 
+func (vd *valuesDict) copyFromNoArena(src *valuesDict) {
+	vd.reset()
+
+	vd.values = append(vd.values[:0], src.values...)
+}
+
 func (vd *valuesDict) getOrAdd(k string) (byte, bool) {
 	if len(k) > maxDictSizeBytes {
 		return 0, false
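
The `sort` pipe change in pipe_sort.go switches string comparison from plain lexicographic order to natural order via `stringsutil.LessNatural`, so values like `host9` now sort before `host10`. The following is a minimal, self-contained sketch of such a comparator, for illustration only (a simplified assumption of the behavior, not the actual `lib/stringsutil` implementation): digit runs compare numerically, everything else compares byte-wise.

```go
package main

import "fmt"

// lessNatural reports whether a sorts before b in natural order:
// runs of decimal digits are compared by their numeric value, while
// all other bytes are compared lexicographically. This is a sketch,
// not the actual lib/stringsutil.LessNatural implementation.
func lessNatural(a, b string) bool {
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		if isDigit(a[i]) && isDigit(b[j]) {
			// Cut out the digit runs from both strings.
			iStart := i
			for i < len(a) && isDigit(a[i]) {
				i++
			}
			jStart := j
			for j < len(b) && isDigit(b[j]) {
				j++
			}
			da := trimLeadingZeros(a[iStart:i])
			db := trimLeadingZeros(b[jStart:j])
			// A shorter digit run is a smaller number.
			if len(da) != len(db) {
				return len(da) < len(db)
			}
			// Equal-length runs: byte order equals numeric order.
			if da != db {
				return da < db
			}
			continue
		}
		if a[i] != b[j] {
			return a[i] < b[j]
		}
		i++
		j++
	}
	// A string that is a prefix of the other sorts first.
	return len(a)-i < len(b)-j
}

func isDigit(c byte) bool { return c >= '0' && c <= '9' }

func trimLeadingZeros(s string) string {
	for len(s) > 1 && s[0] == '0' {
		s = s[1:]
	}
	return s
}

func main() {
	fmt.Println(lessNatural("host9", "host10"))   // true: 9 < 10
	fmt.Println(lessNatural("host10", "host9"))   // false
	fmt.Println(lessNatural("v1.2.9", "v1.2.10")) // true: 9 < 10 in the last digit run
}
```

Plain byte-wise comparison would have ordered `host10` before `host9`, which is what `sortBlockLess` did before this patch.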
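The storage_search.go changes replace per-block `*blockSearchWork` pointers with pooled `blockSearchWorkBatch` values: a worker receives up to 64 blocks per channel send, and the batch slices are reused via `sync.Pool`. Below is a self-contained sketch of the same fill, send and flush pattern with hypothetical names (`workBatch`, `appendWork`, `itemsPerBatch`), assuming a batch size of 64 like `blockSearchWorksPerBatch`; it is an illustration of the pattern, not the patch's code.

```go
package main

import (
	"fmt"
	"sync"
)

// workItem stands in for blockSearchWork; the real struct holds a part,
// search options and a copied block header.
type workItem struct{ id int }

// workBatch stands in for blockSearchWorkBatch: a reusable slice of work
// items that is handed to a worker as a single channel send.
type workBatch struct{ items []workItem }

const itemsPerBatch = 64 // mirrors blockSearchWorksPerBatch

var batchPool sync.Pool

func getWorkBatch() *workBatch {
	if v := batchPool.Get(); v != nil {
		return v.(*workBatch)
	}
	return &workBatch{items: make([]workItem, 0, itemsPerBatch)}
}

func putWorkBatch(wb *workBatch) {
	wb.items = wb.items[:0] // keep the capacity, drop the contents
	batchPool.Put(wb)
}

// appendWork adds an item and reports whether the batch still has room,
// mirroring appendBlockSearchWork.
func (wb *workBatch) appendWork(id int) bool {
	wb.items = append(wb.items, workItem{id: id})
	return len(wb.items) < cap(wb.items)
}

func main() {
	workCh := make(chan *workBatch, 2)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for wb := range workCh {
			fmt.Printf("worker got a batch of %d blocks\n", len(wb.items))
			putWorkBatch(wb) // the consumer returns the batch to the pool
		}
	}()

	wb := getWorkBatch()
	for i := 0; i < 100; i++ {
		if !wb.appendWork(i) {
			workCh <- wb // the batch is full - hand it to a worker
			wb = getWorkBatch()
		}
	}
	workCh <- wb // flush the remaining work, as the patch does after the scan loop
	close(workCh)
	wg.Wait()
}
```

Sending one batch instead of 64 individual items amortizes the channel-send overhead, which is the rationale stated in the `blockSearchWorksPerBatch` comment above.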