This commit is contained in:
Aliaksandr Valialkin 2024-05-19 13:23:27 +02:00
parent 5e20724e1a
commit 5175d99d49
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 71 additions and 46 deletions

View file

@ -208,9 +208,22 @@ func (br *blockResult) sizeBytes() int {
return n
}
// addResultColumns adds the given rcs to br.
//
// The br is valid only until rcs are modified.
func (br *blockResult) addResultColumns(rcs []resultColumn) {
if len(rcs) == 0 || len(rcs[0].values) == 0 {
return
}
for i := range rcs {
br.addResultColumn(&rcs[i])
}
}
// setResultColumns sets the given rcs as br columns.
//
// The returned result is valid only until rcs are modified.
// The br is valid only until rcs are modified.
func (br *blockResult) setResultColumns(rcs []resultColumn) {
br.reset()
@ -220,24 +233,29 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) {
br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values))
csBuf := br.csBuf
for _, rc := range rcs {
for i := range rcs {
br.addResultColumn(&rcs[i])
}
}
func (br *blockResult) addResultColumn(rc *resultColumn) {
if len(rc.values) != len(br.timestamps) {
logger.Panicf("BUG: column %q must contain %d rows, but it contains %d rows", rc.name, len(br.timestamps), len(rc.values))
}
if areConstValues(rc.values) {
// This optimization allows reducing memory usage after br cloning
csBuf = append(csBuf, blockResultColumn{
name: br.a.copyString(rc.name),
br.csBuf = append(br.csBuf, blockResultColumn{
name: rc.name,
isConst: true,
valuesEncoded: rc.values[:1],
})
} else {
csBuf = append(csBuf, blockResultColumn{
name: br.a.copyString(rc.name),
br.csBuf = append(br.csBuf, blockResultColumn{
name: rc.name,
valueType: valueTypeString,
valuesEncoded: rc.values,
})
}
}
br.csBuf = csBuf
br.csInitialized = false
}
@ -1765,37 +1783,25 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) {
}
}
// resultColumn represents a column with result values
// resultColumn represents a column with result values.
//
// It doesn't own the result values.
type resultColumn struct {
// name is column name.
name string
// a contains values data.
a arena
// values is the result values. They are backed by data inside a.
// values is the result values.
values []string
}
func (rc *resultColumn) sizeBytes() int {
return len(rc.name) + rc.a.sizeBytes() + len(rc.values)*int(unsafe.Sizeof(rc.values[0]))
}
func (rc *resultColumn) resetKeepName() {
rc.a.reset()
clear(rc.values)
rc.values = rc.values[:0]
}
// addValue adds the given values v to rc.
func (rc *resultColumn) addValue(v string) {
values := rc.values
if len(values) > 0 && string(v) == values[len(values)-1] {
v = values[len(values)-1]
} else {
v = rc.a.copyString(v)
}
rc.values = append(values, v)
rc.values = append(rc.values, v)
}
func truncateTimestampToMonth(timestamp int64) int64 {

View file

@ -37,9 +37,15 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
func (pe *pipeExtract) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeExtractProcessorShard, workersCount)
for i := range shards {
ef := newExtractFormat(pe.steps)
rcs := make([]resultColumn, len(ef.fields))
for j := range rcs {
rcs[j].name = ef.fields[j].name
}
shards[i] = pipeExtractProcessorShard{
pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{
ef: newExtractFormat(pe.steps),
ef: ef,
rcs: rcs,
},
}
}
@ -71,6 +77,8 @@ type pipeExtractProcessorShard struct {
type pipeExtractProcessorShardNopad struct {
ef *extractFormat
rcs []resultColumn
}
func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
@ -83,12 +91,18 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
values := c.getValues(br)
ef := shard.ef
rcs := shard.rcs
for _, v := range values {
ef.apply(v)
/* for i, result := range ef.results {
rcs[i].addValue(result)
for i, f := range ef.fields {
rcs[i].addValue(*f.value)
}
*/
}
br.addResultColumns(rcs)
pep.ppBase.writeBlock(workerID, br)
for i := range rcs {
rcs[i].resetKeepName()
}
}
@ -162,7 +176,7 @@ func (ef *extractFormat) apply(s string) {
matches := ef.matches
for i := range steps {
nextPrefix := ""
if i + 1 < len(steps) {
if i+1 < len(steps) {
nextPrefix = steps[i+1].prefix
}

View file

@ -11,7 +11,6 @@ import (
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
)
@ -211,7 +210,8 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
shard.columnValues = columnValues
// Generate byColumns
var rc resultColumn
valuesEncoded := make([]string, len(br.timestamps))
shard.stateSizeBudget -= len(valuesEncoded) * int(unsafe.Sizeof(valuesEncoded[0]))
bb := bbPool.Get()
for rowIdx := range br.timestamps {
@ -223,7 +223,12 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v)
bb.B = append(bb.B, ',')
}
rc.addValue(bytesutil.ToUnsafeString(bb.B))
if rowIdx > 0 && valuesEncoded[rowIdx-1] == string(bb.B) {
valuesEncoded[rowIdx] = valuesEncoded[rowIdx-1]
} else {
valuesEncoded[rowIdx] = string(bb.B)
shard.stateSizeBudget -= len(bb.B)
}
}
bbPool.Put(bb)
@ -236,13 +241,13 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) {
{
c: &blockResultColumn{
valueType: valueTypeString,
valuesEncoded: rc.values,
valuesEncoded: valuesEncoded,
},
i64Values: i64Values,
f64Values: f64Values,
},
}
shard.stateSizeBudget -= rc.sizeBytes() + int(unsafe.Sizeof(byColumns[0])+unsafe.Sizeof(*byColumns[0].c))
shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0]) + unsafe.Sizeof(*byColumns[0].c))
// Append br to shard.blocks.
shard.blocks = append(shard.blocks, sortBlock{