diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 5106c1c1e..0f2e6b35c 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -346,15 +346,11 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col byFields := spp.sp.byFields if len(byFields) == 0 { // Fast path - pass all the rows to a single group with empty key. - sfps := shard.getStatsFuncProcessors(nil) - for _, sfp := range sfps { + for _, sfp := range shard.getStatsFuncProcessors(nil) { shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) } return } - - // Slow path - update per-row stats - if len(byFields) == 1 { // Special case for grouping by a single column. idx := getBlockColumnIndex(columns, byFields[0]) @@ -363,6 +359,16 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col } values := columns[idx].Values + if isConstValue(values) { + // Fast path for column with constant value. + shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) + for _, sfp := range shard.getStatsFuncProcessors(shard.keyBuf) { + shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) + } + return + } + + // Slower path for column with different values. var sfps []statsFuncProcessor keyBuf := shard.keyBuf for i := range timestamps { @@ -378,11 +384,23 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col return } - // The slowest path - group by multiple columns. // Pre-calculate column values for byFields in order to speed up building group key in the loop below. shard.columnValues = appendBlockColumnValues(shard.columnValues[:0], columns, spp.sp.byFields) columnValues := shard.columnValues + if areConstValues(columnValues) { + // Fast path for columns with constant values. + keyBuf := shard.keyBuf[:0] + for _, values := range columnValues { + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0])) + } + for _, sfp := range shard.getStatsFuncProcessors(keyBuf) { + shard.stateSizeBudget -= sfp.updateStatsForAllRows(timestamps, columns) + } + return + } + + // The slowest path - group by multiple columns. var sfps []statsFuncProcessor keyBuf := shard.keyBuf for i := range timestamps { @@ -390,7 +408,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col sameValue := sfps != nil for _, values := range columnValues { if values == nil { - continue + logger.Panicf("BUG: values cannot be nil here!") } if i <= 0 || values[i-1] != values[i] { sameValue = false @@ -401,11 +419,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col // Construct new key for the 'by (...)' fields keyBuf = keyBuf[:0] for _, values := range columnValues { - v := "" - if values != nil { - v = values[i] - } - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) } sfps = shard.getStatsFuncProcessors(keyBuf) } @@ -416,6 +430,28 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col shard.keyBuf = keyBuf } +func areConstValues(valuess [][]string) bool { + for _, values := range valuess { + if !isConstValue(values) { + return false + } + } + return true +} + +func isConstValue(values []string) bool { + if len(values) == 0 { + return false + } + vFirst := values[0] + for _, v := range values[1:] { + if v != vFirst { + return false + } + } + return true +} + func (spp *statsPipeProcessor) flush() error { if n := spp.stateSizeBudget.Load(); n <= 0 { return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", spp.sp.String(), spp.maxStateSize/(1<<20))