diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index bcc34c47a..cf3115ed9 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -166,9 +166,8 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { if len(byFields) == 0 { // Fast path - pass all the rows to a single group with empty key. - for _, sfp := range shard.getStatsProcessors(nil) { - shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) - } + psg := shard.getPipeStatsGroup(nil) + shard.stateSizeBudget -= psg.updateStatsForAllRows(br) return } if len(byFields) == 1 { @@ -179,9 +178,8 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { // Fast path for column with constant value. v := br.getBucketedValue(c.encodedValues[0], bf.bucketSize, bf.bucketOffset) shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(v)) - for _, sfp := range shard.getStatsProcessors(shard.keyBuf) { - shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) - } + psg := shard.getPipeStatsGroup(shard.keyBuf) + shard.stateSizeBudget -= psg.updateStatsForAllRows(br) return } @@ -189,23 +187,20 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { if areConstValues(values) { // Fast path for column with constant values. shard.keyBuf = encoding.MarshalBytes(shard.keyBuf[:0], bytesutil.ToUnsafeBytes(values[0])) - for _, sfp := range shard.getStatsProcessors(shard.keyBuf) { - shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) - } + psg := shard.getPipeStatsGroup(shard.keyBuf) + shard.stateSizeBudget -= psg.updateStatsForAllRows(br) return } // Slower generic path for a column with different values. - var sfps []statsProcessor + var psg *pipeStatsGroup keyBuf := shard.keyBuf[:0] for i := range br.timestamps { if i <= 0 || values[i-1] != values[i] { keyBuf = encoding.MarshalBytes(keyBuf[:0], bytesutil.ToUnsafeBytes(values[i])) - sfps = shard.getStatsProcessors(keyBuf) - } - for _, sfp := range sfps { - shard.stateSizeBudget -= sfp.updateStatsForRow(br, i) + psg = shard.getPipeStatsGroup(keyBuf) } + shard.stateSizeBudget -= psg.updateStatsForRow(br, i) } shard.keyBuf = keyBuf return @@ -234,19 +229,18 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { for _, values := range columnValues { keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[0])) } - for _, sfp := range shard.getStatsProcessors(keyBuf) { - shard.stateSizeBudget -= sfp.updateStatsForAllRows(br) - } + psg := shard.getPipeStatsGroup(keyBuf) + shard.stateSizeBudget -= psg.updateStatsForAllRows(br) shard.keyBuf = keyBuf return } // The slowest path - group by multiple columns with different values across rows. - var sfps []statsProcessor + var psg *pipeStatsGroup keyBuf := shard.keyBuf[:0] for i := range br.timestamps { // Verify whether the key for 'by (...)' fields equals the previous key - sameValue := sfps != nil + sameValue := i > 0 for _, values := range columnValues { if i <= 0 || values[i-1] != values[i] { sameValue = false @@ -259,37 +253,54 @@ func (shard *pipeStatsProcessorShard) writeBlock(br *blockResult) { for _, values := range columnValues { keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) } - sfps = shard.getStatsProcessors(keyBuf) - } - for _, sfp := range sfps { - shard.stateSizeBudget -= sfp.updateStatsForRow(br, i) + psg = shard.getPipeStatsGroup(keyBuf) } + shard.stateSizeBudget -= psg.updateStatsForRow(br, i) } shard.keyBuf = keyBuf } -func (shard *pipeStatsProcessorShard) getStatsProcessors(key []byte) []statsProcessor { - spg := shard.m[string(key)] - if spg == nil { - sfps := make([]statsProcessor, len(shard.ps.funcs)) - for i, f := range shard.ps.funcs { - sfp, stateSize := f.newStatsProcessor() - sfps[i] = sfp - shard.stateSizeBudget -= stateSize - } - spg = &pipeStatsGroup{ - sfps: sfps, - } - shard.m[string(key)] = spg - shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(spg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps))) +func (shard *pipeStatsProcessorShard) getPipeStatsGroup(key []byte) *pipeStatsGroup { + psg := shard.m[string(key)] + if psg != nil { + return psg } - return spg.sfps + + sfps := make([]statsProcessor, len(shard.ps.funcs)) + for i, f := range shard.ps.funcs { + sfp, stateSize := f.newStatsProcessor() + sfps[i] = sfp + shard.stateSizeBudget -= stateSize + } + psg = &pipeStatsGroup{ + sfps: sfps, + } + shard.m[string(key)] = psg + shard.stateSizeBudget -= len(key) + int(unsafe.Sizeof("")+unsafe.Sizeof(psg)+unsafe.Sizeof(sfps[0])*uintptr(len(sfps))) + + return psg } type pipeStatsGroup struct { sfps []statsProcessor } +func (psg *pipeStatsGroup) updateStatsForAllRows(br *blockResult) int { + n := 0 + for _, sfp := range psg.sfps { + n += sfp.updateStatsForAllRows(br) + } + return n +} + +func (psg *pipeStatsGroup) updateStatsForRow(br *blockResult, rowIdx int) int { + n := 0 + for _, sfp := range psg.sfps { + n += sfp.updateStatsForRow(br, rowIdx) + } + return n +} + func (psp *pipeStatsProcessor) writeBlock(workerID uint, br *blockResult) { if len(br.timestamps) == 0 { return @@ -325,7 +336,7 @@ func (psp *pipeStatsProcessor) flush() error { shards = shards[1:] for i := range shards { shard := &shards[i] - for key, spg := range shard.m { + for key, psg := range shard.m { // shard.m may be quite big, so this loop can take a lot of time and CPU. // Stop processing data as soon as stopCh is closed without wasting additional CPU time. select { @@ -336,10 +347,10 @@ func (psp *pipeStatsProcessor) flush() error { spgBase := m[key] if spgBase == nil { - m[key] = spg + m[key] = psg } else { for i, sfp := range spgBase.sfps { - sfp.mergeState(spg.sfps[i]) + sfp.mergeState(psg.sfps[i]) } } } @@ -349,7 +360,7 @@ func (psp *pipeStatsProcessor) flush() error { byFields := psp.ps.byFields if len(byFields) == 0 && len(m) == 0 { // Special case - zero matching rows. - _ = shards[0].getStatsProcessors(nil) + _ = shards[0].getPipeStatsGroup(nil) m = shards[0].m } @@ -368,7 +379,7 @@ func (psp *pipeStatsProcessor) flush() error { var values []string valuesLen := 0 - for key, spg := range m { + for key, psg := range m { // m may be quite big, so this loop can take a lot of time and CPU. // Stop processing data as soon as stopCh is closed without wasting additional CPU time. select { @@ -393,7 +404,7 @@ func (psp *pipeStatsProcessor) flush() error { } // calculate values for stats functions - for _, sfp := range spg.sfps { + for _, sfp := range psg.sfps { value := sfp.finalizeStats() values = append(values, value) } diff --git a/lib/logstorage/stats_count_uniq.go b/lib/logstorage/stats_count_uniq.go index 014139595..6b0885961 100644 --- a/lib/logstorage/stats_count_uniq.go +++ b/lib/logstorage/stats_count_uniq.go @@ -13,7 +13,7 @@ import ( type statsCountUniq struct { fields []string containsStar bool - limit uint64 + limit uint64 } func (su *statsCountUniq) String() string { diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go index f240a77e5..e13991945 100644 --- a/lib/logstorage/stats_uniq_values.go +++ b/lib/logstorage/stats_uniq_values.go @@ -13,7 +13,7 @@ import ( type statsUniqValues struct { fields []string containsStar bool - limit uint64 + limit uint64 } func (su *statsUniqValues) String() string {