This commit is contained in:
Aliaksandr Valialkin 2024-05-13 16:45:34 +02:00
parent 9673da2578
commit 900e558678
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 26 additions and 28 deletions

View file

@ -4,6 +4,7 @@ import (
"encoding/binary" "encoding/binary"
"math" "math"
"slices" "slices"
"sync/atomic"
"time" "time"
"unsafe" "unsafe"
@ -12,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
) )
// blockResult holds results for a single block of log entries. // blockResult holds results for a single block of log entries.
@ -1512,7 +1514,7 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 {
} }
} }
func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 { func (c *blockResultColumn) getMaxValue() float64 {
if c.isConst { if c.isConst {
v := c.encodedValues[0] v := c.encodedValues[0]
f, ok := tryParseFloat64(v) f, ok := tryParseFloat64(v)
@ -1620,7 +1622,7 @@ func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 {
} }
} }
func (c *blockResultColumn) getMinValue(_ *blockResult) float64 { func (c *blockResultColumn) getMinValue() float64 {
if c.isConst { if c.isConst {
v := c.encodedValues[0] v := c.encodedValues[0]
f, ok := tryParseFloat64(v) f, ok := tryParseFloat64(v)
@ -1870,5 +1872,18 @@ func truncateTimestampToYear(timestamp int64) int64 {
return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano() return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()
} }
func getEmptyStrings(rowsCount int) []string {
p := emptyStrings.Load()
if p == nil {
values := make([]string, rowsCount)
emptyStrings.Store(&values)
return values
}
values := *p
return slicesutil.SetLength(values, rowsCount)
}
var emptyStrings atomic.Pointer[[]string]
var nan = math.NaN() var nan = math.NaN()
var inf = math.Inf(1) var inf = math.Inf(1)

View file

@ -175,6 +175,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
} }
} }
sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
} }
@ -307,7 +308,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
m[string(keyBuf)] = struct{}{} m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
} }
//sup.keyBuf = keyBuf sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
} }
@ -324,6 +325,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
m[string(keyBuf)] = struct{}{} m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
} }
sup.keyBuf = keyBuf
return stateSizeIncrease return stateSizeIncrease
} }

View file

@ -38,7 +38,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
if smp.sm.containsStar { if smp.sm.containsStar {
// Find the maximum value across all the columns // Find the maximum value across all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getMaxValue(br) f := c.getMaxValue()
if f > smp.max || math.IsNaN(smp.max) { if f > smp.max || math.IsNaN(smp.max) {
smp.max = f smp.max = f
} }
@ -47,7 +47,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
// Find the maximum value across the requested columns // Find the maximum value across the requested columns
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getMaxValue(br) f := c.getMaxValue()
if f > smp.max || math.IsNaN(smp.max) { if f > smp.max || math.IsNaN(smp.max) {
smp.max = f smp.max = f
} }

View file

@ -38,7 +38,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
if smp.sm.containsStar { if smp.sm.containsStar {
// Find the minimum value across all the columns // Find the minimum value across all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f := c.getMinValue(br) f := c.getMinValue()
if f < smp.min || math.IsNaN(smp.min) { if f < smp.min || math.IsNaN(smp.min) {
smp.min = f smp.min = f
} }
@ -47,7 +47,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {
// Find the minimum value across the requested columns // Find the minimum value across the requested columns
for _, field := range smp.sm.fields { for _, field := range smp.sm.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f := c.getMinValue(br) f := c.getMinValue()
if f < smp.min || math.IsNaN(smp.min) { if f < smp.min || math.IsNaN(smp.min) {
smp.min = f smp.min = f
} }

View file

@ -6,10 +6,8 @@ import (
"slices" "slices"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
) )
// genericSearchOptions contain options used for search. // genericSearchOptions contain options used for search.
@ -164,19 +162,6 @@ func (c *BlockColumn) reset() {
c.Values = nil c.Values = nil
} }
func getEmptyStrings(rowsCount int) []string {
p := emptyStrings.Load()
if p == nil {
values := make([]string, rowsCount)
emptyStrings.Store(&values)
return values
}
values := *p
return slicesutil.SetLength(values, rowsCount)
}
var emptyStrings atomic.Pointer[[]string]
// The number of blocks to search at once by a single worker // The number of blocks to search at once by a single worker
// //
// This number must be increased on systems with many CPU cores in order to amortize // This number must be increased on systems with many CPU cores in order to amortize

View file

@ -1096,11 +1096,7 @@ func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) {
func (vd *valuesDict) copyFromNoArena(src *valuesDict) { func (vd *valuesDict) copyFromNoArena(src *valuesDict) {
vd.reset() vd.reset()
dstValues := vd.values vd.values = append(vd.values[:0], src.values...)
for _, v := range src.values {
dstValues = append(dstValues, v)
}
vd.values = dstValues
} }
func (vd *valuesDict) getOrAdd(k string) (byte, bool) { func (vd *valuesDict) getOrAdd(k string) (byte, bool) {

View file

@ -103,7 +103,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) {
} }
} }
func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) {
currentTime := fasttime.UnixTimestamp() currentTime := fasttime.UnixTimestamp()
currentTimeMsec := int64(currentTime) * 1000 currentTimeMsec := int64(currentTime) * 1000