diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index ab556a401..5dfcfc986 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -8,6 +8,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fastnum" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) @@ -49,37 +50,29 @@ func (br *blockResult) reset() { br.csOffset = 0 + clear(br.cs) + br.cs = br.cs[:0] +} + +// setResultColumns sets the given rcs as br columns. +// +// The returned result is valid only until rcs are modified. +func (br *blockResult) setResultColumns(rcs []resultColumn) { + br.reset() + if len(rcs) == 0 { + return + } + fastnum.AppendInt64Zeros(br.timestamps[:0], len(rcs[0].values)) + cs := br.cs - for i := range cs { - cs[i].reset() - } - br.cs = cs[:0] -} - -func (br *blockResult) resetRows() { - br.buf = br.buf[:0] - - clear(br.valuesBuf) - br.valuesBuf = br.valuesBuf[:0] - - br.timestamps = br.timestamps[:0] - - cs := br.getColumns() - for i := range cs { - cs[i].resetRows() - } -} - -func (br *blockResult) addRow(timestamp int64, values []string) { - br.timestamps = append(br.timestamps, timestamp) - - cs := br.getColumns() - if len(values) != len(cs) { - logger.Panicf("BUG: unexpected number of values in a row; got %d; want %d", len(values), len(cs)) - } - for i := range cs { - cs[i].addValue(values[i]) + for _, rc := range rcs { + cs = append(cs, blockResultColumn{ + name: rc.name, + valueType: valueTypeString, + encodedValues: rc.values, + }) } + br.cs = cs } func (br *blockResult) fetchAllColumns(bs *blockSearch, bm *bitmap) { @@ -1121,6 +1114,10 @@ func (br *blockResult) truncateRows(keepRows int) { } } +// blockResultColumn represents named column from blockResult. +// +// blockResultColumn doesn't own any referred data - all the referred data must be owned by blockResult. +// This simplifies copying, resetting and re-using of the struct. type blockResultColumn struct { // name is column name. name string @@ -1155,49 +1152,6 @@ type blockResultColumn struct { // bucketOffset contains bucketOffset for bucketedValues bucketOffset float64 - - // buf and valuesBuf are used by addValue() in order to re-use memory across resetRows(). - buf []byte - valuesBuf []string -} - -func (c *blockResultColumn) reset() { - c.name = "" - c.isConst = false - c.isTime = false - c.valueType = valueTypeUnknown - c.dictValues = nil - c.encodedValues = nil - c.values = nil - - c.buf = c.buf[:0] - - clear(c.valuesBuf) - c.valuesBuf = c.valuesBuf[:0] -} - -func (c *blockResultColumn) resetRows() { - c.dictValues = nil - c.encodedValues = nil - c.values = nil - - c.buf = c.buf[:0] - - clear(c.valuesBuf) - c.valuesBuf = c.valuesBuf[:0] -} - -func (c *blockResultColumn) addValue(v string) { - if c.valueType != valueTypeString { - logger.Panicf("BUG: unexpected column type; got %d; want %d", c.valueType, valueTypeString) - } - - bufLen := len(c.buf) - c.buf = append(c.buf, v...) - c.valuesBuf = append(c.valuesBuf, bytesutil.ToUnsafeString(c.buf[bufLen:])) - - c.encodedValues = c.valuesBuf - c.values = c.valuesBuf } // getValueAtRow returns value for the value at the given rowIdx. @@ -1593,4 +1547,31 @@ func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) { } } +// resultColumn represents a column with result values +type resultColumn struct { + // name is column name. + name string + + // buf contains values data. + buf []byte + + // values is the result values. They are backed by data inside buf. + values []string +} + +func (rc *resultColumn) reset() { + rc.name = "" + rc.buf = rc.buf[:0] + + clear(rc.values) + rc.values = rc.values[:0] +} + +// addValue adds the given values v to rc. +func (rc *resultColumn) addValue(v string) { + bufLen := len(rc.buf) + rc.buf = append(rc.buf, v...) + rc.values = append(rc.values, bytesutil.ToUnsafeString(rc.buf[bufLen:])) +} + var nan = math.NaN() diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index e58d569c7..6334469fd 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -345,15 +345,21 @@ func (psp *pipeStatsProcessor) flush() error { m = shards[0].m } - var values []string - var br blockResult + rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.resultNames)) for _, bf := range byFields { - br.addEmptyStringColumn(bf.name) + rcs = append(rcs, resultColumn{ + name: bf.name, + }) } for _, resultName := range psp.ps.resultNames { - br.addEmptyStringColumn(resultName) + rcs = append(rcs, resultColumn{ + name: resultName, + }) } + var br blockResult + var values []string + valuesLen := 0 for key, spg := range m { // m may be quite big, so this loop can take a lot of time and CPU. // Stop processing data as soon as stopCh is closed without wasting additional CPU time. @@ -384,15 +390,25 @@ func (psp *pipeStatsProcessor) flush() error { values = append(values, value) } - br.addRow(0, values) - if len(br.timestamps) >= 1_000 { + if len(values) != len(rcs) { + logger.Panicf("BUG: len(values)=%d must be equal to len(rcs)=%d", len(values), len(rcs)) + } + for i, v := range values { + rcs[i].addValue(v) + valuesLen += len(v) + } + if valuesLen >= 1_000_000 { + br.setResultColumns(rcs) psp.ppBase.writeBlock(0, &br) - br.resetRows() + br.reset() + for i := range rcs { + rcs[i].reset() + } + valuesLen = 0 } } - if len(br.timestamps) > 0 { - psp.ppBase.writeBlock(0, &br) - } + br.setResultColumns(rcs) + psp.ppBase.writeBlock(0, &br) return nil }