Aliaksandr Valialkin 2024-04-29 00:35:16 +02:00
parent 20fd87f86f
commit 1b67995907
GPG key ID: 52C003EE2BCDB9EB


@@ -254,8 +254,8 @@ func (sp *statsPipe) newPipeProcessor(workersCount int, stopCh <-chan struct{},
 	shards := make([]statsPipeProcessorShard, workersCount)
 	for i := range shards {
 		shard := &shards[i]
+		shard.sp = sp
 		shard.m = make(map[string]*statsPipeGroup)
-		shard.funcs = sp.funcs
 		shard.stateSizeBudget = stateSizeBudgetChunk
 		maxStateSize -= stateSizeBudgetChunk
 	}
@@ -295,8 +295,8 @@ type statsPipeProcessorShard struct {
 }
 
 type statsPipeProcessorShardNopad struct {
-	m     map[string]*statsPipeGroup
-	funcs []statsFunc
+	sp *statsPipe
+	m  map[string]*statsPipeGroup
 
 	columnValues [][]string
 	keyBuf       []byte
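
The two hunks above replace the per-shard funcs copy with a back-pointer to the parent statsPipe, so every shard reads the shared sp.funcs instead of carrying its own slice. A minimal stand-alone sketch of that pattern follows; pipe, shard and group are simplified stand-in types, not the repository's actual definitions:

package main

import "fmt"

// Illustrative stand-ins for statsPipe and statsPipeProcessorShard: each shard
// keeps a pointer to the shared pipe instead of copying its funcs slice.
type pipe struct {
	byFields []string
	funcs    []string // the real code stores []statsFunc; strings keep the sketch self-contained
}

type group struct{ rows int }

type shard struct {
	p *pipe             // back-pointer, like shard.sp in the diff
	m map[string]*group // per-shard groups keyed by the marshaled 'by (...)' values
}

func newShards(p *pipe, workersCount int) []shard {
	shards := make([]shard, workersCount)
	for i := range shards {
		shards[i].p = p
		shards[i].m = make(map[string]*group)
	}
	return shards
}

func main() {
	p := &pipe{byFields: []string{"level"}, funcs: []string{"count"}}
	shards := newShards(p, 4)
	fmt.Println(len(shards), len(shards[0].p.funcs)) // 4 1
}

One shard per worker keeps the hot per-group state worker-local, while the shared, read-only configuration has a single owner.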
@@ -304,11 +304,11 @@ type statsPipeProcessorShardNopad struct {
 	stateSizeBudget int
 }
 
-func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup {
+func (shard *statsPipeProcessorShard) getStatsFuncProcessors(key []byte) []statsFuncProcessor {
 	spg := shard.m[string(key)]
 	if spg == nil {
-		sfps := make([]statsFuncProcessor, len(shard.funcs))
-		for i, f := range shard.funcs {
+		sfps := make([]statsFuncProcessor, len(shard.sp.funcs))
+		for i, f := range shard.sp.funcs {
 			sfp, stateSize := f.newStatsFuncProcessor()
 			sfps[i] = sfp
 			shard.stateSizeBudget -= stateSize
@@ -319,7 +319,7 @@ func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGr
 		shard.m[string(key)] = spg
 		shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
 	}
-	return spg
+	return spg.sfps
 }
 
 type statsPipeGroup struct {
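
getStatsFuncProcessors now returns the group's []statsFuncProcessor directly and creates it lazily the first time a key is seen, charging both the new processor state and the map-entry overhead against the shard's stateSizeBudget. Below is a self-contained sketch of that lazy-create-and-budget pattern; the statsFunc and statsFuncProcessor interfaces and the countFunc implementation are simplified assumptions, not the package's actual definitions:

package main

import (
	"fmt"
	"unsafe"
)

// Assumed shapes of the interfaces used in the diff; the real definitions
// live elsewhere in the package and carry more context (timestamps, columns).
type statsFunc interface {
	newStatsFuncProcessor() (statsFuncProcessor, int) // processor plus its initial state size in bytes
}

type statsFuncProcessor interface {
	updateStatsForRow() int // returns how much the processor state grew, in bytes
}

// countFunc / countProcessor: a toy stats function for the sketch.
type countFunc struct{}

func (countFunc) newStatsFuncProcessor() (statsFuncProcessor, int) {
	p := &countProcessor{}
	return p, int(unsafe.Sizeof(*p))
}

type countProcessor struct{ n uint64 }

func (p *countProcessor) updateStatsForRow() int { p.n++; return 0 }

type group struct{ sfps []statsFuncProcessor }

type shard struct {
	funcs           []statsFunc
	m               map[string]*group
	stateSizeBudget int
}

// getProcessors mirrors getStatsFuncProcessors from the diff: lazily create one
// processor per stats function the first time a group key is seen, and charge
// the new state against the shard's stateSizeBudget.
func (sh *shard) getProcessors(key []byte) []statsFuncProcessor {
	g := sh.m[string(key)]
	if g == nil {
		sfps := make([]statsFuncProcessor, len(sh.funcs))
		for i, f := range sh.funcs {
			sfp, stateSize := f.newStatsFuncProcessor()
			sfps[i] = sfp
			sh.stateSizeBudget -= stateSize
		}
		g = &group{sfps: sfps}
		sh.m[string(key)] = g
		sh.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(g))
	}
	return g.sfps
}

func main() {
	sh := &shard{
		funcs:           []statsFunc{countFunc{}},
		m:               make(map[string]*group),
		stateSizeBudget: 1 << 20,
	}
	for _, sfp := range sh.getProcessors([]byte("error")) {
		sh.stateSizeBudget -= sfp.updateStatsForRow()
	}
	fmt.Println(sh.m["error"].sfps[0].(*countProcessor).n, sh.stateSizeBudget)
}

Returning the slice instead of the whole group keeps the callers in writeBlock independent of the group's internal layout.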
@@ -344,10 +344,10 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	}
 
 	byFields := spp.sp.byFields
-	if len(byFields) == 0 || len(byFields) == 1 && getBlockColumnIndex(columns, byFields[0]) < 0 {
+	if len(byFields) == 0 {
 		// Fast path - pass all the rows to a single group with empty key.
-		spg := shard.getStatsPipeGroup(nil)
-		for _, sfp := range spg.sfps {
+		sfps := shard.getStatsFuncProcessors(nil)
+		for _, sfp := range sfps {
 			shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
 		}
 		return
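
With no 'by (...)' fields every row lands in the single empty-key group, so the fast path hands the whole block to each processor once via updateStatsForAllRows instead of calling updateStatsForRow per row. The toy processor below illustrates how the two granularities produce the same result while the block-level call does far less per-row work; the signatures are simplified stand-ins for the ones in the diff:

package main

import "fmt"

// Illustrative processor with the two update granularities seen in the diff:
// updateStatsForAllRows for the no-'by' fast path and updateStatsForRow for
// the grouped paths.
type countProcessor struct{ n uint64 }

func (p *countProcessor) updateStatsForAllRows(values []string) int {
	p.n += uint64(len(values)) // one call per block
	return 0
}

func (p *countProcessor) updateStatsForRow(values []string, i int) int {
	p.n++ // one call per row
	return 0
}

func main() {
	block := []string{"a", "b", "c"}

	fast := &countProcessor{}
	fast.updateStatsForAllRows(block)

	slow := &countProcessor{}
	for i := range block {
		slow.updateStatsForRow(block, i)
	}
	fmt.Println(fast.n, slow.n) // 3 3
}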
@@ -363,14 +363,14 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 		}
 		values := columns[idx].Values
 
-		var spg *statsPipeGroup
+		var sfps []statsFuncProcessor
 		keyBuf := shard.keyBuf
 		for i := range timestamps {
 			if i <= 0 || values[i-1] != values[i] {
 				keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
-				spg = shard.getStatsPipeGroup(keyBuf)
+				sfps = shard.getStatsFuncProcessors(keyBuf)
 			}
-			for _, sfp := range spg.sfps {
+			for _, sfp := range sfps {
 				shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
 			}
 		}
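
The single-field loop above only re-marshals the key and re-resolves the processors when the current value differs from the previous row, which pays off when blocks contain long runs of identical values. A small stand-alone illustration of that reuse pattern, counting rows per value instead of running real stats functions:

package main

import "fmt"

type group struct{ rows int }

func main() {
	values := []string{"info", "info", "warn", "warn", "warn", "error"}
	m := make(map[string]*group)

	// Mirror the single-'by (...)'-field loop from the diff: the group is only
	// looked up again when the column value differs from the previous row, so
	// runs of identical values cost one map lookup instead of one per row.
	var g *group
	lookups := 0
	for i := range values {
		if i == 0 || values[i-1] != values[i] {
			lookups++
			v := values[i]
			g = m[v]
			if g == nil {
				g = &group{}
				m[v] = g
			}
		}
		g.rows++
	}
	fmt.Println(lookups, m["warn"].rows) // 3 lookups for 6 rows; "warn" saw 3 rows
}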
@@ -383,11 +383,11 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields)
 	columnValues := shard.columnValues
 
-	var spg *statsPipeGroup
+	var sfps []statsFuncProcessor
 	keyBuf := shard.keyBuf
 	for i := range timestamps {
 		// verify whether the key for 'by (...)' fields equals the previous key
-		sameValue := spg != nil
+		sameValue := sfps != nil
 		for _, values := range columnValues {
 			if values == nil {
 				continue
@@ -407,9 +407,9 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 				}
 				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
 			}
-			spg = shard.getStatsPipeGroup(keyBuf)
+			sfps = shard.getStatsFuncProcessors(keyBuf)
 		}
-		for _, sfp := range spg.sfps {
+		for _, sfp := range sfps {
 			shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
 		}
 	}
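
For multiple 'by (...)' fields the key is built by appending each field value with encoding.MarshalBytes, and the sameValue check above skips rebuilding it when every field matches the previous row. The sketch below shows why a length-prefixed encoding is used for the composite key; the varint-length scheme here is an assumption about what MarshalBytes does, and the exact wire format may differ:

package main

import (
	"encoding/binary"
	"fmt"
)

// marshalKeyPart mimics what encoding.MarshalBytes is used for in the diff:
// appending one field value to a composite group key so that different value
// combinations cannot collide. The assumed scheme is a varint length prefix
// followed by the raw bytes.
func marshalKeyPart(dst []byte, v string) []byte {
	dst = binary.AppendUvarint(dst, uint64(len(v)))
	return append(dst, v...)
}

func main() {
	row := []string{"checkout", "error"} // values of the 'by (...)' fields for one row

	key := make([]byte, 0, 64)
	for _, v := range row {
		key = marshalKeyPart(key, v)
	}
	fmt.Printf("%q\n", key)

	// Why simple concatenation isn't enough: ("ab","c") and ("a","bc") would
	// produce the same key without the length prefixes.
	a := marshalKeyPart(marshalKeyPart(nil, "ab"), "c")
	b := marshalKeyPart(marshalKeyPart(nil, "a"), "bc")
	fmt.Println(string(a) == string(b)) // false
}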
@@ -451,7 +451,7 @@ func (spp *statsPipeProcessor) flush() error {
 	byFields := spp.sp.byFields
 	if len(byFields) == 0 && len(m) == 0 {
 		// Special case - zero matching rows.
-		_ = shards[0].getStatsPipeGroup(nil)
+		_ = shards[0].getStatsFuncProcessors(nil)
 		m = shards[0].m
 	}
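
The flush() special case makes sure a query without 'by (...)' fields still returns a result even when nothing matched: calling getStatsFuncProcessors(nil) on the first shard materializes the empty-key group, so a single row with each function's initial state (e.g. a zero count) can be emitted. A tiny illustration of the same idea, with per-group counts standing in for the real processor state:

package main

import "fmt"

func main() {
	// Per-group row counts keyed by the marshaled 'by (...)' value; the empty
	// key means "no grouping". Here no rows matched, so the map is empty.
	m := map[string]int{}

	byFields := []string{}
	if len(byFields) == 0 && len(m) == 0 {
		// Mirror the flush() special case: materialize the single empty-key
		// group so the query still emits one row with default values.
		m[""] = 0
	}

	for key, count := range m {
		fmt.Printf("group %q: count=%d\n", key, count)
	}
}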