From bbb9887ccf17a45e71036bc63468ee91de3da149 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 25 May 2024 17:29:24 +0200 Subject: [PATCH] wip --- lib/logstorage/pipe_stats.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 8f8039e41..f83235ff2 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -152,18 +152,11 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, maxStateSize := int64(float64(memory.Allowed()) * 0.3) shards := make([]pipeStatsProcessorShard, workersCount) - funcsLen := len(ps.funcs) for i := range shards { shards[i] = pipeStatsProcessorShard{ pipeStatsProcessorShardNopad: pipeStatsProcessorShardNopad{ ps: ps, - m: make(map[string]*pipeStatsGroup), - - bms: make([]bitmap, funcsLen), - brs: make([]*blockResult, funcsLen), - brsBuf: make([]blockResult, funcsLen), - stateSizeBudget: stateSizeBudgetChunk, }, } @@ -220,7 +213,22 @@ type pipeStatsProcessorShardNopad struct { stateSizeBudget int } +func (shard *pipeStatsProcessorShard) init() { + if shard.m != nil { + // Already initialized + return + } + + funcsLen := len(shard.ps.funcs) + + shard.m = make(map[string]*pipeStatsGroup) + shard.bms = make([]bitmap, funcsLen) + shard.brs = make([]*blockResult, funcsLen) + shard.brsBuf = make([]blockResult, funcsLen) +} + func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { + shard.init() byFields := shard.ps.byFields // Apply per-function filters @@ -428,6 +436,7 @@ func (psp *pipeStatsProcessor) flush() error { // Merge states across shards shards := psp.shards + shards[0].init() m := shards[0].m shards = shards[1:] for i := range shards {