diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go
index 03e6eecf6..629519174 100644
--- a/lib/logstorage/pipes.go
+++ b/lib/logstorage/pipes.go
@@ -11,6 +11,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
 )
 
 type pipe interface {
@@ -197,7 +198,9 @@ type statsFunc interface {
 	neededFields() []string
 
 	// newStatsFuncProcessor must create new statsFuncProcessor for calculating stats for the given statsFunc.
-	newStatsFuncProcessor() statsFuncProcessor
+	//
+	// It also must return the size in bytes of the returned statsFuncProcessor.
+	newStatsFuncProcessor() (statsFuncProcessor, int)
 }
 
 // statsFuncProcessor must process stats for some statsFunc.
@@ -206,10 +209,14 @@ type statsFunc interface {
 // so there is no need in the internal synchronization.
 type statsFuncProcessor interface {
 	// updateStatsForAllRows must update statsFuncProcessor stats from all the rows.
-	updateStatsForAllRows(timestamps []int64, columns []BlockColumn)
+	//
+	// It must return the increase of internal state size in bytes for the statsFuncProcessor.
+	updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int
 
 	// updateStatsForRow must update statsFuncProcessor stats from the row at rowIndex.
-	updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int)
+	//
+	// It must return the increase of internal state size in bytes for the statsFuncProcessor.
+	updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int) int
 
 	// mergeState must merge sfp state into statsFuncProcessor state.
 	mergeState(sfp statsFuncProcessor)
@@ -235,22 +242,33 @@ func (sp *statsPipe) String() string {
 	return s
 }
 
+const stateSizeBudgetChunk = 1 << 20
+
 func (sp *statsPipe) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+	maxStateSize := int64(float64(memory.Allowed()) * 0.3)
+
 	shards := make([]statsPipeProcessorShard, workersCount)
 	for i := range shards {
 		shard := &shards[i]
 		shard.m = make(map[string]*statsPipeGroup)
 		shard.funcs = sp.funcs
+
+		shard.stateSizeBudget = stateSizeBudgetChunk
+		maxStateSize -= stateSizeBudgetChunk
 	}
 
-	return &statsPipeProcessor{
+	spp := &statsPipeProcessor{
 		sp:     sp,
 		stopCh: stopCh,
 		cancel: cancel,
 		ppBase: ppBase,
 
 		shards: shards,
+
+		maxStateSize: maxStateSize,
 	}
+	spp.stateSizeBudget.Store(maxStateSize)
+
+	return spp
 }
 
 type statsPipeProcessor struct {
@@ -260,6 +278,9 @@ type statsPipeProcessor struct {
 	ppBase pipeProcessor
 
 	shards []statsPipeProcessorShard
+
+	maxStateSize    int64
+	stateSizeBudget atomic.Int64
 }
 
 type statsPipeProcessorShard struct {
@@ -275,6 +296,8 @@ type statsPipeProcessorShardNopad struct {
 	columnIdxs []int
 	keyBuf     []byte
+
+	stateSizeBudget int
 }
 
 func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup {
@@ -284,12 +307,15 @@ func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGr
 	}
 	sfps := make([]statsFuncProcessor, len(shard.funcs))
 	for i, f := range shard.funcs {
-		sfps[i] = f.newStatsFuncProcessor()
+		sfp, stateSize := f.newStatsFuncProcessor()
+		sfps[i] = sfp
+		shard.stateSizeBudget -= stateSize
 	}
 	spg = &statsPipeGroup{
 		sfps: sfps,
 	}
 	shard.m[string(key)] = spg
+	shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
 	return spg
 }
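
The hunks above change the constructor contract: newStatsFuncProcessor now returns the processor together with its size in bytes, and getStatsPipeGroup charges both that size and an unsafe.Sizeof-based estimate of the per-group bookkeeping against the shard budget. A minimal standalone sketch of that contract follows (not part of the patch; countProcessor, newCountProcessor and updateForRows are invented names):

package main

import (
	"fmt"
	"unsafe"
)

type countProcessor struct {
	rowsCount uint64
}

// newCountProcessor mirrors the two-value contract: it returns the processor
// together with its fixed footprint in bytes, measured via unsafe.Sizeof.
func newCountProcessor() (*countProcessor, int) {
	p := &countProcessor{}
	return p, int(unsafe.Sizeof(*p))
}

// updateForRows mirrors updateStatsForAllRows: it returns the increase of the
// internal state size in bytes - zero here, since a plain counter never grows.
func (p *countProcessor) updateForRows(rows int) int {
	p.rowsCount += uint64(rows)
	return 0
}

func main() {
	p, stateSize := newCountProcessor()
	stateSize += p.updateForRows(100)
	fmt.Printf("rows=%d, tracked state size=%d bytes\n", p.rowsCount, stateSize)
}
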
@@ -300,11 +326,25 @@ type statsPipeGroup struct {
 
 func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
 	shard := &spp.shards[workerID]
 
+	for shard.stateSizeBudget < 0 {
+		// steal some budget for the state size from the global budget.
+		remaining := spp.stateSizeBudget.Add(-stateSizeBudgetChunk)
+		if remaining < 0 {
+			// The state size is too big. Stop processing data in order to avoid OOM crash.
+			if remaining+stateSizeBudgetChunk >= 0 {
+				// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
+				spp.cancel()
+			}
+			return
+		}
+		shard.stateSizeBudget += stateSizeBudgetChunk
+	}
+
 	if len(spp.sp.byFields) == 0 {
 		// Fast path - pass all the rows to a single group
 		spg := shard.getStatsPipeGroup(nil)
 		for _, sfp := range spg.sfps {
-			sfp.updateStatsForAllRows(timestamps, columns)
+			shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
 		}
 		return
 	}
@@ -329,13 +369,17 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 
 		spg := shard.getStatsPipeGroup(keyBuf)
 		for _, sfp := range spg.sfps {
-			sfp.updateStatsForRow(timestamps, columns, i)
+			shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
 		}
 	}
 	shard.keyBuf = keyBuf
 }
 
 func (spp *statsPipeProcessor) flush() error {
+	if n := spp.stateSizeBudget.Load(); n <= 0 {
+		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.sp.String(), spp.maxStateSize/(1<<20))
+	}
+
 	// Merge states across shards
 	shards := spp.shards
 	m := shards[0].m
@@ -506,10 +550,11 @@ func (sfc *statsFuncCount) neededFields() []string {
 	return getFieldsIgnoreStar(sfc.fields)
 }
 
-func (sfc *statsFuncCount) newStatsFuncProcessor() statsFuncProcessor {
-	return &statsFuncCountProcessor{
+func (sfc *statsFuncCount) newStatsFuncProcessor() (statsFuncProcessor, int) {
+	sfcp := &statsFuncCountProcessor{
 		sfc: sfc,
 	}
+	return sfcp, int(unsafe.Sizeof(*sfcp))
 }
 
 type statsFuncCountProcessor struct {
@@ -518,12 +563,12 @@ type statsFuncCountProcessor struct {
 	rowsCount uint64
 }
 
-func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) {
+func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
 	fields := sfcp.sfc.fields
 	if len(fields) == 0 || slices.Contains(fields, "*") {
 		// Fast path - count all the columns.
 		sfcp.rowsCount += uint64(len(timestamps))
-		return
+		return 0
 	}
 
 	// Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count().
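
writeBlock above refills a per-shard budget from the shared atomic budget in 1MiB chunks, so the atomic is touched roughly once per megabyte of state growth instead of once per row, and the goroutine that drives the shared budget negative calls cancel() exactly once. The following standalone sketch isolates that scheme under the same assumptions (worker and spend are invented names):

package main

import (
	"fmt"
	"sync/atomic"
)

const chunk = 1 << 20 // 1MiB, in the spirit of stateSizeBudgetChunk

type worker struct {
	localBudget int
}

// spend charges n bytes against the worker's local budget, stealing chunks
// from the shared budget whenever the local budget goes negative.
// It reports false once the shared budget is exhausted.
func (w *worker) spend(shared *atomic.Int64, n int) bool {
	w.localBudget -= n
	for w.localBudget < 0 {
		if shared.Add(-chunk) < 0 {
			return false // global limit reached; the caller should stop processing
		}
		w.localBudget += chunk
	}
	return true
}

func main() {
	var shared atomic.Int64
	shared.Store(3 * chunk) // tiny budget, to force exhaustion quickly

	var w worker
	for i := 0; ; i++ {
		if !w.spend(&shared, 512*1024) { // pretend each block grows state by 512KiB
			fmt.Printf("budget exhausted after %d blocks\n", i)
			return
		}
	}
}
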
@@ -545,23 +590,25 @@ func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, c
 	})
 
 	sfcp.rowsCount += uint64(len(timestamps) - emptyValues)
+	return 0
 }
 
-func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) {
+func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
 	fields := sfcp.sfc.fields
 	if len(fields) == 0 || slices.Contains(fields, "*") {
 		// Fast path - count the given column
 		sfcp.rowsCount++
-		return
+		return 0
 	}
 
 	// Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty
 	for _, f := range fields {
 		if idx := getBlockColumnIndex(columns, f); idx >= 0 && columns[idx].Values[rowIdx] != "" {
 			sfcp.rowsCount++
-			return
+			return 0
 		}
 	}
+	return 0
 }
 
 func (sfcp *statsFuncCountProcessor) mergeState(sfp statsFuncProcessor) {
@@ -587,12 +634,13 @@ func (sfu *statsFuncUniq) neededFields() []string {
 	return sfu.fields
 }
 
-func (sfu *statsFuncUniq) newStatsFuncProcessor() statsFuncProcessor {
-	return &statsFuncUniqProcessor{
+func (sfu *statsFuncUniq) newStatsFuncProcessor() (statsFuncProcessor, int) {
+	sfup := &statsFuncUniqProcessor{
 		sfu: sfu,
 		m:   make(map[string]struct{}),
 	}
+	return sfup, int(unsafe.Sizeof(*sfup))
 }
 
 type statsFuncUniqProcessor struct {
@@ -604,10 +652,11 @@ type statsFuncUniqProcessor struct {
 	keyBuf []byte
 }
 
-func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) {
+func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
 	fields := sfup.sfu.fields
 	m := sfup.m
 
+	stateSizeIncrease := 0
 	if len(fields) == 1 {
 		// Fast path for a single column
 		if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
@@ -619,10 +668,11 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
 			if _, ok := m[v]; !ok {
 				vCopy := strings.Clone(v)
 				m[vCopy] = struct{}{}
+				stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
 			}
 		}
 	}
-		return
+		return stateSizeIncrease
 	}
 
 	// Slow path for multiple columns.
@@ -651,29 +701,33 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
 		}
 		if _, ok := m[string(keyBuf)]; !ok {
 			m[string(keyBuf)] = struct{}{}
+			stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
 		}
 	}
 	sfup.keyBuf = keyBuf
+	return stateSizeIncrease
 }
 
-func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) {
+func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) int {
 	fields := sfup.sfu.fields
 	m := sfup.m
 
+	stateSizeIncrease := 0
 	if len(fields) == 1 {
 		// Fast path for a single column
 		if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
 			v := columns[idx].Values[rowIdx]
 			if v == "" {
 				// Do not count empty values
-				return
+				return stateSizeIncrease
 			}
 			if _, ok := m[v]; !ok {
 				vCopy := strings.Clone(v)
 				m[vCopy] = struct{}{}
+				stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
 			}
 		}
-		return
+		return stateSizeIncrease
 	}
 
 	// Slow path for multiple columns.
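
The uniq hunks above charge each newly inserted key for its byte length plus the string-header size reported by unsafe.Sizeof; this deliberately ignores the map's own bucket overhead, so it is an estimate of the dominant cost (the cloned keys), not an exact measurement. A standalone sketch of this pattern (uniqTracker and add are invented names):

package main

import (
	"fmt"
	"strings"
	"unsafe"
)

type uniqTracker struct {
	m map[string]struct{}
}

// add inserts v and returns the estimated state-size increase in bytes
// (zero when v was already present).
func (u *uniqTracker) add(v string) int {
	if _, ok := u.m[v]; ok {
		return 0
	}
	vCopy := strings.Clone(v) // detach the key from the (possibly huge) source block
	u.m[vCopy] = struct{}{}
	return len(vCopy) + int(unsafe.Sizeof(vCopy))
}

func main() {
	u := uniqTracker{m: make(map[string]struct{})}
	grown := 0
	for _, v := range []string{"GET", "POST", "GET", "PUT"} {
		grown += u.add(v)
	}
	fmt.Printf("unique=%d, estimated state growth=%d bytes\n", len(u.m), grown)
}
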
@@ -693,11 +747,13 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column
 
 	if allEmptyValues {
 		// Do not count empty values
-		return
+		return stateSizeIncrease
 	}
 	if _, ok := m[string(keyBuf)]; !ok {
 		m[string(keyBuf)] = struct{}{}
+		stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
 	}
+	return stateSizeIncrease
 }
 
 func (sfup *statsFuncUniqProcessor) mergeState(sfp statsFuncProcessor) {
@@ -789,26 +845,26 @@ type headPipeProcessor struct {
 	cancel func()
 	ppBase pipeProcessor
 
-	rowsWritten atomic.Uint64
+	rowsProcessed atomic.Uint64
 }
 
 func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
-	rowsWritten := hpp.rowsWritten.Add(uint64(len(timestamps)))
-	if rowsWritten <= hpp.hp.n {
+	rowsProcessed := hpp.rowsProcessed.Add(uint64(len(timestamps)))
+	if rowsProcessed <= hpp.hp.n {
 		// Fast path - write all the rows to ppBase.
 		hpp.ppBase.writeBlock(workerID, timestamps, columns)
 		return
 	}
 
 	// Slow path - overflow. Write the remaining rows if needed.
-	rowsWritten -= uint64(len(timestamps))
-	if rowsWritten >= hpp.hp.n {
+	rowsProcessed -= uint64(len(timestamps))
+	if rowsProcessed >= hpp.hp.n {
 		// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
 		return
 	}
 
 	// Write remaining rows.
-	rowsRemaining := hpp.hp.n - rowsWritten
+	rowsRemaining := hpp.hp.n - rowsProcessed
 	cs := make([]BlockColumn, len(columns))
 	for i, c := range columns {
 		cDst := &cs[i]
@@ -860,22 +916,22 @@ type skipPipeProcessor struct {
 	sp     *skipPipe
 	ppBase pipeProcessor
 
-	rowsSkipped atomic.Uint64
+	rowsProcessed atomic.Uint64
 }
 
 func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
-	rowsSkipped := spp.rowsSkipped.Add(uint64(len(timestamps)))
-	if rowsSkipped <= spp.sp.n {
+	rowsProcessed := spp.rowsProcessed.Add(uint64(len(timestamps)))
+	if rowsProcessed <= spp.sp.n {
 		return
 	}
 
-	rowsSkipped -= uint64(len(timestamps))
-	if rowsSkipped >= spp.sp.n {
+	rowsProcessed -= uint64(len(timestamps))
+	if rowsProcessed >= spp.sp.n {
 		spp.ppBase.writeBlock(workerID, timestamps, columns)
 		return
 	}
 
-	rowsRemaining := spp.sp.n - rowsSkipped
+	rowsRemaining := spp.sp.n - rowsProcessed
 	cs := make([]BlockColumn, len(columns))
 	for i, c := range columns {
 		cDst := &cs[i]
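
The head and skip pipes above now share one overflow pattern around a single rowsProcessed counter: the value after atomically adding the block length tells whether the block lies entirely before the n-row boundary, entirely past it, or straddles it. A standalone sketch of that decision (splitDecision is an invented name; head would keep the returned prefix, skip would keep the complement):

package main

import (
	"fmt"
	"sync/atomic"
)

// splitDecision returns how many of blockLen rows fall before the n-row
// boundary, given the counter value after adding blockLen.
func splitDecision(rowsProcessed *atomic.Uint64, n uint64, blockLen int) (take uint64, overflow bool) {
	processed := rowsProcessed.Add(uint64(blockLen))
	if processed <= n {
		return uint64(blockLen), false // the whole block is before the boundary
	}
	processed -= uint64(blockLen)
	if processed >= n {
		return 0, true // the whole block is past the boundary
	}
	return n - processed, true // the block straddles the boundary
}

func main() {
	var counter atomic.Uint64
	const n = 25
	for _, blockLen := range []int{10, 10, 10, 10} {
		take, overflow := splitDecision(&counter, n, blockLen)
		fmt.Printf("block of %d rows: take %d (overflow=%v)\n", blockLen, take, overflow)
	}
}
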