diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go
index 72adddb30..5106c1c1e 100644
--- a/lib/logstorage/pipes.go
+++ b/lib/logstorage/pipes.go
@@ -254,8 +254,8 @@ func (sp *statsPipe) newPipeProcessor(workersCount int, stopCh <-chan struct{},
 	shards := make([]statsPipeProcessorShard, workersCount)
 	for i := range shards {
 		shard := &shards[i]
+		shard.sp = sp
 		shard.m = make(map[string]*statsPipeGroup)
-		shard.funcs = sp.funcs
 		shard.stateSizeBudget = stateSizeBudgetChunk
 		maxStateSize -= stateSizeBudgetChunk
 	}
@@ -295,8 +295,8 @@ type statsPipeProcessorShard struct {
 }
 
 type statsPipeProcessorShardNopad struct {
-	m     map[string]*statsPipeGroup
-	funcs []statsFunc
+	sp *statsPipe
+	m  map[string]*statsPipeGroup
 
 	columnValues [][]string
 	keyBuf       []byte
@@ -304,11 +304,11 @@ type statsPipeProcessorShardNopad struct {
 	stateSizeBudget int
 }
 
-func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup {
+func (shard *statsPipeProcessorShard) getStatsFuncProcessors(key []byte) []statsFuncProcessor {
 	spg := shard.m[string(key)]
 	if spg == nil {
-		sfps := make([]statsFuncProcessor, len(shard.funcs))
-		for i, f := range shard.funcs {
+		sfps := make([]statsFuncProcessor, len(shard.sp.funcs))
+		for i, f := range shard.sp.funcs {
 			sfp, stateSize := f.newStatsFuncProcessor()
 			sfps[i] = sfp
 			shard.stateSizeBudget -= stateSize
@@ -319,7 +319,7 @@ func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGr
 		shard.m[string(key)] = spg
 		shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
 	}
-	return spg
+	return spg.sfps
 }
 
 type statsPipeGroup struct {
@@ -344,10 +344,10 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	}
 
 	byFields := spp.sp.byFields
-	if len(byFields) == 0 || len(byFields) == 1 && getBlockColumnIndex(columns, byFields[0]) < 0 {
+	if len(byFields) == 0 {
 		// Fast path - pass all the rows to a single group with empty key.
-		spg := shard.getStatsPipeGroup(nil)
-		for _, sfp := range spg.sfps {
+		sfps := shard.getStatsFuncProcessors(nil)
+		for _, sfp := range sfps {
 			shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
 		}
 		return
@@ -363,14 +363,14 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 		}
 		values := columns[idx].Values
 
-		var spg *statsPipeGroup
+		var sfps []statsFuncProcessor
 		keyBuf := shard.keyBuf
 		for i := range timestamps {
 			if i <= 0 || values[i-1] != values[i] {
 				keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
-				spg = shard.getStatsPipeGroup(keyBuf)
+				sfps = shard.getStatsFuncProcessors(keyBuf)
 			}
-			for _, sfp := range spg.sfps {
+			for _, sfp := range sfps {
 				shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
 			}
 		}
@@ -383,11 +383,11 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields)
 	columnValues := shard.columnValues
 
-	var spg *statsPipeGroup
+	var sfps []statsFuncProcessor
 	keyBuf := shard.keyBuf
 	for i := range timestamps {
 		// verify whether the key for 'by (...)' fields equals the previous key
-		sameValue := spg != nil
+		sameValue := sfps != nil
 		for _, values := range columnValues {
 			if values == nil {
 				continue
@@ -407,9 +407,9 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 				}
 				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
 			}
-			spg = shard.getStatsPipeGroup(keyBuf)
+			sfps = shard.getStatsFuncProcessors(keyBuf)
 		}
-		for _, sfp := range spg.sfps {
+		for _, sfp := range sfps {
 			shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
 		}
 	}
@@ -451,7 +451,7 @@ func (spp *statsPipeProcessor) flush() error {
 	byFields := spp.sp.byFields
 	if len(byFields) == 0 && len(m) == 0 {
 		// Special case - zero matching rows.
-		_ = shards[0].getStatsPipeGroup(nil)
+		_ = shards[0].getStatsFuncProcessors(nil)
 		m = shards[0].m
 	}
 
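For context, here is a minimal, self-contained sketch of the access pattern this diff switches to: the shard helper lazily creates the per-key group on first sight, then hands back the processor slice directly, so the per-row loops range over processors without going through the group wrapper. The names below (demoShard, demoGroup, demoProcessor, getProcessors) are simplified stand-ins invented for illustration; they are not the real statsPipe types from pipes.go, and the real code additionally reaches the shared stats functions via shard.sp and tracks a state-size budget.

```go
package main

import "fmt"

// demoProcessor stands in for statsFuncProcessor: per-group, per-function state.
type demoProcessor struct {
	name  string
	count int
}

// demoGroup stands in for statsPipeGroup: it only wraps the processor slice.
type demoGroup struct {
	procs []*demoProcessor
}

// demoShard stands in for statsPipeProcessorShard.
type demoShard struct {
	funcNames []string              // analogous to the funcs reached via shard.sp
	m         map[string]*demoGroup // group state keyed by the marshaled 'by (...)' key
}

// getProcessors mirrors the shape of getStatsFuncProcessors: create the group
// lazily for an unseen key, then return the processor slice directly so the
// caller never dereferences the group wrapper.
func (shard *demoShard) getProcessors(key []byte) []*demoProcessor {
	g := shard.m[string(key)]
	if g == nil {
		procs := make([]*demoProcessor, len(shard.funcNames))
		for i, name := range shard.funcNames {
			procs[i] = &demoProcessor{name: name}
		}
		g = &demoGroup{procs: procs}
		shard.m[string(key)] = g
	}
	return g.procs
}

func main() {
	shard := &demoShard{
		funcNames: []string{"count", "sum"},
		m:         make(map[string]*demoGroup),
	}
	// Caller side after the diff: range over the returned slice,
	// as in the fast-path branch of writeBlock.
	for _, key := range []string{"a", "b", "a"} {
		for _, p := range shard.getProcessors([]byte(key)) {
			p.count++
		}
	}
	for key, g := range shard.m {
		for _, p := range g.procs {
			fmt.Printf("key=%q func=%s rows=%d\n", key, p.name, p.count)
		}
	}
}
```

The observable effect of the diff is the same as in this sketch: callers hold a []statsFuncProcessor instead of a *statsPipeGroup, and the shard reaches the shared stats functions through its back-pointer to the statsPipe rather than a copied funcs slice.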