diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 92f89d49c..de1746f5e 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -296,19 +296,11 @@ type statsPipeProcessorShardNopad struct { columnIdxs []int keyBuf []byte - keyBufPrev []byte - spgPrev *statsPipeGroup stateSizeBudget int } func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup { - if shard.spgPrev != nil && string(shard.keyBufPrev) == string(key) { - // Fast path - return the spg for the same key. - return shard.spgPrev - } - - // Slow path - locate spg by key. spg := shard.m[string(key)] if spg == nil { sfps := make([]statsFuncProcessor, len(shard.funcs)) @@ -323,9 +315,6 @@ func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGr shard.m[string(key)] = spg shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps))) } - - shard.keyBufPrev = append(shard.keyBufPrev[:0], key...) - shard.spgPrev = spg return spg } @@ -366,18 +355,32 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col columnIdxs := shard.columnIdxs keyBuf := shard.keyBuf + var spg *statsPipeGroup for i := range timestamps { - // Construct key for the by (...) fields - keyBuf = keyBuf[:0] + // verify whether the key for 'by (...)' fields equals the previous key + sameValue := spg != nil for _, idx := range columnIdxs { - v := "" - if idx >= 0 { - v = columns[idx].Values[i] + if idx < 0 { + continue + } + values := columns[idx].Values + if i <= 0 || values[i-1] != values[i] { + sameValue = false + break } - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) } - - spg := shard.getStatsPipeGroup(keyBuf) + if !sameValue { + // Construct new key for the 'by (...)' fields + keyBuf = keyBuf[:0] + for _, idx := range columnIdxs { + v := "" + if idx >= 0 { + v = columns[idx].Values[i] + } + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + spg = shard.getStatsPipeGroup(keyBuf) + } for _, sfp := range spg.sfps { shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i) }