mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
wip
This commit is contained in:
parent
874c061382
commit
bbb9887ccf
1 changed files with 16 additions and 7 deletions
|
@ -152,18 +152,11 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{},
|
||||||
maxStateSize := int64(float64(memory.Allowed()) * 0.3)
|
maxStateSize := int64(float64(memory.Allowed()) * 0.3)
|
||||||
|
|
||||||
shards := make([]pipeStatsProcessorShard, workersCount)
|
shards := make([]pipeStatsProcessorShard, workersCount)
|
||||||
funcsLen := len(ps.funcs)
|
|
||||||
for i := range shards {
|
for i := range shards {
|
||||||
shards[i] = pipeStatsProcessorShard{
|
shards[i] = pipeStatsProcessorShard{
|
||||||
pipeStatsProcessorShardNopad: pipeStatsProcessorShardNopad{
|
pipeStatsProcessorShardNopad: pipeStatsProcessorShardNopad{
|
||||||
ps: ps,
|
ps: ps,
|
||||||
|
|
||||||
m: make(map[string]*pipeStatsGroup),
|
|
||||||
|
|
||||||
bms: make([]bitmap, funcsLen),
|
|
||||||
brs: make([]*blockResult, funcsLen),
|
|
||||||
brsBuf: make([]blockResult, funcsLen),
|
|
||||||
|
|
||||||
stateSizeBudget: stateSizeBudgetChunk,
|
stateSizeBudget: stateSizeBudgetChunk,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -220,7 +213,22 @@ type pipeStatsProcessorShardNopad struct {
|
||||||
stateSizeBudget int
|
stateSizeBudget int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (shard *pipeStatsProcessorShard) init() {
|
||||||
|
if shard.m != nil {
|
||||||
|
// Already initialized
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
funcsLen := len(shard.ps.funcs)
|
||||||
|
|
||||||
|
shard.m = make(map[string]*pipeStatsGroup)
|
||||||
|
shard.bms = make([]bitmap, funcsLen)
|
||||||
|
shard.brs = make([]*blockResult, funcsLen)
|
||||||
|
shard.brsBuf = make([]blockResult, funcsLen)
|
||||||
|
}
|
||||||
|
|
||||||
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
|
||||||
|
shard.init()
|
||||||
byFields := shard.ps.byFields
|
byFields := shard.ps.byFields
|
||||||
|
|
||||||
// Apply per-function filters
|
// Apply per-function filters
|
||||||
|
@ -428,6 +436,7 @@ func (psp *pipeStatsProcessor) flush() error {
|
||||||
|
|
||||||
// Merge states across shards
|
// Merge states across shards
|
||||||
shards := psp.shards
|
shards := psp.shards
|
||||||
|
shards[0].init()
|
||||||
m := shards[0].m
|
m := shards[0].m
|
||||||
shards = shards[1:]
|
shards = shards[1:]
|
||||||
for i := range shards {
|
for i := range shards {
|
||||||
|
|
Loading…
Reference in a new issue