This commit is contained in:
Aliaksandr Valialkin 2024-04-27 04:43:38 +02:00
parent 91c7902555
commit d282056124
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -610,6 +610,10 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
// Fast path for a single column // Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
for _, v := range columns[idx].Values { for _, v := range columns[idx].Values {
if v == "" {
// Do not count empty values
continue
}
if _, ok := m[v]; !ok { if _, ok := m[v]; !ok {
vCopy := strings.Clone(v) vCopy := strings.Clone(v)
m[vCopy] = struct{}{} m[vCopy] = struct{}{}
@ -627,14 +631,22 @@ func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, co
keyBuf := sfup.keyBuf keyBuf := sfup.keyBuf
for i := range timestamps { for i := range timestamps {
allEmptyValues := true
keyBuf = keyBuf[:0] keyBuf = keyBuf[:0]
for _, idx := range columnIdxs { for _, idx := range columnIdxs {
v := "" v := ""
if idx >= 0 { if idx >= 0 {
v = columns[idx].Values[i] v = columns[idx].Values[i]
} }
if v != "" {
allEmptyValues = false
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
} }
if allEmptyValues {
// Do not count empty values
continue
}
if _, ok := m[string(keyBuf)]; !ok { if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{} m[string(keyBuf)] = struct{}{}
} }
@ -650,6 +662,10 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column
// Fast path for a single column // Fast path for a single column
if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
v := columns[idx].Values[rowIdx] v := columns[idx].Values[rowIdx]
if v == "" {
// Do not count empty values
return
}
if _, ok := m[v]; !ok { if _, ok := m[v]; !ok {
vCopy := strings.Clone(v) vCopy := strings.Clone(v)
m[vCopy] = struct{}{} m[vCopy] = struct{}{}
@ -659,16 +675,24 @@ func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, column
} }
// Slow path for multiple columns. // Slow path for multiple columns.
allEmptyValues := true
keyBuf := sfup.keyBuf keyBuf := sfup.keyBuf
for _, f := range fields { for _, f := range fields {
v := "" v := ""
if idx := getBlockColumnIndex(columns, f); idx >= 0 { if idx := getBlockColumnIndex(columns, f); idx >= 0 {
v = columns[idx].Values[rowIdx] v = columns[idx].Values[rowIdx]
} }
if v != "" {
allEmptyValues = false
}
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
} }
sfup.keyBuf = keyBuf sfup.keyBuf = keyBuf
if allEmptyValues {
// Do not count empty values
return
}
if _, ok := m[string(keyBuf)]; !ok { if _, ok := m[string(keyBuf)]; !ok {
m[string(keyBuf)] = struct{}{} m[string(keyBuf)] = struct{}{}
} }
@ -747,6 +771,10 @@ func (hp *headPipe) String() string {
} }
func (hp *headPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { func (hp *headPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
if hp.n == 0 {
// Special case - notify the caller to stop writing data to the returned headPipeProcessor
cancel()
}
return &headPipeProcessor{ return &headPipeProcessor{
hp: hp, hp: hp,
cancel: cancel, cancel: cancel,