This commit is contained in:
Aliaksandr Valialkin 2024-05-10 16:41:57 +02:00
parent a4b2806330
commit 4ed5ea65c1
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 57 additions and 46 deletions

View file

@ -166,9 +166,8 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
if len(byFields) == 0 { if len(byFields) == 0 {
// Fast path - pass all the rows to a single group with empty key. // Fast path - pass all the rows to a single group with empty key.
for _, sfp := range shard.getStatsProcessors(nil) { psg := shard.getPipeStatsGroup(nil)
shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
}
return return
} }
if len(byFields) == 1 { if len(byFields) == 1 {
@ -179,9 +178,8 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
// Fast path for column with constant value. // Fast path for column with constant value.
v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize, bf.bucketOffset) v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize, bf.bucketOffset)
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v))
for _, sfp := range shard.getStatsProcessors(shard.keyBuf) { psg := shard.getPipeStatsGroup(shard.keyBuf)
shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
}
return return
} }
@ -189,23 +187,20 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
if areConstValues(values) { if areConstValues(values) {
// Fast path for column with constant values. // Fast path for column with constant values.
shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
for _, sfp := range shard.getStatsProcessors(shard.keyBuf) { psg := shard.getPipeStatsGroup(shard.keyBuf)
shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
}
return return
} }
// Slower generic path for a column with different values. // Slower generic path for a column with different values.
var sfps []statsProcessor var psg *pipeStatsGroup
keyBuf := shard.keyBuf[:0] keyBuf := shard.keyBuf[:0]
for i := range br.timestamps { for i := range br.timestamps {
if i <= 0 || values[i-1] != values[i] { if i <= 0 || values[i-1] != values[i] {
keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i])) keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
sfps = shard.getStatsProcessors(keyBuf) psg = shard.getPipeStatsGroup(keyBuf)
}
for _, sfp := range sfps {
shard.stateSizeBudget -= sfp.updateStatsForRow(br, i)
} }
shard.stateSizeBudget -= psg.updateStatsForRow(br, i)
} }
shard.keyBuf = keyBuf shard.keyBuf = keyBuf
return return
@ -234,19 +229,18 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
for _, values := range columnValues { for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0])) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
} }
for _, sfp := range shard.getStatsProcessors(keyBuf) { psg := shard.getPipeStatsGroup(keyBuf)
shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) shard.stateSizeBudget -= psg.updateStatsForAllRows(br)
}
shard.keyBuf = keyBuf shard.keyBuf = keyBuf
return return
} }
// The slowest path - group by multiple columns with different values across rows. // The slowest path - group by multiple columns with different values across rows.
var sfps []statsProcessor var psg *pipeStatsGroup
keyBuf := shard.keyBuf[:0] keyBuf := shard.keyBuf[:0]
for i := range br.timestamps { for i := range br.timestamps {
// Verify whether the key for 'by (...)' fields equals the previous key // Verify whether the key for 'by (...)' fields equals the previous key
sameValue := sfps != nil sameValue := i > 0
for _, values := range columnValues { for _, values := range columnValues {
if i <= 0 || values[i-1] != values[i] { if i <= 0 || values[i-1] != values[i] {
sameValue = false sameValue = false
@ -259,37 +253,54 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) {
for _, values := range columnValues { for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
} }
sfps = shard.getStatsProcessors(keyBuf) psg = shard.getPipeStatsGroup(keyBuf)
}
for _, sfp := range sfps {
shard.stateSizeBudget -= sfp.updateStatsForRow(br, i)
} }
shard.stateSizeBudget -= psg.updateStatsForRow(br, i)
} }
shard.keyBuf = keyBuf shard.keyBuf = keyBuf
} }
func (shard *pipeStatsProcessorShard) getStatsProcessors(key []byte) []statsProcessor { func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup {
spg := shard.m[string(key)] psg := shard.m[string(key)]
if spg == nil { if psg != nil {
return psg
}
sfps := make([]statsProcessor, len(shard.ps.funcs)) sfps := make([]statsProcessor, len(shard.ps.funcs))
for i, f := range shard.ps.funcs { for i, f := range shard.ps.funcs {
sfp, stateSize := f.newStatsProcessor() sfp, stateSize := f.newStatsProcessor()
sfps[i] = sfp sfps[i] = sfp
shard.stateSizeBudget -= stateSize shard.stateSizeBudget -= stateSize
} }
spg = &pipeStatsGroup{ psg = &pipeStatsGroup{
sfps: sfps, sfps: sfps,
} }
shard.m[string(key)] = spg shard.m[string(key)] = psg
shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps))) shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps)))
}
return spg.sfps return psg
} }
type pipeStatsGroup struct { type pipeStatsGroup struct {
sfps []statsProcessor sfps []statsProcessor
} }
// updateStatsForAllRows passes the whole block br to every stats processor
// in the group by calling updateStatsForAllRows on each of them.
//
// It returns the sum of the values returned by the individual processors.
// Callers in writeBlock subtract this sum from the shard's stateSizeBudget,
// so it is presumably the state size increase - confirm against the
// statsProcessor implementations.
func (psg *pipeStatsGroup) updateStatsForAllRows(br *blockResult) int {
	sfps := psg.sfps
	total := 0
	for i := range sfps {
		sfp := sfps[i]
		total += sfp.updateStatsForAllRows(br)
	}
	return total
}
// updateStatsForRow passes the single row at index rowIdx of br to every
// stats processor in the group by calling updateStatsForRow on each of them.
//
// It returns the sum of the values returned by the individual processors.
// Callers in writeBlock subtract this sum from the shard's stateSizeBudget,
// so it is presumably the state size increase - confirm against the
// statsProcessor implementations.
func (psg *pipeStatsGroup) updateStatsForRow(br *blockResult, rowIdx int) int {
	sfps := psg.sfps
	total := 0
	for i := range sfps {
		sfp := sfps[i]
		total += sfp.updateStatsForRow(br, rowIdx)
	}
	return total
}
func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 { if len(br.timestamps) == 0 {
return return
@ -325,7 +336,7 @@ func (psp *pipeStatsProcessor) flush() error {
shards = shards[1:] shards = shards[1:]
for i := range shards { for i := range shards {
shard := &shards[i] shard := &shards[i]
for key, spg := range shard.m { for key, psg := range shard.m {
// shard.m may be quite big, so this loop can take a lot of time and CPU. // shard.m may be quite big, so this loop can take a lot of time and CPU.
// Stop processing data as soon as stopCh is closed without wasting additional CPU time. // Stop processing data as soon as stopCh is closed without wasting additional CPU time.
select { select {
@ -336,10 +347,10 @@ func (psp *pipeStatsProcessor) flush() error {
spgBase := m[key] spgBase := m[key]
if spgBase == nil { if spgBase == nil {
m[key] = spg m[key] = psg
} else { } else {
for i, sfp := range spgBase.sfps { for i, sfp := range spgBase.sfps {
sfp.mergeState(spg.sfps[i]) sfp.mergeState(psg.sfps[i])
} }
} }
} }
@ -349,7 +360,7 @@ func (psp *pipeStatsProcessor) flush() error {
byFields := psp.ps.byFields byFields := psp.ps.byFields
if len(byFields) == 0 && len(m) == 0 { if len(byFields) == 0 && len(m) == 0 {
// Special case - zero matching rows. // Special case - zero matching rows.
_ = shards[0].getStatsProcessors(nil) _ = shards[0].getPipeStatsGroup(nil)
m = shards[0].m m = shards[0].m
} }
@ -368,7 +379,7 @@ func (psp *pipeStatsProcessor) flush() error {
var values []string var values []string
valuesLen := 0 valuesLen := 0
for key, spg := range m { for key, psg := range m {
// m may be quite big, so this loop can take a lot of time and CPU. // m may be quite big, so this loop can take a lot of time and CPU.
// Stop processing data as soon as stopCh is closed without wasting additional CPU time. // Stop processing data as soon as stopCh is closed without wasting additional CPU time.
select { select {
@ -393,7 +404,7 @@ func (psp *pipeStatsProcessor) flush() error {
} }
// calculate values for stats functions // calculate values for stats functions
for _, sfp := range spg.sfps { for _, sfp := range psg.sfps {
value := sfp.finalizeStats() value := sfp.finalizeStats()
values = append(values, value) values = append(values, value)
} }