This commit is contained in:
Aliaksandr Valialkin 2024-05-06 01:27:05 +02:00
parent 67f0b887fa
commit 812950013b
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
2 changed files with 79 additions and 82 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
@ -49,37 +50,29 @@ func (br *blockResult) reset() {
br.csOffset = 0
clear(br.cs)
br.cs = br.cs[:0]
}
// setResultColumns sets the given rcs as br columns.
//
// The returned result is valid only until rcs are modified.
func (br *blockResult) setResultColumns(rcs []resultColumn) {
br.reset()
if len(rcs) == 0 {
return
}
fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values))
cs := br.cs
for i := range cs {
cs[i].reset()
}
br.cs = cs[:0]
}
func (br *blockResult) resetRows() {
br.buf = br.buf[:0]
clear(br.valuesBuf)
br.valuesBuf = br.valuesBuf[:0]
br.timestamps = br.timestamps[:0]
cs := br.getColumns()
for i := range cs {
cs[i].resetRows()
}
}
func (br *blockResult) addRow(timestamp int64, values []string) {
br.timestamps = append(br.timestamps, timestamp)
cs := br.getColumns()
if len(values) != len(cs) {
logger.Panicf("BUG: unexpected number of values in a row; got %d; want %d", len(values), len(cs))
}
for i := range cs {
cs[i].addValue(values[i])
for _, rc := range rcs {
cs = append(cs, blockResultColumn{
name: rc.name,
valueType: valueTypeString,
encodedValues: rc.values,
})
}
br.cs = cs
}
func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) {
@ -1121,6 +1114,10 @@ func (br *blockResult) truncateRows(keepRows int) {
}
}
// blockResultColumn represents named column from blockResult.
//
// blockResultColumn doesn't own any referred data - all the referred data must be owned by blockResult.
// This simplifies copying, resetting and re-using of the struct.
type blockResultColumn struct {
// name is column name.
name string
@ -1155,49 +1152,6 @@ type blockResultColumn struct {
// bucketOffset contains bucketOffset for bucketedValues
bucketOffset float64
// buf and valuesBuf are used by addValue() in order to re-use memory across resetRows().
buf []byte
valuesBuf []string
}
func (c *blockResultColumn) reset() {
c.name = ""
c.isConst = false
c.isTime = false
c.valueType = valueTypeUnknown
c.dictValues = nil
c.encodedValues = nil
c.values = nil
c.buf = c.buf[:0]
clear(c.valuesBuf)
c.valuesBuf = c.valuesBuf[:0]
}
func (c *blockResultColumn) resetRows() {
c.dictValues = nil
c.encodedValues = nil
c.values = nil
c.buf = c.buf[:0]
clear(c.valuesBuf)
c.valuesBuf = c.valuesBuf[:0]
}
func (c *blockResultColumn) addValue(v string) {
if c.valueType != valueTypeString {
logger.Panicf("BUG: unexpected column type; got %d; want %d", c.valueType, valueTypeString)
}
bufLen := len(c.buf)
c.buf = append(c.buf, v...)
c.valuesBuf = append(c.valuesBuf, bytesutil.ToUnsafeString(c.buf[bufLen:]))
c.encodedValues = c.valuesBuf
c.values = c.valuesBuf
}
// getValueAtRow returns value for the value at the given rowIdx.
@ -1593,4 +1547,31 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) {
}
}
// resultColumn represents a column with result values
type resultColumn struct {
// name is column name.
name string
// buf contains values data.
buf []byte
// values is the result values. They are backed by data inside buf.
values []string
}
func (rc *resultColumn) reset() {
rc.name = ""
rc.buf = rc.buf[:0]
clear(rc.values)
rc.values = rc.values[:0]
}
// addValue adds the given values v to rc.
func (rc *resultColumn) addValue(v string) {
bufLen := len(rc.buf)
rc.buf = append(rc.buf, v...)
rc.values = append(rc.values, bytesutil.ToUnsafeString(rc.buf[bufLen:]))
}
var nan = math.NaN()

View file

@ -345,15 +345,21 @@ func (psp *pipeStatsProcessor) flush() error {
m = shards[0].m
}
var values []string
var br blockResult
rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.resultNames))
for _, bf := range byFields {
br.addEmptyStringColumn(bf.name)
rcs = append(rcs, resultColumn{
name: bf.name,
})
}
for _, resultName := range psp.ps.resultNames {
br.addEmptyStringColumn(resultName)
rcs = append(rcs, resultColumn{
name: resultName,
})
}
var br blockResult
var values []string
valuesLen := 0
for key, spg := range m {
// m may be quite big, so this loop can take a lot of time and CPU.
// Stop processing data as soon as stopCh is closed without wasting additional CPU time.
@ -384,15 +390,25 @@ func (psp *pipeStatsProcessor) flush() error {
values = append(values, value)
}
br.addRow(0, values)
if len(br.timestamps) >= 1_000 {
if len(values) != len(rcs) {
logger.Panicf("BUG: len(values)=%d must be equal to len(rcs)=%d", len(values), len(rcs))
}
for i, v := range values {
rcs[i].addValue(v)
valuesLen += len(v)
}
if valuesLen >= 1_000_000 {
br.setResultColumns(rcs)
psp.ppBase.writeBlock(0, &br)
br.resetRows()
br.reset()
for i := range rcs {
rcs[i].reset()
}
valuesLen = 0
}
}
if len(br.timestamps) > 0 {
psp.ppBase.writeBlock(0, &br)
}
br.setResultColumns(rcs)
psp.ppBase.writeBlock(0, &br)
return nil
}