This commit is contained in:
Aliaksandr Valialkin 2024-05-15 22:19:21 +02:00
parent 7ffcdabcbe
commit 164705cf20
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
18 changed files with 496 additions and 358 deletions

View file

@ -4,6 +4,7 @@ import (
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
) )
func getArena() *arena { func getArena() *arena {
@ -29,8 +30,12 @@ func (a *arena) reset() {
a.b = a.b[:0] a.b = a.b[:0]
} }
func (a *arena) preallocate(n int) {
a.b = slicesutil.ExtendCapacity(a.b, n)
}
func (a *arena) sizeBytes() int { func (a *arena) sizeBytes() int {
return len(a.b) return cap(a.b)
} }
func (a *arena) copyBytes(b []byte) []byte { func (a *arena) copyBytes(b []byte) []byte {
@ -41,9 +46,8 @@ func (a *arena) copyBytes(b []byte) []byte {
ab := a.b ab := a.b
abLen := len(ab) abLen := len(ab)
ab = append(ab, b...) ab = append(ab, b...)
result := ab[abLen:]
a.b = ab a.b = ab
return result return ab[abLen:]
} }
func (a *arena) copyBytesToString(b []byte) string { func (a *arena) copyBytesToString(b []byte) string {

View file

@ -35,7 +35,7 @@ func TestArena(t *testing.T) {
} }
} }
if n := a.sizeBytes(); n != valuesLen { if n := a.sizeBytes(); n < valuesLen {
t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen) t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen)
} }
@ -47,7 +47,7 @@ func TestArena(t *testing.T) {
t.Fatalf("unexpected len(b); got %d; want %d", len(b), j) t.Fatalf("unexpected len(b); got %d; want %d", len(b), j)
} }
valuesLen += j valuesLen += j
if n := a.sizeBytes(); n != valuesLen { if n := a.sizeBytes(); n < valuesLen {
t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen) t.Fatalf("unexpected arena size; got %d; want %d", n, valuesLen)
} }
for k := range b { for k := range b {

View file

@ -3,6 +3,7 @@ package logstorage
import ( import (
"math/bits" "math/bits"
"sync" "sync"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
@ -37,6 +38,10 @@ func (bm *bitmap) reset() {
bm.bitsLen = 0 bm.bitsLen = 0
} }
func (bm *bitmap) sizeBytes() int {
return int(unsafe.Sizeof(*bm)) + cap(bm.a)*int(unsafe.Sizeof(bm.a[0]))
}
func (bm *bitmap) copyFrom(src *bitmap) { func (bm *bitmap) copyFrom(src *bitmap) {
bm.reset() bm.reset()
@ -149,7 +154,8 @@ func (bm *bitmap) forEachSetBit(f func(idx int) bool) {
// forEachSetBitReadonly calls f for each set bit // forEachSetBitReadonly calls f for each set bit
func (bm *bitmap) forEachSetBitReadonly(f func(idx int)) { func (bm *bitmap) forEachSetBitReadonly(f func(idx int)) {
if bm.areAllBitsSet() { if bm.areAllBitsSet() {
for i := range bm.bitsLen { n := bm.bitsLen
for i := 0; i < n; i++ {
f(i) f(i)
} }
return return

File diff suppressed because it is too large Load diff

View file

@ -168,9 +168,9 @@ func (bs *blockSearch) search(bsw *blockSearchWork) {
// fetch the requested columns to bs.br. // fetch the requested columns to bs.br.
if bs.bsw.so.needAllColumns { if bs.bsw.so.needAllColumns {
bs.br.fetchAllColumns(bs, bm) bs.br.initAllColumns()
} else { } else {
bs.br.fetchRequestedColumns(bs, bm) bs.br.initRequestedColumns()
} }
} }

View file

@ -232,13 +232,13 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
{ {
c: &blockResultColumn{ c: &blockResultColumn{
valueType: valueTypeString, valueType: valueTypeString,
encodedValues: rc.values, valuesEncoded: rc.values,
}, },
i64Values: i64Values, i64Values: i64Values,
f64Values: f64Values, f64Values: f64Values,
}, },
} }
shard.stateSizeBudget -= len(rc.buf) + int(unsafe.Sizeof(byColumns[0])+unsafe.Sizeof(*byColumns[0].c)) shard.stateSizeBudget -= rc.sizeBytes() + int(unsafe.Sizeof(byColumns[0])+unsafe.Sizeof(*byColumns[0].c))
// Append br to shard.blocks. // Append br to shard.blocks.
shard.blocks = append(shard.blocks, sortBlock{ shard.blocks = append(shard.blocks, sortBlock{
@ -260,8 +260,8 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
continue continue
} }
if c.isConst { if c.isConst {
bc.i64Values = shard.createInt64Values(c.encodedValues) bc.i64Values = shard.createInt64Values(c.valuesEncoded)
bc.f64Values = shard.createFloat64Values(c.encodedValues) bc.f64Values = shard.createFloat64Values(c.valuesEncoded)
continue continue
} }
@ -610,8 +610,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
if cA.c.isConst && cB.c.isConst { if cA.c.isConst && cB.c.isConst {
// Fast path - compare const values // Fast path - compare const values
ccA := cA.c.encodedValues[0] ccA := cA.c.valuesEncoded[0]
ccB := cB.c.encodedValues[0] ccB := cB.c.valuesEncoded[0]
if ccA == ccB { if ccA == ccB {
continue continue
} }

View file

@ -182,14 +182,14 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
c := br.getColumnByName(bf.name) c := br.getColumnByName(bf.name)
if c.isConst { if c.isConst {
// Fast path for column with constant value. // Fast path for column with constant value.
v := br.getBucketedValue(c.encodedValues[0], bf) v := br.getBucketedValue(c.valuesEncoded[0], bf)
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
psg := shard.getPipeStatsGroup(shard.keyBuf) psg := shard.getPipeStatsGroup(shard.keyBuf)
shard.stateSizeBudget -= psg.updateStatsForAllRows(br) shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
return return
} }
values := c.getBucketedValues(br, bf) values := c.getValuesBucketed(br, bf)
if areConstValues(values) { if areConstValues(values) {
// Fast path for column with constant values. // Fast path for column with constant values.
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
@ -216,7 +216,7 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
columnValues := shard.columnValues[:0] columnValues := shard.columnValues[:0]
for _, bf := range byFields { for _, bf := range byFields {
c := br.getColumnByName(bf.name) c := br.getColumnByName(bf.name)
values := c.getBucketedValues(br, bf) values := c.getValuesBucketed(br, bf)
columnValues = append(columnValues, values) columnValues = append(columnValues, values)
} }
shard.columnValues = columnValues shard.columnValues = columnValues

View file

@ -57,7 +57,7 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
if sap.sa.containsStar { if sap.sa.containsStar {
// Scan all the fields for the given row // Scan all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f, ok := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
sap.sum += f sap.sum += f
sap.count++ sap.count++
@ -67,7 +67,7 @@ func (sap *statsAvgProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
// Scan only the given fields for the given row // Scan only the given fields for the given row
for _, field := range sap.sa.fields { for _, field := range sap.sa.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f, ok := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
sap.sum += f sap.sum += f
sap.count++ sap.count++

View file

@ -49,7 +49,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
// Fast path for count(single_column) // Fast path for count(single_column)
c := br.getColumnByName(fields[0]) c := br.getColumnByName(fields[0])
if c.isConst { if c.isConst {
if c.encodedValues[0] != "" { if c.valuesEncoded[0] != "" {
scp.rowsCount += uint64(len(br.timestamps)) scp.rowsCount += uint64(len(br.timestamps))
} }
return 0 return 0
@ -60,7 +60,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
for _, v := range c.encodedValues { for _, v := range c.getValuesEncoded(br) {
if v != "" { if v != "" {
scp.rowsCount++ scp.rowsCount++
} }
@ -72,7 +72,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
scp.rowsCount += uint64(len(br.timestamps)) scp.rowsCount += uint64(len(br.timestamps))
return 0 return 0
} }
for _, v := range c.encodedValues { for _, v := range c.getValuesEncoded(br) {
if int(v[0]) != zeroDictIdx { if int(v[0]) != zeroDictIdx {
scp.rowsCount++ scp.rowsCount++
} }
@ -95,7 +95,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
for _, f := range fields { for _, f := range fields {
c := br.getColumnByName(f) c := br.getColumnByName(f)
if c.isConst { if c.isConst {
if c.encodedValues[0] != "" { if c.valuesEncoded[0] != "" {
scp.rowsCount += uint64(len(br.timestamps)) scp.rowsCount += uint64(len(br.timestamps))
return 0 return 0
} }
@ -105,18 +105,21 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
scp.rowsCount += uint64(len(br.timestamps)) scp.rowsCount += uint64(len(br.timestamps))
return 0 return 0
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(i int) bool { bm.forEachSetBit(func(i int) bool {
return c.encodedValues[i] == "" return valuesEncoded[i] == ""
}) })
case valueTypeDict: case valueTypeDict:
if !slices.Contains(c.dictValues, "") { if !slices.Contains(c.dictValues, "") {
scp.rowsCount += uint64(len(br.timestamps)) scp.rowsCount += uint64(len(br.timestamps))
return 0 return 0
} }
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(i int) bool { bm.forEachSetBit(func(i int) bool {
dictIdx := c.encodedValues[i][0] dictIdx := valuesEncoded[i][0]
return c.dictValues[dictIdx] == "" return c.dictValues[dictIdx] == ""
}) })
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
@ -144,7 +147,7 @@ func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) i
// Fast path for count(single_column) // Fast path for count(single_column)
c := br.getColumnByName(fields[0]) c := br.getColumnByName(fields[0])
if c.isConst { if c.isConst {
if c.encodedValues[0] != "" { if c.valuesEncoded[0] != "" {
scp.rowsCount++ scp.rowsCount++
} }
return 0 return 0
@ -155,12 +158,14 @@ func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) i
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
if v := c.encodedValues[rowIdx]; v != "" { valuesEncoded := c.getValuesEncoded(br)
if v := valuesEncoded[rowIdx]; v != "" {
scp.rowsCount++ scp.rowsCount++
} }
return 0 return 0
case valueTypeDict: case valueTypeDict:
dictIdx := c.encodedValues[rowIdx][0] valuesEncoded := c.getValuesEncoded(br)
dictIdx := valuesEncoded[rowIdx][0]
if v := c.dictValues[dictIdx]; v != "" { if v := c.dictValues[dictIdx]; v != "" {
scp.rowsCount++ scp.rowsCount++
} }

View file

@ -53,7 +53,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int
// Fast path for count_empty(single_column) // Fast path for count_empty(single_column)
c := br.getColumnByName(fields[0]) c := br.getColumnByName(fields[0])
if c.isConst { if c.isConst {
if c.encodedValues[0] == "" { if c.valuesEncoded[0] == "" {
scp.rowsCount += uint64(len(br.timestamps)) scp.rowsCount += uint64(len(br.timestamps))
} }
return 0 return 0
@ -63,7 +63,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
for _, v := range c.encodedValues { for _, v := range c.getValuesEncoded(br) {
if v == "" { if v == "" {
scp.rowsCount++ scp.rowsCount++
} }
@ -74,7 +74,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int
if zeroDictIdx < 0 { if zeroDictIdx < 0 {
return 0 return 0
} }
for _, v := range c.encodedValues { for _, v := range c.getValuesEncoded(br) {
if int(v[0]) == zeroDictIdx { if int(v[0]) == zeroDictIdx {
scp.rowsCount++ scp.rowsCount++
} }
@ -96,7 +96,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int
for _, f := range fields { for _, f := range fields {
c := br.getColumnByName(f) c := br.getColumnByName(f)
if c.isConst { if c.isConst {
if c.encodedValues[0] == "" { if c.valuesEncoded[0] == "" {
scp.rowsCount += uint64(len(br.timestamps)) scp.rowsCount += uint64(len(br.timestamps))
return 0 return 0
} }
@ -107,15 +107,17 @@ func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(i int) bool { bm.forEachSetBit(func(i int) bool {
return c.encodedValues[i] == "" return valuesEncoded[i] == ""
}) })
case valueTypeDict: case valueTypeDict:
if !slices.Contains(c.dictValues, "") { if !slices.Contains(c.dictValues, "") {
return 0 return 0
} }
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(i int) bool { bm.forEachSetBit(func(i int) bool {
dictIdx := c.encodedValues[i][0] dictIdx := valuesEncoded[i][0]
return c.dictValues[dictIdx] == "" return c.dictValues[dictIdx] == ""
}) })
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
@ -145,7 +147,7 @@ func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx i
// Fast path for count_empty(single_column) // Fast path for count_empty(single_column)
c := br.getColumnByName(fields[0]) c := br.getColumnByName(fields[0])
if c.isConst { if c.isConst {
if c.encodedValues[0] == "" { if c.valuesEncoded[0] == "" {
scp.rowsCount++ scp.rowsCount++
} }
return 0 return 0
@ -155,12 +157,14 @@ func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx i
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
if v := c.encodedValues[rowIdx]; v == "" { valuesEncoded := c.getValuesEncoded(br)
if v := valuesEncoded[rowIdx]; v == "" {
scp.rowsCount++ scp.rowsCount++
} }
return 0 return 0
case valueTypeDict: case valueTypeDict:
dictIdx := c.encodedValues[rowIdx][0] valuesEncoded := c.getValuesEncoded(br)
dictIdx := valuesEncoded[rowIdx][0]
if v := c.dictValues[dictIdx]; v == "" { if v := c.dictValues[dictIdx]; v == "" {
scp.rowsCount++ scp.rowsCount++
} }

View file

@ -122,7 +122,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
} }
if c.isConst { if c.isConst {
// count unique const values // count unique const values
v := c.encodedValues[0] v := c.valuesEncoded[0]
if v == "" { if v == "" {
// Do not count empty values // Do not count empty values
return stateSizeIncrease return stateSizeIncrease
@ -156,7 +156,7 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
return stateSizeIncrease return stateSizeIncrease
} }
// Count unique values across encodedValues // Count unique values across values
values := c.getValues(br) values := c.getValues(br)
keyBuf := sup.keyBuf[:0] keyBuf := sup.keyBuf[:0]
for i, v := range values { for i, v := range values {
@ -278,7 +278,7 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
} }
if c.isConst { if c.isConst {
// count unique const values // count unique const values
v := c.encodedValues[0] v := c.valuesEncoded[0]
if v == "" { if v == "" {
// Do not count empty values // Do not count empty values
return stateSizeIncrease return stateSizeIncrease
@ -295,7 +295,8 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
} }
if c.valueType == valueTypeDict { if c.valueType == valueTypeDict {
// count unique non-zero c.dictValues // count unique non-zero c.dictValues
dictIdx := c.encodedValues[rowIdx][0] valuesEncoded := c.getValuesEncoded(br)
dictIdx := valuesEncoded[rowIdx][0]
v := c.dictValues[dictIdx] v := c.dictValues[dictIdx]
if v == "" { if v == "" {
// Do not count empty values // Do not count empty values

View file

@ -108,14 +108,14 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu
} }
if c.isConst { if c.isConst {
// Special case for const column // Special case for const column
v := c.encodedValues[0] v := c.valuesEncoded[0]
smp.updateStateString(v) smp.updateStateString(v)
return return
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
for _, v := range c.encodedValues { for _, v := range c.getValuesEncoded(br) {
smp.updateStateString(v) smp.updateStateString(v)
} }
case valueTypeDict: case valueTypeDict:
@ -124,23 +124,23 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu
} }
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.maxValue) bb.B = marshalUint64String(bb.B[:0], c.ch.maxValue)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeFloat64: case valueTypeFloat64:
f := math.Float64frombits(c.maxValue) f := math.Float64frombits(c.ch.maxValue)
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalFloat64String(bb.B[:0], f) bb.B = marshalFloat64String(bb.B[:0], f)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeIPv4: case valueTypeIPv4:
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalIPv4String(bb.B[:0], uint32(c.maxValue)) bb.B = marshalIPv4String(bb.B[:0], uint32(c.ch.maxValue))
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.maxValue)) bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.ch.maxValue))
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
default: default:

View file

@ -108,14 +108,14 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu
} }
if c.isConst { if c.isConst {
// Special case for const column // Special case for const column
v := c.encodedValues[0] v := c.valuesEncoded[0]
smp.updateStateString(v) smp.updateStateString(v)
return return
} }
switch c.valueType { switch c.valueType {
case valueTypeString: case valueTypeString:
for _, v := range c.encodedValues { for _, v := range c.getValuesEncoded(br) {
smp.updateStateString(v) smp.updateStateString(v)
} }
case valueTypeDict: case valueTypeDict:
@ -124,23 +124,23 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu
} }
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64: case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64:
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalUint64String(bb.B[:0], c.minValue) bb.B = marshalUint64String(bb.B[:0], c.ch.minValue)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeFloat64: case valueTypeFloat64:
f := math.Float64frombits(c.minValue) f := math.Float64frombits(c.ch.minValue)
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalFloat64String(bb.B[:0], f) bb.B = marshalFloat64String(bb.B[:0], f)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeIPv4: case valueTypeIPv4:
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalIPv4String(bb.B[:0], uint32(c.minValue)) bb.B = marshalIPv4String(bb.B[:0], uint32(c.ch.minValue))
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
case valueTypeTimestampISO8601: case valueTypeTimestampISO8601:
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.minValue)) bb.B = marshalTimestampISO8601String(bb.B[:0], int64(c.ch.minValue))
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)
bbPool.Put(bb) bbPool.Put(bb)
default: default:

View file

@ -2,11 +2,15 @@ package logstorage
import ( import (
"fmt" "fmt"
"math"
"slices" "slices"
"strconv" "strconv"
"unsafe" "unsafe"
"github.com/valyala/fastrand" "github.com/valyala/fastrand"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
type statsQuantile struct { type statsQuantile struct {
@ -38,27 +42,16 @@ type statsQuantileProcessor struct {
} }
func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int { func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int {
h := &sqp.h
stateSizeIncrease := 0 stateSizeIncrease := 0
if sqp.sq.containsStar { if sqp.sq.containsStar {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
for _, v := range c.getValues(br) { stateSizeIncrease += sqp.updateStateForColumn(br, c)
f, ok := tryParseFloat64(v)
if ok {
stateSizeIncrease += h.update(f)
}
}
} }
} else { } else {
for _, field := range sqp.sq.fields { for _, field := range sqp.sq.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
for _, v := range c.getValues(br) { stateSizeIncrease += sqp.updateStateForColumn(br, c)
f, ok := tryParseFloat64(v)
if ok {
stateSizeIncrease += h.update(f)
}
}
} }
} }
@ -71,7 +64,7 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int
if sqp.sq.containsStar { if sqp.sq.containsStar {
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f, ok := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
stateSizeIncrease += h.update(f) stateSizeIncrease += h.update(f)
} }
@ -79,7 +72,7 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int
} else { } else {
for _, field := range sqp.sq.fields { for _, field := range sqp.sq.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f, ok := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
stateSizeIncrease += h.update(f) stateSizeIncrease += h.update(f)
} }
@ -89,6 +82,85 @@ func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int
return stateSizeIncrease return stateSizeIncrease
} }
func (sqp *statsQuantileProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) int {
h := &sqp.h
stateSizeIncrease := 0
if c.isConst {
f, ok := tryParseFloat64(c.valuesEncoded[0])
if ok {
for range br.timestamps {
stateSizeIncrease += h.update(f)
}
}
return stateSizeIncrease
}
if c.isTime {
return 0
}
switch c.valueType {
case valueTypeString:
for _, v := range c.getValues(br) {
f, ok := tryParseFloat64(v)
if ok {
stateSizeIncrease += h.update(f)
}
}
case valueTypeDict:
dictValues := c.dictValues
a := encoding.GetFloat64s(len(dictValues))
for i, v := range dictValues {
f, ok := tryParseFloat64(v)
if !ok {
f = nan
}
a.A[i] = f
}
for _, v := range c.getValuesEncoded(br) {
idx := v[0]
f := a.A[idx]
if !math.IsNaN(f) {
h.update(f)
}
}
encoding.PutFloat64s(a)
case valueTypeUint8:
for _, v := range c.getValuesEncoded(br) {
n := unmarshalUint8(v)
h.update(float64(n))
}
case valueTypeUint16:
for _, v := range c.getValuesEncoded(br) {
n := unmarshalUint16(v)
h.update(float64(n))
}
case valueTypeUint32:
for _, v := range c.getValuesEncoded(br) {
n := unmarshalUint32(v)
h.update(float64(n))
}
case valueTypeUint64:
for _, v := range c.getValuesEncoded(br) {
n := unmarshalUint64(v)
h.update(float64(n))
}
case valueTypeFloat64:
for _, v := range c.getValuesEncoded(br) {
f := unmarshalFloat64(v)
if !math.IsNaN(f) {
h.update(f)
}
}
case valueTypeIPv4:
case valueTypeTimestampISO8601:
default:
logger.Panicf("BUG: unexpected valueType=%d", c.valueType)
}
return stateSizeIncrease
}
func (sqp *statsQuantileProcessor) mergeState(sfp statsProcessor) { func (sqp *statsQuantileProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsQuantileProcessor) src := sfp.(*statsQuantileProcessor)
sqp.h.mergeState(&src.h) sqp.h.mergeState(&src.h)

View file

@ -38,27 +38,13 @@ func (ssp *statsSumProcessor) updateStatsForAllRows(br *blockResult) int {
if ssp.ss.containsStar { if ssp.ss.containsStar {
// Sum all the columns // Sum all the columns
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f, count := c.sumValues(br) ssp.updateStateForColumn(br, c)
if count > 0 {
if math.IsNaN(ssp.sum) {
ssp.sum = f
} else {
ssp.sum += f
}
}
} }
} else { } else {
// Sum the requested columns // Sum the requested columns
for _, field := range ssp.ss.fields { for _, field := range ssp.ss.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f, count := c.sumValues(br) ssp.updateStateForColumn(br, c)
if count > 0 {
if math.IsNaN(ssp.sum) {
ssp.sum = f
} else {
ssp.sum += f
}
}
} }
} }
return 0 return 0
@ -68,31 +54,38 @@ func (ssp *statsSumProcessor) updateStatsForRow(br *blockResult, rowIdx int) int
if ssp.ss.containsStar { if ssp.ss.containsStar {
// Sum all the fields for the given row // Sum all the fields for the given row
for _, c := range br.getColumns() { for _, c := range br.getColumns() {
f, ok := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
if math.IsNaN(ssp.sum) { ssp.updateState(f)
ssp.sum = f
} else {
ssp.sum += f
}
} }
} }
} else { } else {
// Sum only the given fields for the given row // Sum only the given fields for the given row
for _, field := range ssp.ss.fields { for _, field := range ssp.ss.fields {
c := br.getColumnByName(field) c := br.getColumnByName(field)
f, ok := c.getFloatValueAtRow(rowIdx) f, ok := c.getFloatValueAtRow(br, rowIdx)
if ok { if ok {
ssp.updateState(f)
}
}
}
return 0
}
func (ssp *statsSumProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) {
f, count := c.sumValues(br)
if count > 0 {
ssp.updateState(f)
}
}
func (ssp *statsSumProcessor) updateState(f float64) {
if math.IsNaN(ssp.sum) { if math.IsNaN(ssp.sum) {
ssp.sum = f ssp.sum = f
} else { } else {
ssp.sum += f ssp.sum += f
} }
} }
}
}
return 0
}
func (ssp *statsSumProcessor) mergeState(sfp statsProcessor) { func (ssp *statsSumProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsSumProcessor) src := sfp.(*statsSumProcessor)

View file

@ -68,7 +68,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForAllRowsColumn(c *blockResultC
stateSizeIncrease := 0 stateSizeIncrease := 0
if c.isConst { if c.isConst {
// collect unique const values // collect unique const values
v := c.encodedValues[0] v := c.valuesEncoded[0]
if v == "" { if v == "" {
// skip empty values // skip empty values
return stateSizeIncrease return stateSizeIncrease
@ -141,7 +141,7 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum
stateSizeIncrease := 0 stateSizeIncrease := 0
if c.isConst { if c.isConst {
// collect unique const values // collect unique const values
v := c.encodedValues[0] v := c.valuesEncoded[0]
if v == "" { if v == "" {
// skip empty values // skip empty values
return stateSizeIncrease return stateSizeIncrease
@ -155,7 +155,8 @@ func (sup *statsUniqValuesProcessor) updateStatsForRowColumn(c *blockResultColum
} }
if c.valueType == valueTypeDict { if c.valueType == valueTypeDict {
// collect unique non-zero c.dictValues // collect unique non-zero c.dictValues
dictIdx := c.encodedValues[rowIdx][0] valuesEncoded := c.getValuesEncoded(br)
dictIdx := valuesEncoded[rowIdx][0]
v := c.dictValues[dictIdx] v := c.dictValues[dictIdx]
if v == "" { if v == "" {
// skip empty values // skip empty values

View file

@ -61,7 +61,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRows(br *blockResult) int {
func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int { func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
if c.isConst { if c.isConst {
v := strings.Clone(c.encodedValues[0]) v := strings.Clone(c.valuesEncoded[0])
stateSizeIncrease += len(v) stateSizeIncrease += len(v)
values := svp.values values := svp.values
@ -81,7 +81,7 @@ func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColum
} }
values := svp.values values := svp.values
for _, encodedValue := range c.encodedValues { for _, encodedValue := range c.getValuesEncoded(br) {
idx := encodedValue[0] idx := encodedValue[0]
values = append(values, dictValues[idx]) values = append(values, dictValues[idx])
} }
@ -128,7 +128,7 @@ func (svp *statsValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int)
func (svp *statsValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int { func (svp *statsValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int {
stateSizeIncrease := 0 stateSizeIncrease := 0
if c.isConst { if c.isConst {
v := strings.Clone(c.encodedValues[0]) v := strings.Clone(c.valuesEncoded[0])
stateSizeIncrease += len(v) stateSizeIncrease += len(v)
svp.values = append(svp.values, v) svp.values = append(svp.values, v)
@ -138,7 +138,8 @@ func (svp *statsValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, b
} }
if c.valueType == valueTypeDict { if c.valueType == valueTypeDict {
// collect unique non-zero c.dictValues // collect unique non-zero c.dictValues
dictIdx := c.encodedValues[rowIdx][0] valuesEncoded := c.getValuesEncoded(br)
dictIdx := valuesEncoded[rowIdx][0]
v := strings.Clone(c.dictValues[dictIdx]) v := strings.Clone(c.dictValues[dictIdx])
stateSizeIncrease += len(v) stateSizeIncrease += len(v)