This commit is contained in:
Aliaksandr Valialkin 2024-04-29 03:23:41 +02:00
parent 6339cae5de
commit dd55ed98a8
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 31 additions and 31 deletions

View file

@@ -200,31 +200,31 @@ type statsFunc interface {
// neededFields returns the needed fields for calculating the given stats // neededFields returns the needed fields for calculating the given stats
neededFields() []string neededFields() []string
// newStatsFuncProcessor must create new statsFuncProcessor for calculating stats for the given statsFunc. // newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc.
// //
// It also must return the size in bytes of the returned statsFuncProcessor. // It also must return the size in bytes of the returned statsProcessor.
newStatsFuncProcessor() (statsFuncProcessor, int) newStatsProcessor() (statsProcessor, int)
} }
// statsFuncProcessor must process stats for some statsFunc. // statsProcessor must process stats for some statsFunc.
// //
// All the statsFuncProcessor methods are called from a single goroutine at a time, // All the statsProcessor methods are called from a single goroutine at a time,
// so there is no need in the internal synchronization. // so there is no need in the internal synchronization.
type statsFuncProcessor interface { type statsProcessor interface {
// updateStatsForAllRows must update statsFuncProcessor stats from all the rows. // updateStatsForAllRows must update statsProcessor stats from all the rows.
// //
// It must return the increase of internal state size in bytes for the statsFuncProcessor. // It must return the increase of internal state size in bytes for the statsProcessor.
updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int
// updateStatsForRow must update statsFuncProcessor stats from the row at rowIndex. // updateStatsForRow must update statsProcessor stats from the row at rowIndex.
// //
// It must return the increase of internal state size in bytes for the statsFuncProcessor. // It must return the increase of internal state size in bytes for the statsProcessor.
updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int) int updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int) int
// mergeState must merge sfp state into statsFuncProcessor state. // mergeState must merge sfp state into statsProcessor state.
mergeState(sfp statsFuncProcessor) mergeState(sfp statsProcessor)
// finalizeStats must return the collected stats from statsFuncProcessor. // finalizeStats must return the collected stats from statsProcessor.
finalizeStats() (name, value string) finalizeStats() (name, value string)
} }
@@ -303,12 +303,12 @@ type statsPipeProcessorShardNopad struct {
stateSizeBudget int stateSizeBudget int
} }
func (shard *statsPipeProcessorShard) getStatsFuncProcessors(key []byte) []statsFuncProcessor { func (shard *statsPipeProcessorShard) getStatsProcessors(key []byte) []statsProcessor {
spg := shard.m[string(key)] spg := shard.m[string(key)]
if spg == nil { if spg == nil {
sfps := make([]statsFuncProcessor, len(shard.sp.funcs)) sfps := make([]statsProcessor, len(shard.sp.funcs))
for i, f := range shard.sp.funcs { for i, f := range shard.sp.funcs {
sfp, stateSize := f.newStatsFuncProcessor() sfp, stateSize := f.newStatsProcessor()
sfps[i] = sfp sfps[i] = sfp
shard.stateSizeBudget -= stateSize shard.stateSizeBudget -= stateSize
} }
@@ -322,7 +322,7 @@ func (shard *statsPipeProcessorShard) getStatsFuncProcessors(key []byte) []stats
} }
type statsPipeGroup struct { type statsPipeGroup struct {
sfps []statsFuncProcessor sfps []statsProcessor
} }
func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
@@ -345,7 +345,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
byFields := spp.sp.byFields byFields := spp.sp.byFields
if len(byFields) == 0 { if len(byFields) == 0 {
// Fast path - pass all the rows to a single group with empty key. // Fast path - pass all the rows to a single group with empty key.
for _, sfp := range shard.getStatsFuncProcessors(nil) { for _, sfp := range shard.getStatsProcessors(nil) {
shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
} }
return return
@@ -356,19 +356,19 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
if isConstValue(values) { if isConstValue(values) {
// Fast path for column with constant value. // Fast path for column with constant value.
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
for _, sfp := range shard.getStatsFuncProcessors(shard.keyBuf) { for _, sfp := range shard.getStatsProcessors(shard.keyBuf) {
shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
} }
return return
} }
// Slower path for column with different values. // Slower path for column with different values.
var sfps []statsFuncProcessor var sfps []statsProcessor
keyBuf := shard.keyBuf keyBuf := shard.keyBuf
for i := range timestamps { for i := range timestamps {
if i <= 0 || values[i-1] != values[i] { if i <= 0 || values[i-1] != values[i] {
keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i])) keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
sfps = shard.getStatsFuncProcessors(keyBuf) sfps = shard.getStatsProcessors(keyBuf)
} }
for _, sfp := range sfps { for _, sfp := range sfps {
shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i) shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
@@ -388,7 +388,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
for _, values := range columnValues { for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0])) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
} }
for _, sfp := range shard.getStatsFuncProcessors(keyBuf) { for _, sfp := range shard.getStatsProcessors(keyBuf) {
shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
} }
shard.keyBuf = keyBuf shard.keyBuf = keyBuf
@@ -396,7 +396,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
} }
// The slowest path - group by multiple columns. // The slowest path - group by multiple columns.
var sfps []statsFuncProcessor var sfps []statsProcessor
keyBuf := shard.keyBuf keyBuf := shard.keyBuf
for i := range timestamps { for i := range timestamps {
// verify whether the key for 'by (...)' fields equals the previous key // verify whether the key for 'by (...)' fields equals the previous key
@@ -413,7 +413,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
for _, values := range columnValues { for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
} }
sfps = shard.getStatsFuncProcessors(keyBuf) sfps = shard.getStatsProcessors(keyBuf)
} }
for _, sfp := range sfps { for _, sfp := range sfps {
shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i) shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
@@ -480,7 +480,7 @@ func (spp *statsPipeProcessor) flush() error {
byFields := spp.sp.byFields byFields := spp.sp.byFields
if len(byFields) == 0 && len(m) == 0 { if len(byFields) == 0 && len(m) == 0 {
// Special case - zero matching rows. // Special case - zero matching rows.
_ = shards[0].getStatsFuncProcessors(nil) _ = shards[0].getStatsProcessors(nil)
m = shards[0].m m = shards[0].m
} }

View file

@@ -22,7 +22,7 @@ func (sc *statsCount) neededFields() []string {
return getFieldsIgnoreStar(sc.fields) return getFieldsIgnoreStar(sc.fields)
} }
func (sc *statsCount) newStatsFuncProcessor() (statsFuncProcessor, int) { func (sc *statsCount) newStatsProcessor() (statsProcessor, int) {
scp := &statsCountProcessor{ scp := &statsCountProcessor{
sc: sc, sc: sc,
} }
@@ -85,7 +85,7 @@ func (scp *statsCountProcessor) updateStatsForRow(_ []int64, columns []BlockColu
return 0 return 0
} }
func (scp *statsCountProcessor) mergeState(sfp statsFuncProcessor) { func (scp *statsCountProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsCountProcessor) src := sfp.(*statsCountProcessor)
scp.rowsCount += src.rowsCount scp.rowsCount += src.rowsCount
} }

View file

@@ -22,7 +22,7 @@ func (ss *statsSum) neededFields() []string {
return ss.fields return ss.fields
} }
func (ss *statsSum) newStatsFuncProcessor() (statsFuncProcessor, int) { func (ss *statsSum) newStatsProcessor() (statsProcessor, int) {
ssp := &statsSumProcessor{ ssp := &statsSumProcessor{
ss: ss, ss: ss,
} }
@@ -95,7 +95,7 @@ func (ssp *statsSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn
return 0 return 0
} }
func (ssp *statsSumProcessor) mergeState(sfp statsFuncProcessor) { func (ssp *statsSumProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsSumProcessor) src := sfp.(*statsSumProcessor)
ssp.sum += src.sum ssp.sum += src.sum
} }

View file

@@ -25,7 +25,7 @@ func (su *statsUniq) neededFields() []string {
return su.fields return su.fields
} }
func (su *statsUniq) newStatsFuncProcessor() (statsFuncProcessor, int) { func (su *statsUniq) newStatsProcessor() (statsProcessor, int) {
sup := &statsUniqProcessor{ sup := &statsUniqProcessor{
su: su, su: su,
@@ -222,7 +222,7 @@ func (sup *statsUniqProcessor) updateStatsForRow(timestamps []int64, columns []B
return stateSizeIncrease return stateSizeIncrease
} }
func (sup *statsUniqProcessor) mergeState(sfp statsFuncProcessor) { func (sup *statsUniqProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsUniqProcessor) src := sfp.(*statsUniqProcessor)
m := sup.m m := sup.m
for k := range src.m { for k := range src.m {