From 6f798c628fd98679d92b5ff9cb5f6857923f321a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 15 May 2024 13:23:51 +0200 Subject: [PATCH] wip --- lib/logstorage/stats_max.go | 44 ++++++++++++++++++++++++++++++----- lib/logstorage/stats_min.go | 46 +++++++++++++++++++++++++++++++------ 2 files changed, 77 insertions(+), 13 deletions(-) diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index b2c35aa94..1617db3ce 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -4,6 +4,8 @@ import ( "slices" "strings" "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) type statsMax struct { @@ -39,17 +41,13 @@ func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { if smp.sm.containsStar { // Find the minimum value across all the columns for _, c := range br.getColumns() { - for _, v := range c.getValues(br) { - smp.updateState(v) - } + smp.updateStateForColumn(br, c) } } else { // Find the minimum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - for _, v := range c.getValues(br) { - smp.updateState(v) - } + smp.updateStateForColumn(br, c) } } @@ -84,6 +82,40 @@ func (smp *statsMaxProcessor) mergeState(sfp statsProcessor) { } } +func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { + if c.isTime { + // Special case for time column + timestamps := br.timestamps + if len(timestamps) == 0 { + return + } + maxTimestamp := timestamps[len(timestamps)-1] + for _, timestamp := range timestamps[:len(timestamps)-1] { + if timestamp > maxTimestamp { + maxTimestamp = timestamp + } + } + + bb := bbPool.Get() + bb.B = marshalTimestampRFC3339Nano(bb.B[:0], maxTimestamp) + v := bytesutil.ToUnsafeString(bb.B) + smp.updateState(v) + bbPool.Put(bb) + + return + } + if c.isConst { + // Special case for const column + v := c.encodedValues[0] + smp.updateState(v) + return + } + + for _, v := range c.getValues(br) { + smp.updateState(v) + } +} + func (smp *statsMaxProcessor) updateState(v string) { if smp.hasMax && !lessString(smp.max, v) { return diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 8e451a0e1..9634243e5 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -4,6 +4,8 @@ import ( "slices" "strings" "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) type statsMin struct { @@ -29,7 +31,7 @@ func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { type statsMinProcessor struct { sm *statsMin - min string + min string hasMin bool } @@ -39,17 +41,13 @@ func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { if smp.sm.containsStar { // Find the minimum value across all the columns for _, c := range br.getColumns() { - for _, v := range c.getValues(br) { - smp.updateState(v) - } + smp.updateStateForColumn(br, c) } } else { // Find the minimum value across the requested columns for _, field := range smp.sm.fields { c := br.getColumnByName(field) - for _, v := range c.getValues(br) { - smp.updateState(v) - } + smp.updateStateForColumn(br, c) } } @@ -84,6 +82,40 @@ func (smp *statsMinProcessor) mergeState(sfp statsProcessor) { } } +func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResultColumn) { + if c.isTime { + // Special case for time column + timestamps := br.timestamps + if len(timestamps) == 0 { + return + } + minTimestamp := timestamps[0] + for _, timestamp := range timestamps[1:] { + if timestamp < minTimestamp { + minTimestamp = timestamp + } + } + + bb := bbPool.Get() + bb.B = marshalTimestampRFC3339Nano(bb.B[:0], minTimestamp) + v := bytesutil.ToUnsafeString(bb.B) + smp.updateState(v) + bbPool.Put(bb) + + return + } + if c.isConst { + // Special case for const column + v := c.encodedValues[0] + smp.updateState(v) + return + } + + for _, v := range c.getValues(br) { + smp.updateState(v) + } +} + func (smp *statsMinProcessor) updateState(v string) { if smp.hasMin && !lessString(v, smp.min) { return