diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index de6de0721..980b3136e 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "math" "slices" + "sync/atomic" "time" "unsafe" @@ -12,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // blockResult holds results for a single block of log entries. @@ -1512,7 +1514,7 @@ func (c *blockResultColumn) getFloatValueAtRow(rowIdx int) float64 { } } -func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 { +func (c *blockResultColumn) getMaxValue() float64 { if c.isConst { v := c.encodedValues[0] f, ok := tryParseFloat64(v) @@ -1620,7 +1622,7 @@ func (c *blockResultColumn) getMaxValue(_ *blockResult) float64 { } } -func (c *blockResultColumn) getMinValue(_ *blockResult) float64 { +func (c *blockResultColumn) getMinValue() float64 { if c.isConst { v := c.encodedValues[0] f, ok := tryParseFloat64(v) @@ -1870,5 +1872,18 @@ func truncateTimestampToYear(timestamp int64) int64 { return time.Date(t.Year(), time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano() } +func getEmptyStrings(rowsCount int) []string { + p := emptyStrings.Load() + if p == nil { + values := make([]string, rowsCount) + emptyStrings.Store(&values) + return values + } + values := *p + return slicesutil.SetLength(values, rowsCount) +} + +var emptyStrings atomic.Pointer[[]string] + var nan = math.NaN() var inf = math.Inf(1) diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index 3999bab48..354e25813 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -175,6 +175,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int { stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } } + sup.keyBuf = keyBuf return stateSizeIncrease } @@ -307,7 +308,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in m[string(keyBuf)] = struct{}{} stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } - //sup.keyBuf = keyBuf + sup.keyBuf = keyBuf return stateSizeIncrease } @@ -324,6 +325,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in m[string(keyBuf)] = struct{}{} stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) } + sup.keyBuf = keyBuf return stateSizeIncrease } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index bfbd959c2..62e215d5f 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -38,7 +38,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { if smp.sm.containsStar { // Find the maximum value across all the columns for _, c := range br.getColumns() { - f := c.getMaxValue(br) + f := c.getMaxValue() if f > smp.max || math.IsNaN(smp.max) { smp.max = f } @@ -47,7 +47,7 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { // Find the maximum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getMaxValue(br) + f := c.getMaxValue() if f > smp.max || math.IsNaN(smp.max) { smp.max = f } diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 5aa20a6d0..bf157cb20 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -38,7 +38,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { if smp.sm.containsStar { // Find the minimum value across all the columns for _, c := range br.getColumns() { - f := c.getMinValue(br) + f := c.getMinValue() if f < smp.min || math.IsNaN(smp.min) { smp.min = f } @@ -47,7 +47,7 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { // Find the minimum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - f := c.getMinValue(br) + f := c.getMinValue() if f < smp.min || math.IsNaN(smp.min) { smp.min = f } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 10e924697..c8fe9df1a 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -6,10 +6,8 @@ import ( "slices" "sort" "sync" - "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // genericSearchOptions contain options used for search. @@ -164,19 +162,6 @@ func (c *BlockColumn) reset() { c.Values = nil } -func getEmptyStrings(rowsCount int) []string { - p := emptyStrings.Load() - if p == nil { - values := make([]string, rowsCount) - emptyStrings.Store(&values) - return values - } - values := *p - return slicesutil.SetLength(values, rowsCount) -} - -var emptyStrings atomic.Pointer[[]string] - // The number of blocks to search at once by a single worker // // This number must be increased on systems with many CPU cores in order to amortize diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 70f4c1f5f..cb0cee7b8 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -1096,11 +1096,7 @@ func (vd *valuesDict) copyFrom(a *arena, src *valuesDict) { func (vd *valuesDict) copyFromNoArena(src *valuesDict) { vd.reset() - dstValues := vd.values - for _, v := range src.values { - dstValues = append(dstValues, v) - } - vd.values = dstValues + vd.values = append(vd.values[:0], src.values...) } func (vd *valuesDict) getOrAdd(k string) (byte, bool) { diff --git a/lib/streamaggr/rate.go b/lib/streamaggr/rate.go index af8fe0786..4a4727c93 100644 --- a/lib/streamaggr/rate.go +++ b/lib/streamaggr/rate.go @@ -103,7 +103,7 @@ func (as *rateAggrState) pushSamples(samples []pushSample) { } } -func (as *rateAggrState) flushState(ctx *flushCtx, resetState bool) { +func (as *rateAggrState) flushState(ctx *flushCtx, _ bool) { currentTime := fasttime.UnixTimestamp() currentTimeMsec := int64(currentTime) * 1000