From b1df5ce183130b1faf27994d75cae96b0e8463f8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 7 May 2024 16:47:47 +0200 Subject: [PATCH] wip --- lib/logstorage/block_result.go | 134 +++++++++++++++++-- lib/logstorage/pipe_sort.go | 236 ++++++++++++--------------------- 2 files changed, 209 insertions(+), 161 deletions(-) diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index ce6ac5d0d..656cac8f1 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "math" "slices" + "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" @@ -54,6 +55,68 @@ func (br *blockResult) reset() { br.cs = br.cs[:0] } +// clone returns a clone of br, which owns its own memory. +func (br *blockResult) clone() *blockResult { + brNew := &blockResult{} + + cs := br.getColumns() + + bufLen := 0 + for _, c := range cs { + bufLen += c.neededBackingBufLen() + } + brNew.buf = make([]byte, 0, bufLen) + + valuesBufLen := 0 + for _, c := range cs { + valuesBufLen += c.neededBackingValuesBufLen() + } + brNew.valuesBuf = make([]string, 0, valuesBufLen) + + brNew.streamID = br.streamID + + brNew.timestamps = make([]int64, len(br.timestamps)) + copy(brNew.timestamps, br.timestamps) + + csNew := make([]blockResultColumn, len(cs)) + for i, c := range cs { + csNew[i] = c.clone(brNew) + } + brNew.cs = csNew + + return brNew +} + +// cloneValues clones the given values into br and returns the cloned values. +func (br *blockResult) cloneValues(values []string) []string { + buf := br.buf + valuesBuf := br.valuesBuf + valuesBufLen := len(valuesBuf) + + for _, v := range values { + bufLen := len(buf) + buf = append(buf, v...) + valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:])) + } + + br.valuesBuf = valuesBuf + br.buf = buf + + return valuesBuf[valuesBufLen:] +} + +// sizeBytes returns the size of br in bytes. +func (br *blockResult) sizeBytes() int { + n := int(unsafe.Sizeof(*br)) + + n += cap(br.buf) + n += cap(br.valuesBuf) * int(unsafe.Sizeof(br.valuesBuf[0])) + n += cap(br.timestamps) * int(unsafe.Sizeof(br.timestamps[0])) + n += cap(br.cs) * int(unsafe.Sizeof(br.cs[0])) + + return n +} + // setResultColumns sets the given rcs as br columns. // // The returned result is valid only until rcs are modified. @@ -67,11 +130,20 @@ func (br *blockResult) setResultColumns(rcs []resultColumn) { cs := br.cs for _, rc := range rcs { - cs = append(cs, blockResultColumn{ - name: rc.name, - valueType: valueTypeString, - encodedValues: rc.values, - }) + if areConstValues(rc.values) { + // This optimization allows reducing memory usage after br cloning + cs = append(cs, blockResultColumn{ + name: rc.name, + isConst: true, + encodedValues: rc.values[:1], + }) + } else { + cs = append(cs, blockResultColumn{ + name: rc.name, + valueType: valueTypeString, + encodedValues: rc.values, + }) + } } br.cs = cs } @@ -940,13 +1012,6 @@ func (br *blockResult) getBucketedValue(s string, bucketSize, bucketOffset float return s } -func (br *blockResult) addEmptyStringColumn(columnName string) { - br.cs = append(br.cs, blockResultColumn{ - name: columnName, - valueType: valueTypeString, - }) -} - // copyColumns copies columns from srcColumnNames to dstColumnNames. func (br *blockResult) copyColumns(srcColumnNames, dstColumnNames []string) { if len(srcColumnNames) == 0 { @@ -1155,6 +1220,51 @@ type blockResultColumn struct { bucketOffset float64 } +// clone returns a clone of c backed by data from br. +func (c *blockResultColumn) clone(br *blockResult) blockResultColumn { + var cNew blockResultColumn + + cNew.name = c.name + cNew.isConst = c.isConst + cNew.isTime = c.isTime + cNew.valueType = c.valueType + cNew.dictValues = br.cloneValues(c.dictValues) + cNew.encodedValues = br.cloneValues(c.encodedValues) + // do not copy c.values and c.bucketedValues - they should be re-created from scrach if needed + cNew.bucketSize = c.bucketSize + cNew.bucketOffset = c.bucketOffset + + return cNew +} + +func (c *blockResultColumn) neededBackingBufLen() int { + n := 0 + + n += valuesSizeBytes(c.dictValues) + n += valuesSizeBytes(c.encodedValues) + // do not take into account c.values and c.bucketedValues, since they should be re-created from scratch if needed + + return n +} + +func (c *blockResultColumn) neededBackingValuesBufLen() int { + n := 0 + + n += len(c.dictValues) + n += len(c.encodedValues) + // do not take into account c.values and c.bucketedValues, since they should be re-created from scratch if needed + + return n +} + +func valuesSizeBytes(values []string) int { + n := 0 + for _, v := range values { + n += len(v) + } + return n +} + // getValueAtRow returns value for the value at the given rowIdx. // // The returned value is valid until br.reset() is called. diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 6018ec0a7..607a3d91d 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -10,7 +10,6 @@ import ( "sync/atomic" "unsafe" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) @@ -49,7 +48,7 @@ func (ps *pipeSort) getNeededFields() ([]string, map[string][]string) { } func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { - maxStateSize := int64(float64(memory.Allowed()) * 0.1) + maxStateSize := int64(float64(memory.Allowed()) * 0.2) shards := make([]pipeSortProcessorShard, workersCount) for i := range shards { @@ -97,40 +96,22 @@ type pipeSortProcessorShardNopad struct { // ps point to the parent pipeSort. ps *pipeSort - // buf holds all the logs data written to the given shard. - buf []byte - - // valuesBuf holds all the string values written to the given shard - // The actual strings are stored in buf. - valuesBuf []string - - // u64ValuesBuf holds uint64 values parsed from valuesBuf for speeding up the sorting. + // u64ValuesBuf holds uint64 values parsed from values for speeding up the sorting. u64ValuesBuf []uint64 - // f64ValuesBuf holds float64 values parsed from valuesBuf for speeding up the sorting. + // f64ValuesBuf holds float64 values parsed from values for speeding up the sorting. f64ValuesBuf []float64 - // timestampsBuf holds timestamps if _time columns are used for sorting. - // This speeds up sorting by _time. - timestampsBuf []int64 - - // byColumnsBuf holds `by(...)` columns written to the shard. - byColumnsBuf []sortBlockByColumn - - // otherColumnsBuf holds other than `by(...)` columns written to the shard. - otherColumnsBuf []sortBlockOtherColumn - // blocks holds all the blocks with logs written to the shard. blocks []sortBlock // rowRefs holds references to all the rows stored in blocks. // - // Sorting sorts rowRefs, while blocks remain unchanged. - // This should speed up sorting. + // Sorting sorts rowRefs, while blocks remain unchanged. This should speed up sorting. rowRefs []sortRowRef // rowRefNext points to the next index at rowRefs during merge shards phase - rowRefNext uint + rowRefNext int // stateSizeBudget is the remaining budget for the whole state size for the shard. // The per-shard budget is provided in chunks from the parent pipeSortProcessor. @@ -138,81 +119,88 @@ type pipeSortProcessorShardNopad struct { } // sortBlock represents a block of logs for sorting. -// -// It doesn't own the data it refers - all the data belongs to pipeSortProcessorShard. type sortBlock struct { + // br is a result block to sort + br *blockResult + // byColumns refers block data for 'by(...)' columns byColumns []sortBlockByColumn // otherColumns refers block data for other than 'by(...)' columns - otherColumns []sortBlockOtherColumn + otherColumns []blockResultColumn } // sortBlockByColumn represents data for a single column from 'sort by(...)' clause. -// -// It doesn't own the data it refers - all the data belongs to pipeSortProcessorShard. type sortBlockByColumn struct { - // values contains column values - values []string + // c contains column data + c blockResultColumn - // u64Values contains uint6464 numbers parsed from values + // u64Values contains uint64 numbers parsed from values u64Values []uint64 // f64Values contains float64 numbers parsed from values f64Values []float64 - - // timestamps contains timestamps for blockResultColumn.isTime column - timestamps []int64 -} - -// sortBlockOtherColumn represents data for a single column outside 'sort by(...)' clause. -// -// It doesn't own the data it refers - all the data belongs to pipeSortProcessorShard. -type sortBlockOtherColumn struct { - // name is the column name - name string - - // values contains column values - values []string } // sortRowRef is the reference to a single log entry written to `sort` pipe. type sortRowRef struct { // blockIdx is the index of the block at pipeSortProcessorShard.blocks. - blockIdx uint + blockIdx int // rowIdx is the index of the log entry inside the block referenced by blockIdx. - rowIdx uint + rowIdx int +} + +func (c *sortBlockByColumn) getU64ValueAtRow(rowIdx int) uint64 { + if c.c.isConst { + return c.u64Values[0] + } + return c.u64Values[rowIdx] +} + +func (c *sortBlockByColumn) getF64ValueAtRow(rowIdx int) float64 { + if c.c.isConst { + return c.f64Values[0] + } + return c.f64Values[rowIdx] } // writeBlock writes br with the given byFields to shard. func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { + // clone br, so it could be owned by shard + br = br.clone() + byFields := shard.ps.byFields - cs := br.getColumns() // Collect values for columns from byFields. - byColumnsBuf := shard.byColumnsBuf - byColumnsBufLen := len(byColumnsBuf) - for _, bf := range byFields { + byColumns := make([]sortBlockByColumn, len(byFields)) + for i, bf := range byFields { c := br.getColumnByName(bf.name) - byColumnsBuf = append(byColumnsBuf, sortBlockByColumn{}) - bc := &byColumnsBuf[len(byColumnsBuf)-1] + bc := &byColumns[i] + bc.c = c + if c.isTime { - bc.timestamps = shard.copyTimestamps(br.timestamps) - } else { - values := c.getValues(br) - bc.values = shard.copyValues(values) - bc.u64Values = shard.createUint64Values(values) - bc.f64Values = shard.createFloat64Values(values) + // Do not initialize bc.values, bc.u64Values and bc.f64Values, since they aren't used. + // This saves some memory. + continue } + if c.isConst { + // Do not initialize bc.values in order to save some memory. + bc.u64Values = shard.createUint64Values(c.encodedValues) + bc.f64Values = shard.createFloat64Values(c.encodedValues) + continue + } + + // pre-populate values in order to track better br memory usage + values := c.getValues(br) + bc.u64Values = shard.createUint64Values(values) + bc.f64Values = shard.createFloat64Values(values) } - shard.byColumnsBuf = byColumnsBuf - byColumns := byColumnsBuf[byColumnsBufLen:] shard.stateSizeBudget -= len(byColumns) * int(unsafe.Sizeof(byColumns[0])) // Collect values for other columns. - otherColumnsBuf := shard.otherColumnsBuf - otherColumnsBufLen := len(otherColumnsBuf) + cs := br.getColumns() + otherColumns := make([]blockResultColumn, 0, len(cs)) for _, c := range cs { isByField := false for _, bf := range byFields { @@ -221,65 +209,35 @@ func (shard *pipeSortProcessorShard) writeBlock(br *blockResult) { break } } - if isByField { - continue + if !isByField { + otherColumns = append(otherColumns, c) } - - values := c.getValues(br) - values = shard.copyValues(values) - otherColumnsBuf = append(otherColumnsBuf, sortBlockOtherColumn{ - name: c.name, - values: values, - }) } - shard.otherColumnsBuf = otherColumnsBuf - otherColumns := otherColumnsBuf[otherColumnsBufLen:] shard.stateSizeBudget -= len(otherColumns) * int(unsafe.Sizeof(otherColumns[0])) // Add row references to rowRefs. - blockIdx := uint(len(shard.blocks)) + blockIdx := len(shard.blocks) rowRefs := shard.rowRefs rowRefsLen := len(rowRefs) for i := range br.timestamps { rowRefs = append(rowRefs, sortRowRef{ blockIdx: blockIdx, - rowIdx: uint(i), + rowIdx: i, }) } shard.rowRefs = rowRefs shard.stateSizeBudget -= (len(rowRefs) - rowRefsLen) * int(unsafe.Sizeof(rowRefs[0])) - // Add byColumns and otherColumns to blocks. + // Append br to shard.blocks. shard.blocks = append(shard.blocks, sortBlock{ + br: br, byColumns: byColumns, otherColumns: otherColumns, }) + shard.stateSizeBudget -= br.sizeBytes() shard.stateSizeBudget -= int(unsafe.Sizeof(shard.blocks[0])) } -// copyValues copies values to the shard and returns the copied values. -func (shard *pipeSortProcessorShard) copyValues(values []string) []string { - buf := shard.buf - bufLenOriginal := len(buf) - - valuesBuf := shard.valuesBuf - valuesBufLen := len(valuesBuf) - - for _, v := range values { - bufLen := len(buf) - buf = append(buf, v...) - valuesBuf = append(valuesBuf, bytesutil.ToUnsafeString(buf[bufLen:])) - } - - shard.valuesBuf = valuesBuf - shard.buf = buf - - shard.stateSizeBudget -= len(buf) - bufLenOriginal - shard.stateSizeBudget -= (len(valuesBuf) - valuesBufLen) * int(unsafe.Sizeof(valuesBuf[0])) - - return valuesBuf[valuesBufLen:] -} - func (shard *pipeSortProcessorShard) createUint64Values(values []string) []uint64 { u64ValuesBuf := shard.u64ValuesBuf u64ValuesBufLen := len(u64ValuesBuf) @@ -318,17 +276,6 @@ func (shard *pipeSortProcessorShard) createFloat64Values(values []string) []floa return f64ValuesBuf[f64ValuesBufLen:] } -func (shard *pipeSortProcessorShard) copyTimestamps(timestamps []int64) []int64 { - timestampsBuf := shard.timestampsBuf - timestampsBufLen := len(shard.timestampsBuf) - timestampsBuf = append(timestampsBuf, timestamps...) - shard.timestampsBuf = timestampsBuf - - shard.stateSizeBudget -= (len(timestampsBuf) - timestampsBufLen) * int(unsafe.Sizeof(timestampsBuf[0])) - - return shard.timestampsBuf[timestampsBufLen:] -} - func (psp *pipeSortProcessorShard) Len() int { return len(psp.rowRefs) } @@ -339,7 +286,7 @@ func (psp *pipeSortProcessorShard) Swap(i, j int) { } func (psp *pipeSortProcessorShard) Less(i, j int) bool { - return sortBlockLess(psp, uint(i), psp, uint(j)) + return sortBlockLess(psp, i, psp, j) } func (psp *pipeSortProcessor) writeBlock(workerID uint, br *blockResult) { @@ -414,12 +361,13 @@ func (psp *pipeSortProcessor) flush() error { psp: psp, } var shardNext *pipeSortProcessorShard + for len(sh) > 1 { shard := sh[0] wctx.writeRow(shard, shard.rowRefNext) shard.rowRefNext++ - if shard.rowRefNext >= uint(len(shard.rowRefs)) { + if shard.rowRefNext >= len(shard.rowRefs) { _ = heap.Pop(&sh) shardNext = nil @@ -452,7 +400,7 @@ func (psp *pipeSortProcessor) flush() error { } if len(sh) == 1 { shard := sh[0] - for shard.rowRefNext < uint(len(shard.rowRefs)) { + for shard.rowRefNext < len(shard.rowRefs) { wctx.writeRow(shard, shard.rowRefNext) shard.rowRefNext++ } @@ -467,12 +415,10 @@ type pipeSortWriteContext struct { rcs []resultColumn br blockResult - auxBuf []byte - valuesLen int } -func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx uint) { +func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx int) { rr := shard.rowRefs[rowIdx] b := &shard.blocks[rr.blockIdx] @@ -506,25 +452,17 @@ func (wctx *pipeSortWriteContext) writeRow(shard *pipeSortProcessorShard, rowIdx wctx.rcs = rcs } + br := b.br byColumns := b.byColumns - auxBuf := wctx.auxBuf - for i := range byFields { - bc := &byColumns[i] - if len(bc.timestamps) > 0 { - auxBuf = marshalTimestampISO8601(auxBuf[:0], bc.timestamps[rr.rowIdx]) - rcs[i].addValue(bytesutil.ToUnsafeString(auxBuf)) - wctx.valuesLen += len(auxBuf) - } else { - v := bc.values[rr.rowIdx] - rcs[i].addValue(v) - wctx.valuesLen += len(v) - } + for i := range byColumns { + v := byColumns[i].c.getValueAtRow(br, rr.rowIdx) + rcs[i].addValue(v) + wctx.valuesLen += len(v) } - wctx.auxBuf = auxBuf otherColumns := b.otherColumns for i := range otherColumns { - v := otherColumns[i].values[rr.rowIdx] + v := otherColumns[i].getValueAtRow(br, rr.rowIdx) rcs[len(byFields)+i].addValue(v) wctx.valuesLen += len(v) } @@ -584,22 +522,22 @@ func (sh *pipeSortProcessorShardsHeap) Pop() any { return x } -func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSortProcessorShard, rowIdxB uint) bool { +func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSortProcessorShard, rowIdxB int) bool { byFields := shardA.ps.byFields rrA := shardA.rowRefs[rowIdxA] rrB := shardB.rowRefs[rowIdxB] - csA := shardA.blocks[rrA.blockIdx].byColumns - csB := shardB.blocks[rrB.blockIdx].byColumns - for idx := range csA { - cA := &csA[idx] - cB := &csB[idx] + bA := &shardA.blocks[rrA.blockIdx] + bB := &shardB.blocks[rrB.blockIdx] + for idx := range bA.byColumns { + cA := &bA.byColumns[idx] + cB := &bB.byColumns[idx] bf := byFields[idx] - if len(cA.timestamps) > 0 && len(cB.timestamps) > 0 { + if cA.c.isTime && cB.c.isTime { // Fast path - sort by _time - tA := cA.timestamps[rrA.rowIdx] - tB := cB.timestamps[rrB.rowIdx] + tA := bA.br.timestamps[rrA.rowIdx] + tB := bB.br.timestamps[rrB.rowIdx] if tA == tB { continue } @@ -608,18 +546,18 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor } return tA < tB } - if len(cA.timestamps) > 0 { + if cA.c.isTime { // treat timestamps as smaller than other values return true } - if len(cB.timestamps) > 0 { + if cB.c.isTime { // treat timestamps as smaller than other values return false } // Try sorting by uint64 values at first - uA := cA.u64Values[rrA.rowIdx] - uB := cB.u64Values[rrB.rowIdx] + uA := cA.getU64ValueAtRow(rrA.rowIdx) + uB := cB.getU64ValueAtRow(rrB.rowIdx) if uA != 0 && uB != 0 { if uA == uB { continue @@ -631,8 +569,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor } // Try sorting by float64 then - fA := cA.f64Values[rrA.rowIdx] - fB := cB.f64Values[rrB.rowIdx] + fA := cA.getF64ValueAtRow(rrA.rowIdx) + fB := cB.getF64ValueAtRow(rrB.rowIdx) if !math.IsNaN(fA) && !math.IsNaN(fB) { if fA == fB { continue @@ -644,8 +582,8 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA uint, shardB *pipeSor } // Fall back to string sorting - sA := cA.values[rrA.rowIdx] - sB := cB.values[rrB.rowIdx] + sA := cA.c.getValueAtRow(bA.br, rrA.rowIdx) + sB := cB.c.getValueAtRow(bB.br, rrB.rowIdx) if sA == sB { continue }