This commit is contained in:
Aliaksandr Valialkin 2024-04-28 23:19:40 +02:00
parent 0850e13eb3
commit 24f07dfdc3
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -298,8 +298,8 @@ type statsPipeProcessorShardNopad struct {
m map[string]*statsPipeGroup
funcs []statsFunc
columnIdxs []int
keyBuf []byte
columnValues [][]string
keyBuf []byte
stateSizeBudget int
}
@ -343,8 +343,9 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
shard.stateSizeBudget += stateSizeBudgetChunk
}
if len(spp.sp.byFields) == 0 {
// Fast path - pass all the rows to a single group
byFields := spp.sp.byFields
if len(byFields) == 0 || len(byFields) == 1 && getBlockColumnIndex(columns, byFields[0]) < 0 {
// Fast path - pass all the rows to a single group with empty key.
spg := shard.getStatsPipeGroup(nil)
for _, sfp := range spg.sfps {
shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
@ -354,20 +355,43 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
// Slow path - update per-row stats
// Pre-calculate column indexes for byFields in order to speed up building group key in the loop below.
shard.columnIdxs = appendBlockColumnIndexes(shard.columnIdxs[:0], columns, spp.sp.byFields)
columnIdxs := shard.columnIdxs
if len(byFields) == 1 {
// Special case for grouping by a single column.
idx := getBlockColumnIndex(columns, byFields[0])
if idx < 0 {
logger.Panicf("BUG: columnIdx must be positive")
}
values := columns[idx].Values
var spg *statsPipeGroup
keyBuf := shard.keyBuf
for i := range timestamps {
if i <= 0 || values[i-1] != values[i] {
keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i]))
spg = shard.getStatsPipeGroup(keyBuf)
}
for _, sfp := range spg.sfps {
shard.stateSizeBudget -= sfp.updateStatsForRow(timestamps, columns, i)
}
}
shard.keyBuf = keyBuf
return
}
// The slowest path - group by multiple columns.
// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields)
columnValues := shard.columnValues
keyBuf := shard.keyBuf
var spg *statsPipeGroup
keyBuf := shard.keyBuf
for i := range timestamps {
// verify whether the key for 'by (...)' fields equals the previous key
sameValue := spg != nil
for _, idx := range columnIdxs {
if idx < 0 {
for _, values := range columnValues {
if values == nil {
continue
}
values := columns[idx].Values
if i <= 0 || values[i-1] != values[i] {
sameValue = false
break
@ -376,10 +400,10 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
if !sameValue {
// Construct new key for the 'by (...)' fields
keyBuf = keyBuf[:0]
for _, idx := range columnIdxs {
for _, values := range columnValues {
v := ""
if idx >= 0 {
v = columns[idx].Values[i]
if values != nil {
v = values[i]
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
@ -667,8 +691,8 @@ type statsFuncUniqProcessor struct {
m map[string]struct{}
columnIdxs []int
keyBuf []byte
columnValues [][]string
keyBuf []byte
}
func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
@ -696,18 +720,18 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
// Slow path for multiple columns.
// Pre-calculate column indexes for byFields in order to speed up building group key in the loop below.
sfup.columnIdxs = appendBlockColumnIndexes(sfup.columnIdxs[:0], columns, fields)
columnIdxs := sfup.columnIdxs
// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
sfup.columnValues = appendBlockColumnValues(sfup.columnValues[:0], columns, fields)
columnValues := sfup.columnValues
keyBuf := sfup.keyBuf
for i := range timestamps {
allEmptyValues := true
keyBuf = keyBuf[:0]
for _, idx := range columnIdxs {
for _, values := range columnValues {
v := ""
if idx >= 0 {
v = columns[idx].Values[i]
if values != nil {
v = values[i]
}
if v != "" {
allEmptyValues = false
@ -1042,10 +1066,14 @@ func getFieldsIgnoreStar(fields []string) []string {
return result
}
func appendBlockColumnIndexes(dst []int, columns []BlockColumn, fields []string) []int {
func appendBlockColumnValues(dst [][]string, columns []BlockColumn, fields []string) [][]string {
for _, f := range fields {
idx := getBlockColumnIndex(columns, f)
dst = append(dst, idx)
var values []string
if idx >= 0 {
values = columns[idx].Values
}
dst = append(dst, values)
}
return dst
}