This commit is contained in:
Aliaksandr Valialkin 2024-04-28 22:42:50 +02:00
parent 59b7bef051
commit a941bdcdbd
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -296,19 +296,11 @@ type statsPipeProcessorShardNopad struct {
columnIdxs []int columnIdxs []int
keyBuf []byte keyBuf []byte
keyBufPrev []byte
spgPrev *statsPipeGroup
stateSizeBudget int stateSizeBudget int
} }
func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup { func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup {
if shard.spgPrev != nil && string(shard.keyBufPrev) == string(key) {
// Fast path - return the spg for the same key.
return shard.spgPrev
}
// Slow path - locate spg by key.
spg := shard.m[string(key)] spg := shard.m[string(key)]
if spg == nil { if spg == nil {
sfps := make([]statsFuncProcessor, len(shard.funcs)) sfps := make([]statsFuncProcessor, len(shard.funcs))
@ -323,9 +315,6 @@ func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGr
shard.m[string(key)] = spg shard.m[string(key)] = spg
shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps))) shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
} }
shard.keyBufPrev = append(shard.keyBufPrev[:0], key...)
shard.spgPrev = spg
return spg return spg
} }
@ -366,18 +355,32 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
columnIdxs := shard.columnIdxs columnIdxs := shard.columnIdxs
keyBuf := shard.keyBuf keyBuf := shard.keyBuf
var spg *statsPipeGroup
for i := range timestamps { for i := range timestamps {
// Construct key for the by (...) fields // verify whether the key for 'by (...)' fields equals the previous key
keyBuf = keyBuf[:0] sameValue := spg != nil
for _, idx := range columnIdxs { for _, idx := range columnIdxs {
v := "" if idx < 0 {
if idx >= 0 { continue
v = columns[idx].Values[i] }
values := columns[idx].Values
if i <= 0 || values[i-1] != values[i] {
sameValue = false
break
} }
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
} }
if !sameValue {
spg := shard.getStatsPipeGroup(keyBuf) // Construct new key for the 'by (...)' fields
keyBuf = keyBuf[:0]
for _, idx := range columnIdxs {
v := ""
if idx >= 0 {
v = columns[idx].Values[i]
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
spg = shard.getStatsPipeGroup(keyBuf)
}
for _, sfp := range spg.sfps { for _, sfp := range spg.sfps {
shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i) shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
} }