Aliaksandr Valialkin 2024-04-28 12:52:21 +02:00
parent 75914210ec
commit 3008c58ac8


@@ -11,6 +11,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
type pipe interface {
@@ -197,7 +198,9 @@ type statsFunc interface {
neededFields() []string
// newStatsFuncProcessor must create new statsFuncProcessor for calculating stats for the given statsFunc.
newStatsFuncProcessor() statsFuncProcessor
//
// It also must return the size in bytes of the returned statsFuncProcessor.
newStatsFuncProcessor() (statsFuncProcessor, int)
}
// statsFuncProcessor must process stats for some statsFunc.
@@ -206,10 +209,14 @@ type statsFunc interface {
// so there is no need in the internal synchronization.
type statsFuncProcessor interface {
// updateStatsForAllRows must update statsFuncProcessor stats from all the rows.
updateStatsForAllRows(timestamps []int64, columns []BlockColumn)
//
// It must return the increase of internal state size in bytes for the statsFuncProcessor.
updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int
// updateStatsForRow must update statsFuncProcessor stats from the row at rowIndex.
updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int)
//
// It must return the increase of internal state size in bytes for the statsFuncProcessor.
updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int) int
// mergeState must merge sfp state into statsFuncProcessor state.
mergeState(sfp statsFuncProcessor)
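To make the new int return values concrete, here is a minimal hedged sketch of one update method on a hypothetical processor (the type is illustrative and not part of this commit; it assumes the package's BlockColumn type plus the strings and unsafe imports). It follows the same accounting convention as the real processors further down in the diff: report only the bytes newly retained by the internal state.

	// Illustrative only: a hypothetical processor that collects raw values
	// and reports how much its state grew on every update.
	type valuesCollectorProcessor struct {
		values []string // hypothetical state: every value seen so far
	}

	func (p *valuesCollectorProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
		// assumes at least one column, for brevity
		v := strings.Clone(columns[0].Values[rowIdx])
		p.values = append(p.values, v)
		// charge the copied bytes plus the string header retained in the slice
		return len(v) + int(unsafe.Sizeof(v))
	}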
@@ -235,22 +242,33 @@ func (sp *statsPipe) String() string {
return s
}
const stateSizeBudgetChunk = 1 << 20
func (sp *statsPipe) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
maxStateSize := int64(float64(memory.Allowed()) * 0.3)
shards := make([]statsPipeProcessorShard, workersCount)
for i := range shards {
shard := &shards[i]
shard.m = make(map[string]*statsPipeGroup)
shard.funcs = sp.funcs
shard.stateSizeBudget = stateSizeBudgetChunk
maxStateSize -= stateSizeBudgetChunk
}
return &statsPipeProcessor{
spp := &statsPipeProcessor{
sp: sp,
stopCh: stopCh,
cancel: cancel,
ppBase: ppBase,
shards: shards,
maxStateSize: maxStateSize,
}
spp.stateSizeBudget.Store(maxStateSize)
return spp
}
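A rough worked example of the budget set up here, with purely illustrative numbers (not taken from this commit): if memory.Allowed() returns 4 GiB and workersCount is 8, then

	maxStateSize = int64(float64(4<<30) * 0.3) // ≈ 1228 MiB
	// each of the 8 shards is pre-funded with stateSizeBudgetChunk = 1<<20 (1 MiB),
	// so ~8 MiB moves into the shards and roughly 1220 MiB remains in
	// spp.stateSizeBudget as the shared pool the shards steal from later.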
type statsPipeProcessor struct {
@@ -260,6 +278,9 @@ type statsPipeProcessor struct {
ppBase pipeProcessor
shards []statsPipeProcessorShard
maxStateSize int64
stateSizeBudget atomic.Int64
}
type statsPipeProcessorShard struct {
@@ -275,6 +296,8 @@ type statsPipeProcessorShardNopad struct {
columnIdxs []int
keyBuf []byte
stateSizeBudget int
}
func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup {
@@ -284,12 +307,15 @@ func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGr
}
sfps := make([]statsFuncProcessor, len(shard.funcs))
for i, f := range shard.funcs {
sfps[i] = f.newStatsFuncProcessor()
sfp, stateSize := f.newStatsFuncProcessor()
sfps[i] = sfp
shard.stateSizeBudget -= stateSize
}
spg = &statsPipeGroup{
sfps: sfps,
}
shard.m[string(key)] = spg
shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
return spg
}
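For scale, a hedged back-of-the-envelope for the charge above, assuming a 64-bit platform, a 12-byte group key and two stats functions: unsafe.Sizeof("") is 16 (string header), unsafe.Sizeof(spg) is 8 (a pointer) and unsafe.Sizeof(sfps[0]) is 16 (an interface value), so the new group itself is charged 12 + 16 + 8 + 2*16 = 68 bytes, on top of whatever each newStatsFuncProcessor reported in the loop above. Map bucket overhead is not counted, so the tracked size is a lower bound on real memory use.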
@@ -300,11 +326,25 @@ type statsPipeGroup struct {
func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
shard := &spp.shards[workerID]
for shard.stateSizeBudget < 0 {
// steal some budget for the state size from the global budget.
remaining := spp.stateSizeBudget.Add(-stateSizeBudgetChunk)
if remaining < 0 {
// The state size is too big. Stop processing data in order to avoid OOM crash.
if remaining+stateSizeBudgetChunk >= 0 {
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
spp.cancel()
}
return
}
shard.stateSizeBudget += stateSizeBudgetChunk
}
if len(spp.sp.byFields) == 0 {
// Fast path - pass all the rows to a single group
spg := shard.getStatsPipeGroup(nil)
for _, sfp := range spg.sfps {
sfp.updateStatsForAllRows(timestamps, columns)
shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
}
return
}
@@ -329,13 +369,17 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
spg := shard.getStatsPipeGroup(keyBuf)
for _, sfp := range spg.sfps {
sfp.updateStatsForRow(timestamps, columns, i)
shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
}
}
shard.keyBuf = keyBuf
}
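The loop at the top of writeBlock gives each shard a two-level budget: a per-shard counter that needs no locking, refilled in 1 MiB chunks from the shared atomic budget, with processing cancelled once the shared budget goes negative. Below is a standalone, simplified sketch of the same pattern; the names and the main() demo are illustrative, not from this commit.

	package main

	import (
		"fmt"
		"sync/atomic"
	)

	const chunk = 1 << 20 // refill granularity: 1 MiB

	type worker struct {
		local  int           // per-worker budget, touched by a single goroutine
		shared *atomic.Int64 // global budget shared by all workers
	}

	// spend charges n bytes to the local budget and refills it from the shared
	// budget in whole chunks; it returns false once the shared budget is
	// exhausted, meaning the caller should stop processing to avoid OOM.
	func (w *worker) spend(n int) bool {
		w.local -= n
		for w.local < 0 {
			if w.shared.Add(-chunk) < 0 {
				return false
			}
			w.local += chunk
		}
		return true
	}

	func main() {
		var shared atomic.Int64
		shared.Store(3 * chunk)
		w := &worker{shared: &shared}
		fmt.Println(w.spend(2 * chunk)) // true: steals two chunks from shared
		fmt.Println(w.spend(2 * chunk)) // false: shared budget exhausted
	}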
func (spp *statsPipeProcessor) flush() error {
if n := spp.stateSizeBudget.Load(); n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.sp.String(), spp.maxStateSize/(1<<20))
}
// Merge states across shards
shards := spp.shards
m := shards[0].m
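With the illustrative 4 GiB figure from the earlier note, maxStateSize/(1<<20) comes out to roughly 1228, so a query whose grouped state exhausts the budget would be rejected here with a message saying the pipe (rendered via spp.sp.String()) requires more than 1228MB of memory; the actual figure scales with memory.Allowed().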
@@ -506,10 +550,11 @@ func (sfc *statsFuncCount) neededFields() []string {
return getFieldsIgnoreStar(sfc.fields)
}
func (sfc *statsFuncCount) newStatsFuncProcessor() statsFuncProcessor {
return &statsFuncCountProcessor{
func (sfc *statsFuncCount) newStatsFuncProcessor() (statsFuncProcessor, int) {
sfcp := &statsFuncCountProcessor{
sfc: sfc,
}
return sfcp, int(unsafe.Sizeof(*sfcp))
}
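Note that unsafe.Sizeof reports only the shallow size of a struct, i.e. its field headers, not any heap data they later point at, which is why the update methods have to charge string copies and map keys separately. A small hedged illustration, assuming a typical 64-bit platform (the example type is not from this commit):

	type example struct {
		p *int           // 8 bytes: a pointer
		m map[string]int // 8 bytes: a map header is a single pointer
		b []byte         // 24 bytes: slice header (pointer, len, cap)
	}

	// unsafe.Sizeof(example{}) == 40 on 64-bit; anything later stored behind
	// p, m or b is not included. For statsFuncCountProcessor (a pointer plus a
	// uint64) the reported size is likewise just 16 bytes.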
type statsFuncCountProcessor struct {
@@ -518,12 +563,12 @@ type statsFuncCountProcessor struct {
rowsCount uint64
}
func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) {
func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
fields := sfcp.sfc.fields
if len(fields) == 0 || slices.Contains(fields, "*") {
// Fast path - count all the columns.
sfcp.rowsCount += uint64(len(timestamps))
return
return 0
}
// Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count().
@@ -545,23 +590,25 @@ func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, c
})
sfcp.rowsCount += uint64(len(timestamps) - emptyValues)
return 0
}
func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) {
func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
fields := sfcp.sfc.fields
if len(fields) == 0 || slices.Contains(fields, "*") {
// Fast path - count the given column
sfcp.rowsCount++
return
return 0
}
// Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty
for _, f := range fields {
if idx := getBlockColumnIndex(columns, f); idx >= 0 && columns[idx].Values[rowIdx] != "" {
sfcp.rowsCount++
return
return 0
}
}
return 0
}
func (sfcp *statsFuncCountProcessor) mergeState(sfp statsFuncProcessor) {
@@ -587,12 +634,13 @@ func (sfu *statsFuncUniq) neededFields() []string {
return sfu.fields
}
func (sfu *statsFuncUniq) newStatsFuncProcessor() statsFuncProcessor {
return &statsFuncUniqProcessor{
func (sfu *statsFuncUniq) newStatsFuncProcessor() (statsFuncProcessor, int) {
sfup := &statsFuncUniqProcessor{
sfu: sfu,
m: make(map[string]struct{}),
}
return sfup, int(unsafe.Sizeof(*sfup))
}
type statsFuncUniqProcessor struct {
@@ -604,10 +652,11 @@ type statsFuncUniqProcessor struct {
keyBuf []byte
}
func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) {
func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
fields := sfup.sfu.fields
m := sfup.m
stateSizeIncrease := 0
if len(fields) == 1 {
// Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
@@ -619,10 +668,11 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
if _, ok := m[v]; !ok {
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
}
}
return
return stateSizeIncrease
}
// Slow path for multiple columns.
@@ -651,29 +701,33 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
}
sfup.keyBuf = keyBuf
return stateSizeIncrease
}
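A quick sense of the per-entry charge in the fast path above, assuming a 64-bit platform: inserting a previously unseen 3-byte value such as "GET" adds len(vCopy) + unsafe.Sizeof(vCopy) = 3 + 16 = 19 bytes to the tracked state, and the multi-column path charges len(keyBuf) + 16 for the encoded key tuple in the same way. The map's own bucket overhead is not included, so the tracked number understates real memory use somewhat.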
func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) {
func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) int {
fields := sfup.sfu.fields
m := sfup.m
stateSizeIncrease := 0
if len(fields) == 1 {
// Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
v := columns[idx].Values[rowIdx]
if v == "" {
// Do not count empty values
return
return stateSizeIncrease
}
if _, ok := m[v]; !ok {
vCopy := strings.Clone(v)
m[vCopy] = struct{}{}
stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy))
}
}
return
return stateSizeIncrease
}
// Slow path for multiple columns.
@@ -693,11 +747,13 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column
if allEmptyValues {
// Do not count empty values
return
return stateSizeIncrease
}
if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{}
stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof(""))
}
return stateSizeIncrease
}
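Scaling that up with illustrative numbers: a uniq over one million distinct 10-byte values accounts for roughly 1e6 * (10 + 16) ≈ 26 MB of state, i.e. about 25 of the 1 MiB budget chunks, so a single shard doing this quickly exhausts its local budget and starts pulling chunks from the shared spp.stateSizeBudget via the loop in writeBlock.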
func (sfup *statsFuncUniqProcessor) mergeState(sfp statsFuncProcessor) {
@@ -789,26 +845,26 @@ type headPipeProcessor struct {
cancel func()
ppBase pipeProcessor
rowsWritten atomic.Uint64
rowsProcessed atomic.Uint64
}
func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
rowsWritten := hpp.rowsWritten.Add(uint64(len(timestamps)))
if rowsWritten <= hpp.hp.n {
rowsProcessed := hpp.rowsProcessed.Add(uint64(len(timestamps)))
if rowsProcessed <= hpp.hp.n {
// Fast path - write all the rows to ppBase.
hpp.ppBase.writeBlock(workerID, timestamps, columns)
return
}
// Slow path - overflow. Write the remaining rows if needed.
rowsWritten -= uint64(len(timestamps))
if rowsWritten >= hpp.hp.n {
rowsProcessed -= uint64(len(timestamps))
if rowsProcessed >= hpp.hp.n {
// Nothing to write. There is no need in cancel() call, since it has been called by another goroutine.
return
}
// Write remaining rows.
rowsRemaining := hpp.hp.n - rowsWritten
rowsRemaining := hpp.hp.n - rowsProcessed
cs := make([]BlockColumn, len(columns))
for i, c := range columns {
cDst := &cs[i]
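Worked through with illustrative numbers for the renamed counter: say hpp.hp.n = 10 and blocks of 7 rows arrive. The first block takes rowsProcessed to 7 (≤ 10) and is forwarded whole. The second takes it to 14 (> 10); subtracting the block length gives 7, still below 10, so rowsRemaining = 10 - 7 = 3 and, judging by the truncated tail of this hunk, only the first 3 rows of the block are forwarded (the comment in the other branch implies cancel() is then invoked by this goroutine). Any later block sees the post-subtraction value already ≥ 10 and returns without writing.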
@@ -860,22 +916,22 @@ type skipPipeProcessor struct {
sp *skipPipe
ppBase pipeProcessor
rowsSkipped atomic.Uint64
rowsProcessed atomic.Uint64
}
func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
rowsSkipped := spp.rowsSkipped.Add(uint64(len(timestamps)))
if rowsSkipped <= spp.sp.n {
rowsProcessed := spp.rowsProcessed.Add(uint64(len(timestamps)))
if rowsProcessed <= spp.sp.n {
return
}
rowsSkipped -= uint64(len(timestamps))
if rowsSkipped >= spp.sp.n {
rowsProcessed -= uint64(len(timestamps))
if rowsProcessed >= spp.sp.n {
spp.ppBase.writeBlock(workerID, timestamps, columns)
return
}
rowsRemaining := spp.sp.n - rowsSkipped
rowsRemaining := spp.sp.n - rowsProcessed
cs := make([]BlockColumn, len(columns))
for i, c := range columns {
cDst := &cs[i]
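The skip pipe mirrors head with inverted semantics, and the rename to rowsProcessed makes that symmetry visible: the counter tracks every row seen, not only the skipped or written ones. With sp.n = 10 and 7-row blocks (illustrative numbers): the first block takes rowsProcessed to 7 (≤ 10) and is dropped entirely; the second takes it to 14, the subtraction leaves 7 < 10, so rowsRemaining = 10 - 7 = 3 rows still have to be skipped and, presumably (the hunk is cut off here), the last 4 rows of that block are forwarded to ppBase; every later block falls into the rowsProcessed >= n branch and is passed through untouched.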