diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index b3890ea0f..06ac2ee64 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -208,9 +208,22 @@ func (br *blockResult) sizeBytes() int { return n } +// addResultColumns adds the given rcs to br. +// +// The br is valid only until rcs are modified. +func (br *blockResult) addResultColumns(rcs []resultColumn) { + if len(rcs) == 0 || len(rcs[0].values) == 0 { + return + } + + for i := range rcs { + br.addResultColumn(&rcs[i]) + } +} + // setResultColumns sets the given rcs as br columns. // -// The returned result is valid only until rcs are modified. +// The br is valid only until rcs are modified. func (br *blockResult) setResultColumns(rcs []resultColumn) { br.reset() @@ -220,24 +233,29 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) { br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values)) - csBuf := br.csBuf - for _, rc := range rcs { - if areConstValues(rc.values) { - // This optimization allows reducing memory usage after br cloning - csBuf = append(csBuf, blockResultColumn{ - name: br.a.copyString(rc.name), - isConst: true, - valuesEncoded: rc.values[:1], - }) - } else { - csBuf = append(csBuf, blockResultColumn{ - name: br.a.copyString(rc.name), - valueType: valueTypeString, - valuesEncoded: rc.values, - }) - } + for i := range rcs { + br.addResultColumn(&rcs[i]) + } +} + +func (br *blockResult) addResultColumn(rc *resultColumn) { + if len(rc.values) != len(br.timestamps) { + logger.Panicf("BUG: column %q must contain %d rows, but it contains %d rows", rc.name, len(br.timestamps), len(rc.values)) + } + if areConstValues(rc.values) { + // This optimization allows reducing memory usage after br cloning + br.csBuf = append(br.csBuf, blockResultColumn{ + name: rc.name, + isConst: true, + valuesEncoded: rc.values[:1], + }) + } else { + br.csBuf = append(br.csBuf, blockResultColumn{ + name: rc.name, + valueType: valueTypeString, + valuesEncoded: rc.values, + }) } - br.csBuf = csBuf br.csInitialized = false } @@ -1765,37 +1783,25 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { } } -// resultColumn represents a column with result values +// resultColumn represents a column with result values. +// +// It doesn't own the result values. type resultColumn struct { // name is column name. name string - // a contains values data. - a arena - - // values is the result values. They are backed by data inside a. + // values is the result values. values []string } -func (rc *resultColumn) sizeBytes() int { - return len(rc.name) + rc.a.sizeBytes() + len(rc.values)*int(unsafe.Sizeof(rc.values[0])) -} - func (rc *resultColumn) resetKeepName() { - rc.a.reset() clear(rc.values) rc.values = rc.values[:0] } // addValue adds the given values v to rc. func (rc *resultColumn) addValue(v string) { - values := rc.values - if len(values) > 0 && string(v) == values[len(values)-1] { - v = values[len(values)-1] - } else { - v = rc.a.copyString(v) - } - rc.values = append(values, v) + rc.values = append(rc.values, v) } func truncateTimestampToMonth(timestamp int64) int64 { diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 1df8c8d41..991003b9c 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -37,9 +37,15 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet func (pe *pipeExtract) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { shards := make([]pipeExtractProcessorShard, workersCount) for i := range shards { + ef := newExtractFormat(pe.steps) + rcs := make([]resultColumn, len(ef.fields)) + for j := range rcs { + rcs[j].name = ef.fields[j].name + } shards[i] = pipeExtractProcessorShard{ pipeExtractProcessorShardNopad: pipeExtractProcessorShardNopad{ - ef: newExtractFormat(pe.steps), + ef: ef, + rcs: rcs, }, } } @@ -71,6 +77,8 @@ type pipeExtractProcessorShard struct { type pipeExtractProcessorShardNopad struct { ef *extractFormat + + rcs []resultColumn } func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { @@ -83,12 +91,18 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { values := c.getValues(br) ef := shard.ef + rcs := shard.rcs for _, v := range values { ef.apply(v) - /* for i, result := range ef.results { - rcs[i].addValue(result) - } - */ + for i, f := range ef.fields { + rcs[i].addValue(*f.value) + } + } + br.addResultColumns(rcs) + pep.ppBase.writeBlock(workerID, br) + + for i := range rcs { + rcs[i].resetKeepName() } } @@ -162,7 +176,7 @@ func (ef *extractFormat) apply(s string) { matches := ef.matches for i := range steps { nextPrefix := "" - if i + 1 < len(steps) { + if i+1 < len(steps) { nextPrefix = steps[i+1].prefix } diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index d5cbbf277..d92b1924a 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "unsafe" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" ) @@ -211,7 +210,8 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { shard.columnValues = columnValues // Generate byColumns - var rc resultColumn + valuesEncoded := make([]string, len(br.timestamps)) + shard.stateSizeBudget -= len(valuesEncoded) * int(unsafe.Sizeof(valuesEncoded[0])) bb := bbPool.Get() for rowIdx := range br.timestamps { @@ -223,7 +223,12 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { bb.B = marshalJSONKeyValue(bb.B, cs[i].name, v) bb.B = append(bb.B, ',') } - rc.addValue(bytesutil.ToUnsafeString(bb.B)) + if rowIdx > 0 && valuesEncoded[rowIdx-1] == string(bb.B) { + valuesEncoded[rowIdx] = valuesEncoded[rowIdx-1] + } else { + valuesEncoded[rowIdx] = string(bb.B) + shard.stateSizeBudget -= len(bb.B) + } } bbPool.Put(bb) @@ -236,13 +241,13 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { { c: &blockResultColumn{ valueType: valueTypeString, - valuesEncoded: rc.values, + valuesEncoded: valuesEncoded, }, i64Values: i64Values, f64Values: f64Values, }, } - shard.stateSizeBudget -= rc.sizeBytes() + int(unsafe.Sizeof(byColumns[0])+unsafe.Sizeof(*byColumns[0].c)) + shard.stateSizeBudget -= int(unsafe.Sizeof(byColumns[0]) + unsafe.Sizeof(*byColumns[0].c)) // Append br to shard.blocks. shard.blocks = append(shard.blocks, sortBlock{