Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git (synced 2024-11-21 14:44:00 +00:00)

lib/logstorage: work-in-progress

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6258

Parent: cc2647d212
Commit: cb35e62e04

15 changed files with 203 additions and 138 deletions
@@ -19,6 +19,11 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta

 ## tip

+* FEATURE: use [natural sort order](https://en.wikipedia.org/wiki/Natural_sort_order) when sorting logs via [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe).
+
+* BUGFIX: properly return matching logs in [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with small number of entries. Previously they could be skipped. The issue has been introduced in [the release v0.6.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.0-victorialogs).
+* BUGFIX: fix `runtime error: index out of range` panic when using [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) like `_time:1h | sort by (_time)`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6258).
+
 ## [v0.6.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.6.0-victorialogs)

 Released at 2024-05-12
@@ -1175,7 +1175,10 @@ See also:

 ### sort pipe

-By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) must be used.
+By default logs are selected in arbitrary order because of performance reasons. If logs must be sorted, then `| sort by (field1, ..., fieldN)` [pipe](#pipes) can be used.
+The returned logs are sorted by the given [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+using [natural sorting](https://en.wikipedia.org/wiki/Natural_sort_order).

 For example, the following query returns logs for the last 5 minutes sorted by [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
 and then by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
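For readers unfamiliar with natural ordering: runs of digits are compared as numbers, so `app2` sorts before `app10`. The sort pipe itself uses the repository's `stringsutil.LessNatural` (see the pipe_sort.go hunk further down); the comparator below is only a toy illustration of the behavior:

```go
package main

import (
	"fmt"
	"sort"
)

// naturalLess compares digit runs numerically and everything else byte-wise.
// It is an illustration only, not the stringsutil.LessNatural implementation.
func naturalLess(a, b string) bool {
	for len(a) > 0 && len(b) > 0 {
		if isDigit(a[0]) && isDigit(b[0]) {
			na, restA := takeNumber(a)
			nb, restB := takeNumber(b)
			if na != nb {
				return na < nb
			}
			a, b = restA, restB
			continue
		}
		if a[0] != b[0] {
			return a[0] < b[0]
		}
		a, b = a[1:], b[1:]
	}
	return len(a) < len(b)
}

func isDigit(c byte) bool { return c >= '0' && c <= '9' }

// takeNumber consumes a leading digit run and returns its numeric value.
func takeNumber(s string) (uint64, string) {
	var n uint64
	i := 0
	for i < len(s) && isDigit(s[i]) {
		n = n*10 + uint64(s[i]-'0')
		i++
	}
	return n, s[i:]
}

func main() {
	logs := []string{"app10", "app2", "app1"}
	sort.Slice(logs, func(i, j int) bool { return naturalLess(logs[i], logs[j]) })
	fmt.Println(logs) // [app1 app2 app10] instead of the lexicographic [app1 app10 app2]
}
```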
@@ -1210,7 +1213,7 @@ See also:

 ### uniq pipe

 `| uniq ...` pipe allows returning only unique results over the selected logs. For example, the following LogsQL query
-returns uniq values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 over logs for the last 5 minutes:

 ```logsql
@@ -1536,7 +1539,7 @@ See also:

 `uniq_values(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the unique non-empty values across
 the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
-The returned values are sorted and encoded in JSON array.
+The returned values are encoded in JSON array. The order of the returned values is arbitrary.

 For example, the following query returns unique non-empty values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 over logs for the last 5 minutes:
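Why the order became arbitrary is visible in the stats_uniq_values.go hunk later in this commit: the sorting step was dropped. A simplified sketch (not the repository's code) of the resulting behavior, where output follows Go's unspecified map iteration order:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// uniqValuesJSON collects unique values from a set and marshals them as a
// JSON array. With no explicit sort, the element order is arbitrary because
// Go randomizes map iteration order.
func uniqValuesJSON(m map[string]struct{}) string {
	items := make([]string, 0, len(m))
	for k := range m {
		items = append(items, k) // iteration order differs between runs
	}
	b, _ := json.Marshal(items)
	return string(b)
}

func main() {
	m := map[string]struct{}{"1.2.3.4": {}, "5.6.7.8": {}}
	fmt.Println(uniqValuesJSON(m)) // e.g. ["5.6.7.8","1.2.3.4"]
}
```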
@@ -149,8 +149,8 @@ func (c *column) resizeValues(valuesLen int) []string {

 // mustWriteTo writes c to sw and updates ch accordingly.
 //
-// ch is valid until a.reset() is called.
-func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
+// ch is valid until c is changed.
+func (c *column) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
 	ch.reset()

 	valuesWriter := &sw.fieldValuesWriter

@@ -160,7 +160,7 @@ func (c *column) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
 		bloomFilterWriter = &sw.messageBloomFilterWriter
 	}

-	ch.name = a.copyString(c.name)
+	ch.name = c.name

 	// encode values
 	ve := getValuesEncoder()
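The recurring change in this commit drops the per-block arena: instead of copying strings such as `c.name` into an arena (valid until `a.reset()`), the header now aliases the column's own memory (valid only until the column changes). A minimal sketch of the two lifetime contracts, using a simplified stand-in arena rather than the repository's implementation:

```go
package main

import "fmt"

// arena is a simplified stand-in: strings copied into it stay valid until the
// arena is reset, independently of the source.
type arena struct{ buf []byte }

func (a *arena) copyString(s string) string {
	n := len(a.buf)
	a.buf = append(a.buf, s...)
	// The real arena returns a no-copy view into buf; string() keeps this sketch safe.
	return string(a.buf[n:])
}

type column struct{ name string }
type columnHeader struct{ name string }

// withArena: ch.name owns independent bytes, valid until the arena is reset.
func withArena(a *arena, c *column, ch *columnHeader) { ch.name = a.copyString(c.name) }

// noArena: ch.name aliases c.name, so ch must be consumed before c is reused.
func noArena(c *column, ch *columnHeader) { ch.name = c.name }

func main() {
	c := column{name: "ip"}
	var ch columnHeader
	noArena(&c, &ch)
	fmt.Println(ch.name) // "ip" - marshal ch before mutating c
}
```

Dropping the copy is safe in the hunks below because the column headers are marshaled immediately after being filled, while the columns are still alive and unchanged.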
@@ -454,20 +454,18 @@ func (b *block) mustWriteTo(sid *streamID, bh *blockHeader, sw *streamWriters) {
 	// Marshal columns
 	cs := b.columns

-	a := getArena()
 	csh := getColumnsHeader()

 	chs := csh.resizeColumnHeaders(len(cs))
 	for i := range cs {
-		cs[i].mustWriteTo(a, &chs[i], sw)
+		cs[i].mustWriteToNoArena(&chs[i], sw)
 	}
-	csh.constColumns = appendFields(a, csh.constColumns[:0], b.constColumns)
+	csh.constColumns = append(csh.constColumns[:0], b.constColumns...)

 	bb := longTermBufPool.Get()
 	bb.B = csh.marshal(bb.B)

 	putColumnsHeader(csh)
-	putArena(a)

 	bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
 	bh.columnsHeaderSize = uint64(len(bb.B))
@@ -110,20 +110,18 @@ func (bd *blockData) mustWriteTo(bh *blockHeader, sw *streamWriters) {
 	// Marshal columns
 	cds := bd.columnsData

-	a := getArena()
 	csh := getColumnsHeader()

 	chs := csh.resizeColumnHeaders(len(cds))
 	for i := range cds {
-		cds[i].mustWriteTo(a, &chs[i], sw)
+		cds[i].mustWriteToNoArena(&chs[i], sw)
 	}
-	csh.constColumns = appendFields(a, csh.constColumns[:0], bd.constColumns)
+	csh.constColumns = append(csh.constColumns[:0], bd.constColumns...)

 	bb := longTermBufPool.Get()
 	bb.B = csh.marshal(bb.B)

 	putColumnsHeader(csh)
-	putArena(a)

 	bh.columnsHeaderOffset = sw.columnsHeaderWriter.bytesWritten
 	bh.columnsHeaderSize = uint64(len(bb.B))
@@ -310,8 +308,8 @@ func (cd *columnData) copyFrom(a *arena, src *columnData) {

 // mustWriteTo writes cd to sw and updates ch accordingly.
 //
-// ch is valid until a.reset() is called.
-func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters) {
+// ch is valid until cd is changed.
+func (cd *columnData) mustWriteToNoArena(ch *columnHeader, sw *streamWriters) {
 	ch.reset()

 	valuesWriter := &sw.fieldValuesWriter

@@ -321,12 +319,12 @@ func (cd *columnData) mustWriteTo(a *arena, ch *columnHeader, sw *streamWriters)
 		bloomFilterWriter = &sw.messageBloomFilterWriter
 	}

-	ch.name = a.copyString(cd.name)
+	ch.name = cd.name
 	ch.valueType = cd.valueType

 	ch.minValue = cd.minValue
 	ch.maxValue = cd.maxValue
-	ch.valuesDict.copyFrom(a, &cd.valuesDict)
+	ch.valuesDict.copyFromNoArena(&cd.valuesDict)

 	// marshal values
 	ch.valuesSize = uint64(len(cd.valuesData))
@@ -4,6 +4,7 @@ import (
 	"encoding/binary"
 	"math"
 	"slices"
+	"sync/atomic"
 	"time"
 	"unsafe"

@@ -12,6 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
 )

 // blockResult holds results for a single block of log entries.
@@ -107,7 +109,7 @@ func (br *blockResult) cloneValues(values []string) []string {

 	for _, v := range values {
 		if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] {
-			valuesBuf = append(valuesBuf, v)
+			valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1])
 		} else {
 			bufLen := len(buf)
 			buf = append(buf, v...)
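The subtle point of this one-line change: when consecutive values are equal, appending the previously cloned string (which points into the result's own buffer) instead of `v` (which points into the source block) means the cloned slice no longer retains references to the source memory. A standalone sketch of the pattern, not the repository's exact code:

```go
package main

import "fmt"

// cloneValues copies values into a private buffer, deduplicating consecutive
// equal values. Reusing the previous clone (instead of appending v itself)
// keeps the result free of references to the caller's backing memory.
func cloneValues(values []string) []string {
	var buf []byte
	out := make([]string, 0, len(values))
	for _, v := range values {
		if len(out) > 0 && v == out[len(out)-1] {
			out = append(out, out[len(out)-1]) // reuse the clone, not v
			continue
		}
		n := len(buf)
		buf = append(buf, v...)
		out = append(out, string(buf[n:])) // the real code uses an unsafe no-copy string view
	}
	return out
}

func main() {
	fmt.Println(cloneValues([]string{"a", "a", "b"})) // [a a b]
}
```

The same fix is applied to the `appendValue` closure in `addColumn` and to `resultColumn.addValue` in the hunks below.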
@@ -259,7 +261,14 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
 	}

+	// Initialize timestamps, since they are required for all the further work with br.
+	if !slices.Contains(bs.bsw.so.neededColumnNames, "_time") || slices.Contains(bs.bsw.so.unneededColumnNames, "_time") {
+		// The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes.
+		rowsLen := bm.onesCount()
+		br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen)
+		return
+	}
+
+	// Slow path - the _time column is requested, so we need to initialize br.timestamps with real timestamps.
 	srcTimestamps := bs.getTimestamps()
 	if bm.areAllBitsSet() {
 		// Fast path - all the rows in the block are selected, so copy all the timestamps without any filtering.
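The new fastest path skips timestamp decoding entirely when the query doesn't reference `_time`: only the number of selected rows matters downstream, so a zero-filled slice suffices. A condensed sketch of the branch structure (the helper names here are illustrative, not the repository's API):

```go
package main

import "slices"

// initTimestamps returns zero-filled timestamps when _time isn't requested,
// avoiding the cost of reading and unmarshaling real per-row timestamps.
func initTimestamps(needed, unneeded []string, selectedRows int, load func() []int64) []int64 {
	timeNeeded := slices.Contains(needed, "_time") && !slices.Contains(unneeded, "_time")
	if !timeNeeded {
		// Fastest path: only len(result) is used later, so zeroes are enough.
		return make([]int64, selectedRows)
	}
	// Slow path: decode the real timestamps for the selected rows.
	return load()
}

func main() {
	ts := initTimestamps([]string{"foo"}, nil, 3, func() []int64 { return nil })
	_ = ts // [0 0 0]
}
```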
@@ -285,7 +294,7 @@ func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *bitmap)

 	appendValue := func(v string) {
 		if len(valuesBuf) > 0 && v == valuesBuf[len(valuesBuf)-1] {
-			valuesBuf = append(valuesBuf, v)
+			valuesBuf = append(valuesBuf, valuesBuf[len(valuesBuf)-1])
 		} else {
 			bufLen := len(buf)
 			buf = append(buf, v...)
@@ -1512,7 +1521,7 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 {
 	}
 }

-func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 {
+func (c *blockResultColumn) getMaxValue() float64 {
 	if c.isConst {
 		v := c.encodedValues[0]
 		f, ok := tryParseFloat64(v)

@@ -1620,7 +1629,7 @@ func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 {
 	}
 }

-func (c *blockResultColumn) getMinValue(_ *blockResult) float64 {
+func (c *blockResultColumn) getMinValue() float64 {
 	if c.isConst {
 		v := c.encodedValues[0]
 		f, ok := tryParseFloat64(v)
@@ -1851,13 +1860,12 @@ func (rc *resultColumn) resetKeepName() {
 func (rc *resultColumn) addValue(v string) {
 	values := rc.values
 	if len(values) > 0 && string(v) == values[len(values)-1] {
-		rc.values = append(rc.values, values[len(values)-1])
-		return
+		rc.values = append(values, values[len(values)-1])
+	} else {
+		bufLen := len(rc.buf)
+		rc.buf = append(rc.buf, v...)
+		rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
 	}
-
-	bufLen := len(rc.buf)
-	rc.buf = append(rc.buf, v...)
-	rc.values = append(values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
 }

 func truncateTimestampToMonth(timestamp int64) int64 {
@@ -1870,5 +1878,18 @@ func truncateTimestampToYear(timestamp int64) int64 {
 	return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
 }

+func getEmptyStrings(rowsCount int) []string {
+	p := emptyStrings.Load()
+	if p == nil {
+		values := make([]string, rowsCount)
+		emptyStrings.Store(&values)
+		return values
+	}
+	values := *p
+	return slicesutil.SetLength(values, rowsCount)
+}
+
+var emptyStrings atomic.Pointer[[]string]
+
 var nan = math.NaN()
 var inf = math.Inf(1)
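`getEmptyStrings` (moved here from the search code; its removal appears further down) caches one shared slice of empty strings behind an `atomic.Pointer`, so every caller that needs N empty values reuses the same zero-value backing array, allocating only when N outgrows the cache. A minimal self-contained version of the pattern, with a plain-Go stand-in for `slicesutil.SetLength`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var emptyStrings atomic.Pointer[[]string]

// getEmptyStrings returns a slice of n empty strings backed by a shared,
// lazily grown cache. Concurrent callers may race on Store, which is benign
// because every cached slice holds only zero values.
func getEmptyStrings(n int) []string {
	p := emptyStrings.Load()
	if p == nil || len(*p) < n {
		values := make([]string, n)
		emptyStrings.Store(&values)
		return values
	}
	return (*p)[:n]
}

func main() {
	fmt.Println(len(getEmptyStrings(4))) // 4
	fmt.Println(len(getEmptyStrings(2))) // 2, served from the cache
}
```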
@@ -8,6 +8,12 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )

+// The number of blocks to search at once by a single worker
+//
+// This number must be increased on systems with many CPU cores in order to amortize
+// the overhead for passing the blockSearchWork to worker goroutines.
+const blockSearchWorksPerBatch = 64
+
 type blockSearchWork struct {
 	// p is the part where the block belongs to.
 	p *part
@@ -19,12 +25,54 @@ type blockSearchWork struct {
 	bh blockHeader
 }

-func newBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) *blockSearchWork {
-	var bsw blockSearchWork
-	bsw.p = p
-	bsw.so = so
+func (bsw *blockSearchWork) reset() {
+	bsw.p = nil
+	bsw.so = nil
+	bsw.bh.reset()
+}
+
+type blockSearchWorkBatch struct {
+	bsws []blockSearchWork
+}
+
+func (bswb *blockSearchWorkBatch) reset() {
+	bsws := bswb.bsws
+	for i := range bsws {
+		bsws[i].reset()
+	}
+	bswb.bsws = bsws[:0]
+}
+
+func getBlockSearchWorkBatch() *blockSearchWorkBatch {
+	v := blockSearchWorkBatchPool.Get()
+	if v == nil {
+		return &blockSearchWorkBatch{
+			bsws: make([]blockSearchWork, 0, blockSearchWorksPerBatch),
+		}
+	}
+	return v.(*blockSearchWorkBatch)
+}
+
+func putBlockSearchWorkBatch(bswb *blockSearchWorkBatch) {
+	bswb.reset()
+	blockSearchWorkBatchPool.Put(bswb)
+}
+
+var blockSearchWorkBatchPool sync.Pool
+
+func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool {
+	bsws := bswb.bsws
+
+	bsws = append(bsws, blockSearchWork{
+		p:  p,
+		so: so,
+	})
+	bsw := &bsws[len(bsws)-1]
 	bsw.bh.copyFrom(bh)
-	return &bsw
+
+	bswb.bsws = bsws
+
+	return len(bsws) < cap(bsws)
 }

 func getBlockSearch() *blockSearch {
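This refactor replaces per-block `*blockSearchWork` allocations with pooled batches of value-type works: one batch crosses the channel instead of up to 64 pointers, and the batch object is recycled through a single `sync.Pool` by the consumer. A self-contained sketch of the producer/consumer shape, with `int` standing in for the work item:

```go
package main

import (
	"fmt"
	"sync"
)

const worksPerBatch = 64

type workBatch struct{ works []int }

var workBatchPool sync.Pool

func getWorkBatch() *workBatch {
	if v := workBatchPool.Get(); v != nil {
		return v.(*workBatch)
	}
	return &workBatch{works: make([]int, 0, worksPerBatch)}
}

func putWorkBatch(b *workBatch) {
	b.works = b.works[:0]
	workBatchPool.Put(b)
}

// append adds one work item and reports whether the batch still has room;
// false means the batch is full and should be flushed to the channel.
func (b *workBatch) append(w int) bool {
	b.works = append(b.works, w)
	return len(b.works) < cap(b.works)
}

func main() {
	workCh := make(chan *workBatch, 2)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // worker: processes whole batches and recycles them
		defer wg.Done()
		for b := range workCh {
			for _, w := range b.works {
				_ = w // process one unit of work
			}
			putWorkBatch(b)
		}
	}()
	b := getWorkBatch()
	for i := 0; i < 100; i++ {
		if !b.append(i) {
			workCh <- b
			b = getWorkBatch()
		}
	}
	workCh <- b // flush the remainder
	close(workCh)
	wg.Wait()
	fmt.Println("done")
}
```

Pooling the whole batch also sidesteps the cross-CPU pool churn called out in the comment removed further down: the object returned to the pool from another goroutine is one batch, not every individual work item.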
@@ -3,6 +3,8 @@ package logstorage

 import (
 	"strings"
 	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )

 // filterAnd contains filters joined by AND opertor.
@@ -30,19 +32,10 @@ func (fa *filterAnd) String() string {
 }

 func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) {
-	if tokens := fa.getMsgTokens(); len(tokens) > 0 {
-		// Verify whether fa tokens for the _msg field match bloom filter.
-		ch := bs.csh.getColumnHeader("_msg")
-		if ch == nil {
-			// Fast path - there is no _msg field in the block.
-			bm.resetBits()
-			return
-		}
-		if !matchBloomFilterAllTokens(bs, ch, tokens) {
-			// Fast path - fa tokens for the _msg field do not match bloom filter.
-			bm.resetBits()
-			return
-		}
+	if !fa.matchMessageBloomFilter(bs) {
+		// Fast path - fa doesn't match _msg bloom filter.
+		bm.resetBits()
+		return
 	}

 	// Slow path - verify every filter separately.
@@ -56,7 +49,29 @@ func (fa *filterAnd) apply(bs *blockSearch, bm *bitmap) {
 	}
 }

-func (fa *filterAnd) getMsgTokens() []string {
+func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool {
+	tokens := fa.getMessageTokens()
+	if len(tokens) == 0 {
+		return true
+	}
+
+	v := bs.csh.getConstColumnValue("_msg")
+	if v != "" {
+		return matchStringByAllTokens(v, tokens)
+	}
+
+	ch := bs.csh.getColumnHeader("_msg")
+	if ch == nil {
+		return false
+	}
+
+	if ch.valueType == valueTypeDict {
+		return matchDictValuesByAllTokens(ch.valuesDict.values, tokens)
+	}
+	return matchBloomFilterAllTokens(bs, ch, tokens)
+}
+
+func (fa *filterAnd) getMessageTokens() []string {
 	fa.msgTokensOnce.Do(fa.initMsgTokens)
 	return fa.msgTokens
 }
@@ -89,3 +104,24 @@ func (fa *filterAnd) initMsgTokens() {
 	}
 	fa.msgTokens = a
 }
+
+func matchStringByAllTokens(v string, tokens []string) bool {
+	for _, token := range tokens {
+		if !matchPhrase(v, token) {
+			return false
+		}
+	}
+	return true
+}
+
+func matchDictValuesByAllTokens(dictValues, tokens []string) bool {
+	bb := bbPool.Get()
+	for _, v := range dictValues {
+		bb.B = append(bb.B, v...)
+		bb.B = append(bb.B, ',')
+	}
+	v := bytesutil.ToUnsafeString(bb.B)
+	ok := matchStringByAllTokens(v, tokens)
+	bbPool.Put(bb)
+	return ok
+}
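The idea behind this filter path: an AND filter can only match a block if every token it requires in `_msg` is possibly present, and a bloom filter answers "definitely absent" with no false negatives, so a single miss lets the whole block be skipped. A conceptual sketch of the pre-check (the `bloomFilter` interface here is hypothetical, for illustration only):

```go
package main

import "fmt"

// bloomFilter is a hypothetical interface standing in for the block's
// per-column token bloom filter.
type bloomFilter interface {
	mayContain(token string) bool
}

// blockMayMatchAll reports whether a block can possibly contain all tokens.
// One definitive miss lets the caller zero the row bitmap and skip the
// per-filter matching entirely.
func blockMayMatchAll(bf bloomFilter, tokens []string) bool {
	for _, t := range tokens {
		if !bf.mayContain(t) { // bloom filters have no false negatives
			return false
		}
	}
	return true // possible match; run the precise filters
}

type allowAll struct{}

func (allowAll) mayContain(string) bool { return true }

func main() {
	fmt.Println(blockMayMatchAll(allowAll{}, []string{"error", "timeout"})) // true
}
```

The new `matchMessageBloomFilter` also short-circuits const and dictionary-encoded `_msg` columns, where exact token matching is cheaper than consulting the bloom filter.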
@@ -1395,6 +1395,7 @@ func TestQueryGetNeededColumns(t *testing.T) {

 	f(`* | sort by (f1)`, `*`, ``)
 	f(`* | sort by (f1) | fields f2`, `f1,f2`, ``)
+	f(`_time:5m | sort by (_time) | fields foo`, `_time,foo`, ``)
 	f(`* | sort by (f1) | fields *`, `*`, ``)
 	f(`* | sort by (f1) | sort by (f2,f3 desc) desc`, `*`, ``)
 	f(`* | sort by (f1) | sort by (f2,f3 desc) desc | fields f4`, `f1,f2,f3,f4`, ``)
@@ -13,6 +13,7 @@ import (

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
 )

 // pipeSort processes '| sort ...' queries.
@@ -639,9 +640,9 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
 			continue
 		}
 		if isDesc {
-			return sB < sA
+			return stringsutil.LessNatural(sB, sA)
 		}
-		return sA < sB
+		return stringsutil.LessNatural(sA, sB)
 	}
 	return false
 }
@@ -175,6 +175,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
 			stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
 		}
 	}
+	sup.keyBuf = keyBuf
 	return stateSizeIncrease
 }
@@ -307,7 +308,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
 		m[string(keyBuf)] = struct{}{}
 		stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
 	}
-	//sup.keyBuf = keyBuf
+	sup.keyBuf = keyBuf
 	return stateSizeIncrease
 }

@@ -324,6 +325,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
 		m[string(keyBuf)] = struct{}{}
 		stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
 	}
+	sup.keyBuf = keyBuf
 	return stateSizeIncrease
 }
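These hunks fix a scratch-buffer leak: the method grows a reusable key buffer but never stored it back on the struct (one spot even had the store commented out), so every call reallocated from scratch. A sketch of the reuse pattern with illustrative types, not the repository's processor:

```go
package main

import "fmt"

// processor is an illustrative stand-in for statsCountUniqProcessor.
type processor struct {
	keyBuf []byte
	m      map[string]struct{}
}

// update composes a map key in a reusable scratch buffer. Storing the grown
// buffer back on the struct (the fix above) lets the next call reuse its
// capacity; forgetting the store forces a fresh allocation per call.
func (p *processor) update(fields []string) {
	keyBuf := p.keyBuf[:0]
	for _, f := range fields {
		keyBuf = append(keyBuf, f...)
	}
	p.m[string(keyBuf)] = struct{}{} // string(...) makes the map's own copy
	p.keyBuf = keyBuf                // without this line, the capacity is lost
}

func main() {
	p := &processor{m: map[string]struct{}{}}
	p.update([]string{"GET", "/index"})
	fmt.Println(len(p.m)) // 1
}
```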
|
@ -38,7 +38,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
|
|||
if smp.sm.containsStar {
|
||||
// Find the maximum value across all the columns
|
||||
for _, c := range br.getColumns() {
|
||||
f := c.getMaxValue(br)
|
||||
f := c.getMaxValue()
|
||||
if f > smp.max || math.IsNaN(smp.max) {
|
||||
smp.max = f
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
|
|||
// Find the maximum value across the requested columns
|
||||
for _, field := range smp.sm.fields {
|
||||
c := br.getColumnByName(field)
|
||||
f := c.getMaxValue(br)
|
||||
f := c.getMaxValue()
|
||||
if f > smp.max || math.IsNaN(smp.max) {
|
||||
smp.max = f
|
||||
}
|
||||
|
|
|
@@ -38,7 +38,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
 	if smp.sm.containsStar {
 		// Find the minimum value across all the columns
 		for _, c := range br.getColumns() {
-			f := c.getMinValue(br)
+			f := c.getMinValue()
 			if f < smp.min || math.IsNaN(smp.min) {
 				smp.min = f
 			}

@@ -47,7 +47,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
 	// Find the minimum value across the requested columns
 	for _, field := range smp.sm.fields {
 		c := br.getColumnByName(field)
-		f := c.getMinValue(br)
+		f := c.getMinValue()
 		if f < smp.min || math.IsNaN(smp.min) {
 			smp.min = f
 		}
@@ -202,12 +202,10 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string {
 		return "[]"
 	}

-	// Sort unique items
 	items := make([]string, 0, len(sup.m))
 	for k := range sup.m {
 		items = append(items, k)
 	}
-	slices.SortFunc(items, compareValues)

 	if limit := sup.su.limit; limit > 0 && uint64(len(items)) > limit {
 		items = items[:limit]
@@ -242,27 +240,6 @@ func marshalJSONArray(items []string) string {
 	return bytesutil.ToUnsafeString(b)
 }

-func compareValues(a, b string) int {
-	fA, okA := tryParseFloat64(a)
-	fB, okB := tryParseFloat64(b)
-	if okA && okB {
-		if fA == fB {
-			return 0
-		}
-		if fA < fB {
-			return -1
-		}
-		return 1
-	}
-	if okA {
-		return -1
-	}
-	if okB {
-		return 1
-	}
-	return strings.Compare(a, b)
-}
-
 func parseStatsUniqValues(lex *lexer) (*statsUniqValues, error) {
 	fields, err := parseFieldNamesForStatsFunc(lex, "uniq_values")
 	if err != nil {
@@ -6,10 +6,8 @@ import (
 	"slices"
 	"sort"
 	"sync"
-	"sync/atomic"

 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
 )

 // genericSearchOptions contain options used for search.
@@ -164,25 +162,6 @@ func (c *BlockColumn) reset() {
 	c.Values = nil
 }

-func getEmptyStrings(rowsCount int) []string {
-	p := emptyStrings.Load()
-	if p == nil {
-		values := make([]string, rowsCount)
-		emptyStrings.Store(&values)
-		return values
-	}
-	values := *p
-	return slicesutil.SetLength(values, rowsCount)
-}
-
-var emptyStrings atomic.Pointer[[]string]
-
-// The number of blocks to search at once by a single worker
-//
-// This number must be increased on systems with many CPU cores in order to amortize
-// the overhead for passing the blockSearchWork to worker goroutines.
-const blockSearchWorksPerBatch = 64
-
 // searchResultFunc must process sr.
 //
 // The callback is called at the worker with the given workerID.
@@ -194,16 +173,19 @@ type searchResultFunc func(workerID uint, br *blockResult)
 func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-chan struct{}, processBlockResult searchResultFunc) {
 	// Spin up workers
 	var wgWorkers sync.WaitGroup
-	workCh := make(chan []*blockSearchWork, workersCount)
+	workCh := make(chan *blockSearchWorkBatch, workersCount)
 	wgWorkers.Add(workersCount)
 	for i := 0; i < workersCount; i++ {
 		go func(workerID uint) {
 			bs := getBlockSearch()
-			for bsws := range workCh {
-				for _, bsw := range bsws {
+			for bswb := range workCh {
+				bsws := bswb.bsws
+				for i := range bsws {
+					bsw := &bsws[i]
 					select {
 					case <-stopCh:
 						// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
+						bsw.reset()
 						continue
 					default:
 					}

@@ -212,7 +194,10 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
 				if len(bs.br.timestamps) > 0 {
 					processBlockResult(workerID, &bs.br)
 				}
+				bsw.reset()
 			}
+			bswb.bsws = bswb.bsws[:0]
+			putBlockSearchWorkBatch(bswb)
 		}
 		putBlockSearch(bs)
 		wgWorkers.Done()
@@ -280,7 +265,7 @@ var partitionSearchConcurrencyLimitCh = make(chan struct{}, cgroup.AvailableCPUs

 type partitionSearchFinalizer func()

-func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer {
+func (pt *partition) search(ft *filterTime, sf *StreamFilter, f filter, so *genericSearchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
 	select {
 	case <-stopCh:
 		// Do not spend CPU time on search, since it is already stopped.
@@ -367,7 +352,7 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter)
 	return result
 }

-func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer {
+func (ddb *datadb) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) partitionSearchFinalizer {
 	// Select parts with data for the given time range
 	ddb.partsLock.Lock()
 	pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp)
@@ -393,7 +378,7 @@ func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, s
 	}
 }

-func (p *part) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) {
+func (p *part) search(so *searchOptions, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
 	bhss := getBlockHeaders()
 	if len(so.tenantIDs) > 0 {
 		p.searchByTenantIDs(so, bhss, workCh, stopCh)
@@ -430,27 +415,20 @@ func (bhss *blockHeaders) reset() {
 	bhss.bhs = bhs[:0]
 }

-func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) {
+func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
 	// it is assumed that tenantIDs are sorted
 	tenantIDs := so.tenantIDs

-	bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch)
+	bswb := getBlockSearchWorkBatch()
 	scheduleBlockSearch := func(bh *blockHeader) bool {
-		// Do not use pool for blockSearchWork, since it is returned back to the pool
-		// at another goroutine, which may run on another CPU core.
-		// This means that it will be put into another per-CPU pool, which may result
-		// in slowdown related to memory synchronization between CPU cores.
-		// This slowdown is increased on systems with bigger number of CPU cores.
-		bsw := newBlockSearchWork(p, so, bh)
-		bsws = append(bsws, bsw)
-		if len(bsws) < cap(bsws) {
+		if bswb.appendBlockSearchWork(p, so, bh) {
 			return true
 		}
 		select {
 		case <-stopCh:
 			return false
-		case workCh <- bsws:
-			bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch)
+		case workCh <- bswb:
+			bswb = getBlockSearchWorkBatch()
 			return true
 		}
 	}
@@ -535,35 +513,26 @@ func (p *part) searchByTenantIDs(so *searchOptions, bhss *blockHeaders, workCh c
 	}

 	// Flush the remaining work
-	if len(bsws) > 0 {
-		select {
-		case <-stopCh:
-		case workCh <- bsws:
-		}
-	}
+	select {
+	case <-stopCh:
+	case workCh <- bswb:
+	}
 }

-func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) {
+func (p *part) searchByStreamIDs(so *searchOptions, bhss *blockHeaders, workCh chan<- *blockSearchWorkBatch, stopCh <-chan struct{}) {
 	// it is assumed that streamIDs are sorted
 	streamIDs := so.streamIDs

-	bsws := make([]*blockSearchWork, 0, blockSearchWorksPerBatch)
+	bswb := getBlockSearchWorkBatch()
 	scheduleBlockSearch := func(bh *blockHeader) bool {
-		// Do not use pool for blockSearchWork, since it is returned back to the pool
-		// at another goroutine, which may run on another CPU core.
-		// This means that it will be put into another per-CPU pool, which may result
-		// in slowdown related to memory synchronization between CPU cores.
-		// This slowdown is increased on systems with bigger number of CPU cores.
-		bsw := newBlockSearchWork(p, so, bh)
-		bsws = append(bsws, bsw)
-		if len(bsws) < cap(bsws) {
+		if bswb.appendBlockSearchWork(p, so, bh) {
 			return true
 		}
 		select {
 		case <-stopCh:
 			return false
-		case workCh <- bsws:
-			bsws = make([]*blockSearchWork, 0, blockSearchWorksPerBatch)
+		case workCh <- bswb:
+			bswb = getBlockSearchWorkBatch()
 			return true
 		}
 	}
|
|||
}
|
||||
|
||||
// Flush the remaining work
|
||||
if len(bsws) > 0 {
|
||||
select {
|
||||
case <-stopCh:
|
||||
case workCh <- bsws:
|
||||
}
|
||||
select {
|
||||
case <-stopCh:
|
||||
case workCh <- bswb:
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,8 @@ func (ve *valuesEncoder) reset() {
|
|||
}
|
||||
|
||||
// encode encodes values to ve.values and returns the encoded value type with min/max encoded values.
|
||||
//
|
||||
// ve.values and dict is valid until values are changed.
|
||||
func (ve *valuesEncoder) encode(values []string, dict *valuesDict) (valueType, uint64, uint64) {
|
||||
ve.reset()
|
||||
|
||||
|
@ -1091,6 +1093,12 @@ func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) {
|
|||
vd.values = dstValues
|
||||
}
|
||||
|
||||
func (vd *valuesDict) copyFromNoArena(src *valuesDict) {
|
||||
vd.reset()
|
||||
|
||||
vd.values = append(vd.values[:0], src.values...)
|
||||
}
|
||||
|
||||
func (vd *valuesDict) getOrAdd(k string) (byte, bool) {
|
||||
if len(k) > maxDictSizeBytes {
|
||||
return 0, false
|
||||
|
|