diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 421fb96c7..1a4db2794 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -610,6 +610,10 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co // Fast path for a single column if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { for _, v := range columns[idx].Values { + if v == "" { + // Do not count empty values + continue + } if _, ok := m[v]; !ok { vCopy := strings.Clone(v) m[vCopy] = struct{}{} @@ -627,14 +631,22 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co keyBuf := sfup.keyBuf for i := range timestamps { + allEmptyValues := true keyBuf = keyBuf[:0] for _, idx := range columnIdxs { v := "" if idx >= 0 { v = columns[idx].Values[i] } + if v != "" { + allEmptyValues = false + } keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) } + if allEmptyValues { + // Do not count empty values + continue + } if _, ok := m[string(keyBuf)]; !ok { m[string(keyBuf)] = struct{}{} } @@ -650,6 +662,10 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column // Fast path for a single column if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { v := columns[idx].Values[rowIdx] + if v == "" { + // Do not count empty values + return + } if _, ok := m[v]; !ok { vCopy := strings.Clone(v) m[vCopy] = struct{}{} @@ -659,16 +675,24 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column } // Slow path for multiple columns. + allEmptyValues := true keyBuf := sfup.keyBuf for _, f := range fields { v := "" if idx := getBlockColumnIndex(columns, f); idx >= 0 { v = columns[idx].Values[rowIdx] } + if v != "" { + allEmptyValues = false + } keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) } sfup.keyBuf = keyBuf + if allEmptyValues { + // Do not count empty values + return + } if _, ok := m[string(keyBuf)]; !ok { m[string(keyBuf)] = struct{}{} } @@ -747,6 +771,10 @@ func (hp *headPipe) String() string { } func (hp *headPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + if hp.n == 0 { + // Special case - notify the caller to stop writing data to the returned headPipeProcessor + cancel() + } return &headPipeProcessor{ hp: hp, cancel: cancel,