From dd55ed98a83e2ce5808a58b53c7d1c076807fa49 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Mon, 29 Apr 2024 03:23:41 +0200
Subject: [PATCH] wip

---
 lib/logstorage/pipes.go        | 50 +++++++++++++++++-----------------
 lib/logstorage/stats_count.go  |  4 +--
 lib/logstorage/stats_sum.go    |  4 +--
 lib/logstorage/stats_unique.go |  4 +--
 4 files changed, 31 insertions(+), 31 deletions(-)

diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go
index 1364f2554..212a39846 100644
--- a/lib/logstorage/pipes.go
+++ b/lib/logstorage/pipes.go
@@ -200,31 +200,31 @@ type statsFunc interface {
 	// neededFields returns the needed fields for calculating the given stats
 	neededFields() []string
 
-	// newStatsFuncProcessor must create new statsFuncProcessor for calculating stats for the given statsFunc.
+	// newStatsProcessor must create new statsProcessor for calculating stats for the given statsFunc.
 	//
-	// It also must return the size in bytes of the returned statsFuncProcessor.
-	newStatsFuncProcessor() (statsFuncProcessor, int)
+	// It also must return the size in bytes of the returned statsProcessor.
+	newStatsProcessor() (statsProcessor, int)
 }
 
-// statsFuncProcessor must process stats for some statsFunc.
+// statsProcessor must process stats for some statsFunc.
 //
-// All the statsFuncProcessor methods are called from a single goroutine at a time,
+// All the statsProcessor methods are called from a single goroutine at a time,
 // so there is no need in the internal synchronization.
-type statsFuncProcessor interface {
-	// updateStatsForAllRows must update statsFuncProcessor stats from all the rows.
+type statsProcessor interface {
+	// updateStatsForAllRows must update statsProcessor stats from all the rows.
 	//
-	// It must return the increase of internal state size in bytes for the statsFuncProcessor.
+	// It must return the increase of internal state size in bytes for the statsProcessor.
 	updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int
 
-	// updateStatsForRow must update statsFuncProcessor stats from the row at rowIndex.
+	// updateStatsForRow must update statsProcessor stats from the row at rowIndex.
 	//
-	// It must return the increase of internal state size in bytes for the statsFuncProcessor.
+	// It must return the increase of internal state size in bytes for the statsProcessor.
 	updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int) int
 
-	// mergeState must merge sfp state into statsFuncProcessor state.
-	mergeState(sfp statsFuncProcessor)
+	// mergeState must merge sfp state into statsProcessor state.
+	mergeState(sfp statsProcessor)
 
-	// finalizeStats must return the collected stats from statsFuncProcessor.
+	// finalizeStats must return the collected stats from statsProcessor.
 	finalizeStats() (name, value string)
 }
 
@@ -303,12 +303,12 @@ type statsPipeProcessorShardNopad struct {
 	stateSizeBudget int
 }
 
-func (shard *statsPipeProcessorShard) getStatsFuncProcessors(key []byte) []statsFuncProcessor {
+func (shard *statsPipeProcessorShard) getStatsProcessors(key []byte) []statsProcessor {
 	spg := shard.m[string(key)]
 	if spg == nil {
-		sfps := make([]statsFuncProcessor, len(shard.sp.funcs))
+		sfps := make([]statsProcessor, len(shard.sp.funcs))
 		for i, f := range shard.sp.funcs {
-			sfp, stateSize := f.newStatsFuncProcessor()
+			sfp, stateSize := f.newStatsProcessor()
 			sfps[i] = sfp
 			shard.stateSizeBudget -= stateSize
 		}
@@ -322,7 +322,7 @@ func (shard *statsPipeProcessorShard) getStatsFuncProcessors(key []byte) []stats
 }
 
 type statsPipeGroup struct {
-	sfps []statsFuncProcessor
+	sfps []statsProcessor
 }
 
 func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
@@ -345,7 +345,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	byFields := spp.sp.byFields
 	if len(byFields) == 0 {
 		// Fast path - pass all the rows to a single group with empty key.
-		for _, sfp := range shard.getStatsFuncProcessors(nil) {
+		for _, sfp := range shard.getStatsProcessors(nil) {
 			shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
 		}
 		return
@@ -356,19 +356,19 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 		if isConstValue(values) {
 			// Fast path for column with constant value.
 			shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
-			for _, sfp := range shard.getStatsFuncProcessors(shard.keyBuf) {
+			for _, sfp := range shard.getStatsProcessors(shard.keyBuf) {
 				shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
 			}
 			return
 		}
 
 		// Slower path for column with different values.
-		var sfps []statsFuncProcessor
+		var sfps []statsProcessor
 		keyBuf := shard.keyBuf
 		for i := range timestamps {
 			if i <= 0 || values[i-1] != values[i] {
 				keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
-				sfps = shard.getStatsFuncProcessors(keyBuf)
+				sfps = shard.getStatsProcessors(keyBuf)
 			}
 			for _, sfp := range sfps {
 				shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
@@ -388,7 +388,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 		for _, values := range columnValues {
 			keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
 		}
-		for _, sfp := range shard.getStatsFuncProcessors(keyBuf) {
+		for _, sfp := range shard.getStatsProcessors(keyBuf) {
 			shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
 		}
 		shard.keyBuf = keyBuf
@@ -396,7 +396,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	}
 
 	// The slowest path - group by multiple columns.
-	var sfps []statsFuncProcessor
+	var sfps []statsProcessor
 	keyBuf := shard.keyBuf
 	for i := range timestamps {
 		// verify whether the key for 'by (...)' fields equals the previous key
@@ -413,7 +413,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 			for _, values := range columnValues {
 				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
 			}
-			sfps = shard.getStatsFuncProcessors(keyBuf)
+			sfps = shard.getStatsProcessors(keyBuf)
 		}
 		for _, sfp := range sfps {
 			shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
@@ -480,7 +480,7 @@ func (spp *statsPipeProcessor) flush() error {
 	byFields := spp.sp.byFields
 	if len(byFields) == 0 && len(m) == 0 {
 		// Special case - zero matching rows.
-		_ = shards[0].getStatsFuncProcessors(nil)
+		_ = shards[0].getStatsProcessors(nil)
 		m = shards[0].m
 	}
 
diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go
index 22406b24f..c140e68e1 100644
--- a/lib/logstorage/stats_count.go
+++ b/lib/logstorage/stats_count.go
@@ -22,7 +22,7 @@ func (sc *statsCount) neededFields() []string {
 	return getFieldsIgnoreStar(sc.fields)
 }
 
-func (sc *statsCount) newStatsFuncProcessor() (statsFuncProcessor, int) {
+func (sc *statsCount) newStatsProcessor() (statsProcessor, int) {
 	scp := &statsCountProcessor{
 		sc: sc,
 	}
@@ -85,7 +85,7 @@ func (scp *statsCountProcessor) updateStatsForRow(_ []int64, columns []BlockColu
 	return 0
 }
 
-func (scp *statsCountProcessor) mergeState(sfp statsFuncProcessor) {
+func (scp *statsCountProcessor) mergeState(sfp statsProcessor) {
 	src := sfp.(*statsCountProcessor)
 	scp.rowsCount += src.rowsCount
 }
diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go
index fd6a73c82..0ee7b32e6 100644
--- a/lib/logstorage/stats_sum.go
+++ b/lib/logstorage/stats_sum.go
@@ -22,7 +22,7 @@ func (ss *statsSum) neededFields() []string {
 	return ss.fields
 }
 
-func (ss *statsSum) newStatsFuncProcessor() (statsFuncProcessor, int) {
+func (ss *statsSum) newStatsProcessor() (statsProcessor, int) {
 	ssp := &statsSumProcessor{
 		ss: ss,
 	}
@@ -95,7 +95,7 @@ func (ssp *statsSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn
 	return 0
 }
 
-func (ssp *statsSumProcessor) mergeState(sfp statsFuncProcessor) {
+func (ssp *statsSumProcessor) mergeState(sfp statsProcessor) {
 	src := sfp.(*statsSumProcessor)
 	ssp.sum += src.sum
 }
diff --git a/lib/logstorage/stats_unique.go b/lib/logstorage/stats_unique.go
index 3fa571f60..265841f5d 100644
--- a/lib/logstorage/stats_unique.go
+++ b/lib/logstorage/stats_unique.go
@@ -25,7 +25,7 @@ func (su *statsUniq) neededFields() []string {
 	return su.fields
 }
 
-func (su *statsUniq) newStatsFuncProcessor() (statsFuncProcessor, int) {
+func (su *statsUniq) newStatsProcessor() (statsProcessor, int) {
 	sup := &statsUniqProcessor{
 		su: su,
 
@@ -222,7 +222,7 @@ func (sup *statsUniqProcessor) updateStatsForRow(timestamps []int64, columns []B
 	return stateSizeIncrease
 }
 
-func (sup *statsUniqProcessor) mergeState(sfp statsFuncProcessor) {
+func (sup *statsUniqProcessor) mergeState(sfp statsProcessor) {
 	src := sfp.(*statsUniqProcessor)
 	m := sup.m
 	for k := range src.m {
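
Note (not part of the patch above): the renamed statsProcessor contract can be illustrated with a minimal, hypothetical sketch. It assumes the code sits in the lib/logstorage package, so BlockColumn and the statsProcessor interface are in scope; the statsRowsExampleProcessor name, the newStatsRowsExampleProcessor constructor and the "rows_example()" label are invented here for illustration only and do not exist in the repository.

package logstorage

import (
	"strconv"
	"unsafe"
)

// statsRowsExampleProcessor is a hypothetical statsProcessor that simply
// counts the rows it sees. It only illustrates the renamed interface.
type statsRowsExampleProcessor struct {
	rowsCount uint64
}

// Compile-time check that the example satisfies the renamed interface.
var _ statsProcessor = (*statsRowsExampleProcessor)(nil)

// newStatsRowsExampleProcessor mirrors what a statsFunc.newStatsProcessor
// implementation returns: the processor plus its initial state size in bytes,
// which the caller subtracts from its stateSizeBudget.
func newStatsRowsExampleProcessor() (statsProcessor, int) {
	p := &statsRowsExampleProcessor{}
	return p, int(unsafe.Sizeof(*p))
}

// updateStatsForAllRows counts every row in the block. It allocates no extra
// state, so it reports a zero state size increase.
func (p *statsRowsExampleProcessor) updateStatsForAllRows(timestamps []int64, _ []BlockColumn) int {
	p.rowsCount += uint64(len(timestamps))
	return 0
}

// updateStatsForRow counts the single row at rowIndex.
func (p *statsRowsExampleProcessor) updateStatsForRow(_ []int64, _ []BlockColumn, _ int) int {
	p.rowsCount++
	return 0
}

// mergeState folds the state collected by another shard's processor into p.
func (p *statsRowsExampleProcessor) mergeState(sfp statsProcessor) {
	src := sfp.(*statsRowsExampleProcessor)
	p.rowsCount += src.rowsCount
}

// finalizeStats returns the result field name and its value.
func (p *statsRowsExampleProcessor) finalizeStats() (string, string) {
	return "rows_example()", strconv.FormatUint(p.rowsCount, 10)
}

The state-size return values exist so that statsPipeProcessorShard can keep its stateSizeBudget accurate as groups grow, and mergeState is what lets flush() combine the per-worker shard states into a single result, as seen in the pipes.go hunks above.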