Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git

Commit 53c3384bf7 (parent 1b67995907): wip

1 changed file with 48 additions and 12 deletions
@@ -346,15 +346,11 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	byFields := spp.sp.byFields
 	if len(byFields) == 0 {
 		// Fast path - pass all the rows to a single group with empty key.
-		sfps := shard.getStatsFuncProcessors(nil)
-		for _, sfp := range sfps {
+		for _, sfp := range shard.getStatsFuncProcessors(nil) {
 			shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
 		}
 		return
 	}
-
-	// Slow path - update per-row stats
-
 	if len(byFields) == 1 {
 		// Special case for grouping by a single column.
 		idx := getBlockColumnIndex(columns, byFields[0])
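With no `by (...)` fields, every row of the block belongs to a single group with an empty key, so the rewritten fast path above simply iterates the processors returned by shard.getStatsFuncProcessors(nil) and feeds them the whole block, charging the returned state growth against the shard's budget. The following is a minimal standalone sketch of that per-block update idea; the interface and the countProcessor type here are illustrative stand-ins, not the commit's actual statsFuncProcessor implementation.

package main

import "fmt"

// statsProcessor is a hypothetical, simplified stand-in for a stats processor.
type statsProcessor interface {
	// updateStatsForAllRows consumes a whole block and returns the number of
	// extra state bytes allocated, which the caller subtracts from its budget.
	updateStatsForAllRows(values []string) int
	// updateStatsForRow consumes a single row.
	updateStatsForRow(values []string, rowIdx int) int
}

// countProcessor implements count(): the per-block update adds len(values) in one step.
type countProcessor struct {
	n uint64
}

func (cp *countProcessor) updateStatsForAllRows(values []string) int {
	cp.n += uint64(len(values))
	return 0 // no extra state allocated
}

func (cp *countProcessor) updateStatsForRow(values []string, rowIdx int) int {
	cp.n++
	return 0
}

func main() {
	block := []string{"a", "b", "c", "d"}
	budget := 1 << 20

	var cp countProcessor
	// Fast path: one call per block instead of one call per row.
	budget -= cp.updateStatsForAllRows(block)
	fmt.Println(cp.n, budget) // 4 1048576
}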
@@ -363,6 +359,16 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 		}
 		values := columns[idx].Values
 
+		if isConstValue(values) {
+			// Fast path for column with constant value.
+			shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0]))
+			for _, sfp := range shard.getStatsFuncProcessors(shard.keyBuf) {
+				shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
+			}
+			return
+		}
+
+		// Slower path for column with different values.
 		var sfps []statsFuncProcessor
 		keyBuf := shard.keyBuf
 		for i := range timestamps {
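The new single-column fast path marshals the group key once into shard.keyBuf[:0], reusing the shard-local scratch buffer instead of allocating a fresh key for every block. A rough sketch of that buf[:0] reuse pattern follows; the shard type and the buildKey helper are hypothetical names used only for illustration.

package main

import "fmt"

// shard keeps a reusable scratch buffer, mirroring shard.keyBuf in the commit.
type shard struct {
	keyBuf []byte
}

// buildKey re-slices the scratch buffer to length zero and appends into it,
// so the underlying array is reused once it has grown large enough.
func (s *shard) buildKey(value string) []byte {
	buf := s.keyBuf[:0]
	buf = append(buf, value...)
	s.keyBuf = buf // keep the (possibly grown) buffer for the next block
	return buf
}

func main() {
	var s shard
	for _, v := range []string{"alpha", "beta", "gamma"} {
		key := s.buildKey(v)
		fmt.Printf("key=%q cap=%d\n", key, cap(s.keyBuf))
	}
}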
@@ -378,11 +384,23 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 		return
 	}
 
-	// The slowest path - group by multiple columns.
 	// Pre-calculate column values for byFields in order to speed up building group key in the loop below.
 	shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields)
 	columnValues := shard.columnValues
 
+	if areConstValues(columnValues) {
+		// Fast path for columns with constant values.
+		keyBuf := shard.keyBuf[:0]
+		for _, values := range columnValues {
+			keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0]))
+		}
+		for _, sfp := range shard.getStatsFuncProcessors(keyBuf) {
+			shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns)
+		}
+		return
+	}
+
+	// The slowest path - group by multiple columns.
 	var sfps []statsFuncProcessor
 	keyBuf := shard.keyBuf
 	for i := range timestamps {
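The hunk above adds a fast path for the multi-column case as well: when every by-column holds a single repeated value within the block, one key built from the first value of each column covers all rows, and the batch update is used. The standalone sketch below shows the same decision under the assumption that the column values are already materialized as [][]string, the way shard.columnValues is; isConst mirrors the isConstValue helper added later in this commit, while groupKeyForBlock and its NUL separator are simplifications for illustration only (the real code builds keys with encoding.MarshalBytes instead).

package main

import "fmt"

// isConst reports whether all values in the column are identical
// (same idea as the isConstValue helper added by the commit).
func isConst(values []string) bool {
	if len(values) == 0 {
		return false
	}
	for _, v := range values[1:] {
		if v != values[0] {
			return false
		}
	}
	return true
}

// groupKeyForBlock returns a single key for the whole block if every
// by-column is constant, and ok=false otherwise (per-row keys needed).
func groupKeyForBlock(columnValues [][]string) (key string, ok bool) {
	for _, values := range columnValues {
		if !isConst(values) {
			return "", false
		}
	}
	for _, values := range columnValues {
		key += values[0] + "\x00" // simplistic separator for the sketch
	}
	return key, true
}

func main() {
	cols := [][]string{
		{"app", "app", "app"},
		{"prod", "prod", "prod"},
	}
	k, ok := groupKeyForBlock(cols)
	fmt.Printf("%q %v\n", k, ok) // "app\x00prod\x00" true
}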
@@ -390,7 +408,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 		sameValue := sfps != nil
 		for _, values := range columnValues {
 			if values == nil {
-				continue
+				logger.Panicf("BUG: values cannot be nil here!")
 			}
 			if i <= 0 || values[i-1] != values[i] {
 				sameValue = false
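Replacing `continue` with logger.Panicf turns a silently skipped nil column into a loud invariant violation: after appendBlockColumnValues, columnValues is expected to contain a non-nil slice for every by-field, so a nil entry can only mean a bug. The surrounding sameValue logic keeps reusing the processors obtained for the previous row while none of the by-field values change. A rough standalone sketch of that reuse idea follows; processRows and the map of counters are illustrative stand-ins for the shard state in the commit.

package main

import "fmt"

// processRows looks up the per-group counter only when the by-field values
// change from the previous row, mimicking the sameValue/sfps reuse in the commit.
func processRows(columnValues [][]string, groups map[string]*int) {
	var cur *int
	var curKey string
	nRows := len(columnValues[0])
	for i := 0; i < nRows; i++ {
		sameValue := cur != nil
		for _, values := range columnValues {
			if i <= 0 || values[i-1] != values[i] {
				sameValue = false
				break
			}
		}
		if !sameValue {
			// Rebuild the key and look up the group only when some value changed.
			curKey = ""
			for _, values := range columnValues {
				curKey += values[i] + "\x00"
			}
			if groups[curKey] == nil {
				groups[curKey] = new(int)
			}
			cur = groups[curKey]
		}
		*cur++
	}
}

func main() {
	groups := map[string]*int{}
	processRows([][]string{{"a", "a", "b"}, {"x", "x", "x"}}, groups)
	for k, n := range groups {
		fmt.Printf("%q -> %d\n", k, *n)
	}
}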
@@ -401,11 +419,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 			// Construct new key for the 'by (...)' fields
 			keyBuf = keyBuf[:0]
 			for _, values := range columnValues {
-				v := ""
-				if values != nil {
-					v = values[i]
-				}
-				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
+				keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
 			}
 			sfps = shard.getStatsFuncProcessors(keyBuf)
 		}
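The per-row key is now built by passing each by-field value straight through encoding.MarshalBytes; the old `v := ""` fallback is dropped because a nil values slice is no longer possible at this point. Marshaling each value separately, rather than concatenating raw strings, keeps composite keys unambiguous: ("ab", "c") and ("a", "bc") must map to different groups. The sketch below demonstrates that property with a uvarint length prefix; it assumes, without checking the library here, that encoding.MarshalBytes is likewise length-prefixed, and marshalBytes/key are illustrative helpers only.

package main

import (
	"encoding/binary"
	"fmt"
)

// marshalBytes appends a uvarint length prefix followed by the value,
// so concatenated fields can always be split back apart unambiguously.
func marshalBytes(dst []byte, v []byte) []byte {
	dst = binary.AppendUvarint(dst, uint64(len(v)))
	return append(dst, v...)
}

func key(fields ...string) string {
	var buf []byte
	for _, f := range fields {
		buf = marshalBytes(buf, []byte(f))
	}
	return string(buf)
}

func main() {
	// Raw concatenation would make these two identical ("abc"),
	// but length prefixes keep them distinct.
	fmt.Println(key("ab", "c") == key("a", "bc")) // false
}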
@@ -416,6 +430,28 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	shard.keyBuf = keyBuf
 }
 
+func areConstValues(valuess [][]string) bool {
+	for _, values := range valuess {
+		if !isConstValue(values) {
+			return false
+		}
+	}
+	return true
+}
+
+func isConstValue(values []string) bool {
+	if len(values) == 0 {
+		return false
+	}
+	vFirst := values[0]
+	for _, v := range values[1:] {
+		if v != vFirst {
+			return false
+		}
+	}
+	return true
+}
+
 func (spp *statsPipeProcessor) flush() error {
 	if n := spp.stateSizeBudget.Load(); n <= 0 {
 		return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.sp.String(), spp.maxStateSize/(1<<20))
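flush() keeps refusing to produce results once stateSizeBudget drops to zero or below, so unbounded grouping state becomes a query error instead of an out-of-memory crash; each update above returns the number of state bytes it allocated, and that amount is subtracted from the budget. A rough standalone sketch of the accounting pattern with an atomic counter follows; the processor type, the maxStateSize value, and the use of a single shared atomic (the commit keeps a plain per-shard counter alongside the atomic total) are assumptions for illustration.

package main

import (
	"fmt"
	"sync/atomic"
)

const maxStateSize = 1 << 20 // 1MiB budget for this sketch

type processor struct {
	stateSizeBudget atomic.Int64
}

// addState charges n freshly allocated state bytes against the budget.
func (p *processor) addState(n int64) {
	p.stateSizeBudget.Add(-n)
}

// flush fails if the workers have collectively exceeded the budget.
func (p *processor) flush() error {
	if n := p.stateSizeBudget.Load(); n <= 0 {
		return fmt.Errorf("cannot calculate the query, since it requires more than %dMB of memory", maxStateSize/(1<<20))
	}
	return nil
}

func main() {
	var p processor
	p.stateSizeBudget.Store(maxStateSize)
	p.addState(512 << 10)
	fmt.Println(p.flush()) // <nil>
	p.addState(600 << 10)
	fmt.Println(p.flush()) // error: budget exceeded
}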