diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 74ecd56be..39eb09ef1 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -2,9 +2,7 @@ package logstorage import ( "fmt" - "math" "slices" - "strconv" "strings" "sync/atomic" "unsafe" @@ -593,19 +591,19 @@ func parseStatsPipe(lex *lexer) (*statsPipe, error) { func parseStatsFunc(lex *lexer) (statsFunc, error) { switch { case lex.isKeyword("count"): - sfc, err := parseStatsFuncCount(lex) + sfc, err := parseStatsCount(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'count' func: %w", err) } return sfc, nil case lex.isKeyword("uniq"): - sfu, err := parseStatsFuncUniq(lex) + sfu, err := parseStatsUniq(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'uniq' func: %w", err) } return sfu, nil case lex.isKeyword("sum"): - sfs, err := parseStatsFuncSum(lex) + sfs, err := parseStatsSum(lex) if err != nil { return nil, fmt.Errorf("cannot parse 'sum' func: %w", err) } @@ -615,476 +613,6 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) { } } -type statsFuncCount struct { - fields []string - containsStar bool - - resultName string -} - -func (sfc *statsFuncCount) String() string { - return "count(" + fieldNamesString(sfc.fields) + ") as " + quoteTokenIfNeeded(sfc.resultName) -} - -func (sfc *statsFuncCount) neededFields() []string { - return getFieldsIgnoreStar(sfc.fields) -} - -func (sfc *statsFuncCount) newStatsFuncProcessor() (statsFuncProcessor, int) { - sfcp := &statsFuncCountProcessor{ - sfc: sfc, - } - return sfcp, int(unsafe.Sizeof(*sfcp)) -} - -type statsFuncCountProcessor struct { - sfc *statsFuncCount - - rowsCount uint64 -} - -func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { - fields := sfcp.sfc.fields - if len(fields) == 0 || sfcp.sfc.containsStar { - // Fast path - count all the columns. - sfcp.rowsCount += uint64(len(timestamps)) - return 0 - } - - // Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count(). - bm := getFilterBitmap(len(timestamps)) - defer putFilterBitmap(bm) - - bm.setBits() - for _, f := range fields { - if idx := getBlockColumnIndex(columns, f); idx >= 0 { - values := columns[idx].Values - bm.forEachSetBit(func(i int) bool { - return values[i] == "" - }) - } - } - - emptyValues := 0 - bm.forEachSetBit(func(i int) bool { - emptyValues++ - return true - }) - - sfcp.rowsCount += uint64(len(timestamps) - emptyValues) - return 0 -} - -func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int { - fields := sfcp.sfc.fields - if len(fields) == 0 || sfcp.sfc.containsStar { - // Fast path - count the given column - sfcp.rowsCount++ - return 0 - } - - // Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty - for _, f := range fields { - if idx := getBlockColumnIndex(columns, f); idx >= 0 && columns[idx].Values[rowIdx] != "" { - sfcp.rowsCount++ - return 0 - } - } - return 0 -} - -func (sfcp *statsFuncCountProcessor) mergeState(sfp statsFuncProcessor) { - src := sfp.(*statsFuncCountProcessor) - sfcp.rowsCount += src.rowsCount -} - -func (sfcp *statsFuncCountProcessor) finalizeStats() (string, string) { - value := strconv.FormatUint(sfcp.rowsCount, 10) - return sfcp.sfc.resultName, value -} - -func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'count' args: %w", err) - } - resultName, err := parseResultName(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse result name: %w", err) - } - sfc := &statsFuncCount{ - fields: fields, - containsStar: slices.Contains(fields, "*"), - resultName: resultName, - } - return sfc, nil -} - -type statsFuncSum struct { - fields []string - containsStar bool - resultName string -} - -func (sfs *statsFuncSum) String() string { - return "sum(" + fieldNamesString(sfs.fields) + ") as " + quoteTokenIfNeeded(sfs.resultName) -} - -func (sfs *statsFuncSum) neededFields() []string { - return sfs.fields -} - -func (sfs *statsFuncSum) newStatsFuncProcessor() (statsFuncProcessor, int) { - sfsp := &statsFuncSumProcessor{ - sfs: sfs, - } - return sfsp, int(unsafe.Sizeof(*sfsp)) -} - -type statsFuncSumProcessor struct { - sfs *statsFuncSum - - sum float64 -} - -func (sfsp *statsFuncSumProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { - if sfsp.sfs.containsStar { - // Sum all the columns - for _, c := range columns { - sfsp.sum += sumValues(c.Values) - } - return 0 - } - - // Sum the requested columns - for _, field := range sfsp.sfs.fields { - if idx := getBlockColumnIndex(columns, field); idx >= 0 { - sfsp.sum += sumValues(columns[idx].Values) - } - } - return 0 -} - -func sumValues(values []string) float64 { - sum := float64(0) - f := float64(0) - for i, v := range values { - if i == 0 || values[i-1] != v { - f, _ = tryParseFloat64(v) - if math.IsNaN(f) { - // Ignore NaN values, since this is the expected behaviour by most users. - f = 0 - } - } - sum += f - } - return sum -} - -func (sfsp *statsFuncSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int { - if sfsp.sfs.containsStar { - // Sum all the fields for the given row - for _, c := range columns { - v := c.Values[rowIdx] - f, _ := tryParseFloat64(v) - if !math.IsNaN(f) { - sfsp.sum += f - } - } - return 0 - } - - // Sum only the given fields for the given row - for _, field := range sfsp.sfs.fields { - if idx := getBlockColumnIndex(columns, field); idx >= 0 { - v := columns[idx].Values[rowIdx] - f, _ := tryParseFloat64(v) - if !math.IsNaN(f) { - sfsp.sum += f - } - } - } - return 0 -} - -func (sfsp *statsFuncSumProcessor) mergeState(sfp statsFuncProcessor) { - src := sfp.(*statsFuncSumProcessor) - sfsp.sum += src.sum -} - -func (sfsp *statsFuncSumProcessor) finalizeStats() (string, string) { - value := strconv.FormatFloat(sfsp.sum, 'g', -1, 64) - return sfsp.sfs.resultName, value -} - -func parseStatsFuncSum(lex *lexer) (*statsFuncSum, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'sum' args: %w", err) - } - if len(fields) == 0 { - return nil, fmt.Errorf("'sum' must contain at least one arg") - } - resultName, err := parseResultName(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse result name: %w", err) - } - sfs := &statsFuncSum{ - fields: fields, - containsStar: slices.Contains(fields, "*"), - resultName: resultName, - } - return sfs, nil -} - -type statsFuncUniq struct { - fields []string - containsStar bool - resultName string -} - -func (sfu *statsFuncUniq) String() string { - return "uniq(" + fieldNamesString(sfu.fields) + ") as " + quoteTokenIfNeeded(sfu.resultName) -} - -func (sfu *statsFuncUniq) neededFields() []string { - return sfu.fields -} - -func (sfu *statsFuncUniq) newStatsFuncProcessor() (statsFuncProcessor, int) { - sfup := &statsFuncUniqProcessor{ - sfu: sfu, - - m: make(map[string]struct{}), - } - return sfup, int(unsafe.Sizeof(*sfup)) -} - -type statsFuncUniqProcessor struct { - sfu *statsFuncUniq - - m map[string]struct{} - - columnValues [][]string - keyBuf []byte -} - -func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { - fields := sfup.sfu.fields - m := sfup.m - - stateSizeIncrease := 0 - if len(fields) == 0 || sfup.sfu.containsStar { - // Count unique rows - keyBuf := sfup.keyBuf - for i := range timestamps { - seenKey := true - for _, c := range columns { - values := c.Values - if i == 0 || values[i-1] != values[i] { - seenKey = false - break - } - } - if seenKey { - continue - } - - allEmptyValues := true - keyBuf = keyBuf[:0] - for _, c := range columns { - v := c.Values[i] - if v != "" { - allEmptyValues = false - } - // Put column name into key, since every block can contain different set of columns for '*' selector. - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name)) - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) - } - if allEmptyValues { - // Do not count empty values - continue - } - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } - } - sfup.keyBuf = keyBuf - return stateSizeIncrease - } - if len(fields) == 1 { - // Fast path for a single column - if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { - values := columns[idx].Values - for i, v := range values { - if v == "" { - // Do not count empty values - continue - } - if i > 0 && values[i-1] == v { - continue - } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) - } - } - } - return stateSizeIncrease - } - - // Slow path for multiple columns. - - // Pre-calculate column values for byFields in order to speed up building group key in the loop below. - sfup.columnValues = appendBlockColumnValues(sfup.columnValues[:0], columns, fields, len(timestamps)) - columnValues := sfup.columnValues - - keyBuf := sfup.keyBuf - for i := range timestamps { - seenKey := true - for _, values := range columnValues { - if i == 0 || values[i-1] != values[i] { - seenKey = false - } - } - if seenKey { - continue - } - - allEmptyValues := true - keyBuf = keyBuf[:0] - for _, values := range columnValues { - v := values[i] - if v != "" { - allEmptyValues = false - } - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) - } - if allEmptyValues { - // Do not count empty values - continue - } - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } - } - sfup.keyBuf = keyBuf - return stateSizeIncrease -} - -func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) int { - fields := sfup.sfu.fields - m := sfup.m - - stateSizeIncrease := 0 - if len(fields) == 0 || sfup.sfu.containsStar { - // Count unique rows - allEmptyValues := true - keyBuf := sfup.keyBuf[:0] - for _, c := range columns { - v := c.Values[rowIdx] - if v != "" { - allEmptyValues = false - } - // Put column name into key, since every block can contain different set of columns for '*' selector. - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name)) - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) - } - sfup.keyBuf = keyBuf - - if allEmptyValues { - // Do not count empty values - return stateSizeIncrease - } - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } - return stateSizeIncrease - } - if len(fields) == 1 { - // Fast path for a single column - if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { - v := columns[idx].Values[rowIdx] - if v == "" { - // Do not count empty values - return stateSizeIncrease - } - if _, ok := m[v]; !ok { - vCopy := strings.Clone(v) - m[vCopy] = struct{}{} - stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) - } - } - return stateSizeIncrease - } - - // Slow path for multiple columns. - allEmptyValues := true - keyBuf := sfup.keyBuf[:0] - for _, f := range fields { - v := "" - if idx := getBlockColumnIndex(columns, f); idx >= 0 { - v = columns[idx].Values[rowIdx] - } - if v != "" { - allEmptyValues = false - } - keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) - } - sfup.keyBuf = keyBuf - - if allEmptyValues { - // Do not count empty values - return stateSizeIncrease - } - if _, ok := m[string(keyBuf)]; !ok { - m[string(keyBuf)] = struct{}{} - stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) - } - return stateSizeIncrease -} - -func (sfup *statsFuncUniqProcessor) mergeState(sfp statsFuncProcessor) { - src := sfp.(*statsFuncUniqProcessor) - m := sfup.m - for k := range src.m { - m[k] = struct{}{} - } -} - -func (sfup *statsFuncUniqProcessor) finalizeStats() (string, string) { - n := uint64(len(sfup.m)) - value := strconv.FormatUint(n, 10) - return sfup.sfu.resultName, value -} - -func parseStatsFuncUniq(lex *lexer) (*statsFuncUniq, error) { - lex.nextToken() - fields, err := parseFieldNamesInParens(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err) - } - if len(fields) == 0 { - return nil, fmt.Errorf("'uniq' must contain at least a single arg") - } - resultName, err := parseResultName(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse result name: %w", err) - } - sfu := &statsFuncUniq{ - fields: fields, - containsStar: slices.Contains(fields, "*"), - resultName: resultName, - } - return sfu, nil -} - func parseResultName(lex *lexer) (string, error) { if lex.isKeyword("as") { if !lex.mustNextToken() { diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go new file mode 100644 index 000000000..22406b24f --- /dev/null +++ b/lib/logstorage/stats_count.go @@ -0,0 +1,114 @@ +package logstorage + +import ( + "fmt" + "slices" + "strconv" + "unsafe" +) + +type statsCount struct { + fields []string + containsStar bool + + resultName string +} + +func (sc *statsCount) String() string { + return "count(" + fieldNamesString(sc.fields) + ") as " + quoteTokenIfNeeded(sc.resultName) +} + +func (sc *statsCount) neededFields() []string { + return getFieldsIgnoreStar(sc.fields) +} + +func (sc *statsCount) newStatsFuncProcessor() (statsFuncProcessor, int) { + scp := &statsCountProcessor{ + sc: sc, + } + return scp, int(unsafe.Sizeof(*scp)) +} + +type statsCountProcessor struct { + sc *statsCount + + rowsCount uint64 +} + +func (scp *statsCountProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { + fields := scp.sc.fields + if len(fields) == 0 || scp.sc.containsStar { + // Fast path - count all the columns. + scp.rowsCount += uint64(len(timestamps)) + return 0 + } + + // Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count(). + bm := getFilterBitmap(len(timestamps)) + defer putFilterBitmap(bm) + + bm.setBits() + for _, f := range fields { + if idx := getBlockColumnIndex(columns, f); idx >= 0 { + values := columns[idx].Values + bm.forEachSetBit(func(i int) bool { + return values[i] == "" + }) + } + } + + emptyValues := 0 + bm.forEachSetBit(func(i int) bool { + emptyValues++ + return true + }) + + scp.rowsCount += uint64(len(timestamps) - emptyValues) + return 0 +} + +func (scp *statsCountProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int { + fields := scp.sc.fields + if len(fields) == 0 || scp.sc.containsStar { + // Fast path - count the given column + scp.rowsCount++ + return 0 + } + + // Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty + for _, f := range fields { + if idx := getBlockColumnIndex(columns, f); idx >= 0 && columns[idx].Values[rowIdx] != "" { + scp.rowsCount++ + return 0 + } + } + return 0 +} + +func (scp *statsCountProcessor) mergeState(sfp statsFuncProcessor) { + src := sfp.(*statsCountProcessor) + scp.rowsCount += src.rowsCount +} + +func (scp *statsCountProcessor) finalizeStats() (string, string) { + value := strconv.FormatUint(scp.rowsCount, 10) + return scp.sc.resultName, value +} + +func parseStatsCount(lex *lexer) (*statsCount, error) { + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'count' args: %w", err) + } + resultName, err := parseResultName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name: %w", err) + } + sc := &statsCount{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + resultName: resultName, + } + return sc, nil +} diff --git a/lib/logstorage/stats_sum.go b/lib/logstorage/stats_sum.go new file mode 100644 index 000000000..fd6a73c82 --- /dev/null +++ b/lib/logstorage/stats_sum.go @@ -0,0 +1,127 @@ +package logstorage + +import ( + "fmt" + "math" + "slices" + "strconv" + "unsafe" +) + +type statsSum struct { + fields []string + containsStar bool + resultName string +} + +func (ss *statsSum) String() string { + return "sum(" + fieldNamesString(ss.fields) + ") as " + quoteTokenIfNeeded(ss.resultName) +} + +func (ss *statsSum) neededFields() []string { + return ss.fields +} + +func (ss *statsSum) newStatsFuncProcessor() (statsFuncProcessor, int) { + ssp := &statsSumProcessor{ + ss: ss, + } + return ssp, int(unsafe.Sizeof(*ssp)) +} + +type statsSumProcessor struct { + ss *statsSum + + sum float64 +} + +func (ssp *statsSumProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { + if ssp.ss.containsStar { + // Sum all the columns + for _, c := range columns { + ssp.sum += sumValues(c.Values) + } + return 0 + } + + // Sum the requested columns + for _, field := range ssp.ss.fields { + if idx := getBlockColumnIndex(columns, field); idx >= 0 { + ssp.sum += sumValues(columns[idx].Values) + } + } + return 0 +} + +func sumValues(values []string) float64 { + sum := float64(0) + f := float64(0) + for i, v := range values { + if i == 0 || values[i-1] != v { + f, _ = tryParseFloat64(v) + if math.IsNaN(f) { + // Ignore NaN values, since this is the expected behaviour by most users. + f = 0 + } + } + sum += f + } + return sum +} + +func (ssp *statsSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int { + if ssp.ss.containsStar { + // Sum all the fields for the given row + for _, c := range columns { + v := c.Values[rowIdx] + f, _ := tryParseFloat64(v) + if !math.IsNaN(f) { + ssp.sum += f + } + } + return 0 + } + + // Sum only the given fields for the given row + for _, field := range ssp.ss.fields { + if idx := getBlockColumnIndex(columns, field); idx >= 0 { + v := columns[idx].Values[rowIdx] + f, _ := tryParseFloat64(v) + if !math.IsNaN(f) { + ssp.sum += f + } + } + } + return 0 +} + +func (ssp *statsSumProcessor) mergeState(sfp statsFuncProcessor) { + src := sfp.(*statsSumProcessor) + ssp.sum += src.sum +} + +func (ssp *statsSumProcessor) finalizeStats() (string, string) { + value := strconv.FormatFloat(ssp.sum, 'g', -1, 64) + return ssp.ss.resultName, value +} + +func parseStatsSum(lex *lexer) (*statsSum, error) { + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'sum' args: %w", err) + } + if len(fields) == 0 { + return nil, fmt.Errorf("'sum' must contain at least one arg") + } + resultName, err := parseResultName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name: %w", err) + } + ss := &statsSum{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + resultName: resultName, + } + return ss, nil +} diff --git a/lib/logstorage/stats_unique.go b/lib/logstorage/stats_unique.go new file mode 100644 index 000000000..3fa571f60 --- /dev/null +++ b/lib/logstorage/stats_unique.go @@ -0,0 +1,255 @@ +package logstorage + +import ( + "fmt" + "slices" + "strconv" + "strings" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" +) + +type statsUniq struct { + fields []string + containsStar bool + resultName string +} + +func (su *statsUniq) String() string { + return "uniq(" + fieldNamesString(su.fields) + ") as " + quoteTokenIfNeeded(su.resultName) +} + +func (su *statsUniq) neededFields() []string { + return su.fields +} + +func (su *statsUniq) newStatsFuncProcessor() (statsFuncProcessor, int) { + sup := &statsUniqProcessor{ + su: su, + + m: make(map[string]struct{}), + } + return sup, int(unsafe.Sizeof(*sup)) +} + +type statsUniqProcessor struct { + su *statsUniq + + m map[string]struct{} + + columnValues [][]string + keyBuf []byte +} + +func (sup *statsUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int { + fields := sup.su.fields + m := sup.m + + stateSizeIncrease := 0 + if len(fields) == 0 || sup.su.containsStar { + // Count unique rows + keyBuf := sup.keyBuf + for i := range timestamps { + seenKey := true + for _, c := range columns { + values := c.Values + if i == 0 || values[i-1] != values[i] { + seenKey = false + break + } + } + if seenKey { + continue + } + + allEmptyValues := true + keyBuf = keyBuf[:0] + for _, c := range columns { + v := c.Values[i] + if v != "" { + allEmptyValues = false + } + // Put column name into key, since every block can contain different set of columns for '*' selector. + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + if allEmptyValues { + // Do not count empty values + continue + } + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + } + sup.keyBuf = keyBuf + return stateSizeIncrease + } + if len(fields) == 1 { + // Fast path for a single column + if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { + values := columns[idx].Values + for i, v := range values { + if v == "" { + // Do not count empty values + continue + } + if i > 0 && values[i-1] == v { + continue + } + if _, ok := m[v]; !ok { + vCopy := strings.Clone(v) + m[vCopy] = struct{}{} + stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) + } + } + } + return stateSizeIncrease + } + + // Slow path for multiple columns. + + // Pre-calculate column values for byFields in order to speed up building group key in the loop below. + sup.columnValues = appendBlockColumnValues(sup.columnValues[:0], columns, fields, len(timestamps)) + columnValues := sup.columnValues + + keyBuf := sup.keyBuf + for i := range timestamps { + seenKey := true + for _, values := range columnValues { + if i == 0 || values[i-1] != values[i] { + seenKey = false + } + } + if seenKey { + continue + } + + allEmptyValues := true + keyBuf = keyBuf[:0] + for _, values := range columnValues { + v := values[i] + if v != "" { + allEmptyValues = false + } + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + if allEmptyValues { + // Do not count empty values + continue + } + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + } + sup.keyBuf = keyBuf + return stateSizeIncrease +} + +func (sup *statsUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) int { + fields := sup.su.fields + m := sup.m + + stateSizeIncrease := 0 + if len(fields) == 0 || sup.su.containsStar { + // Count unique rows + allEmptyValues := true + keyBuf := sup.keyBuf[:0] + for _, c := range columns { + v := c.Values[rowIdx] + if v != "" { + allEmptyValues = false + } + // Put column name into key, since every block can contain different set of columns for '*' selector. + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.Name)) + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + sup.keyBuf = keyBuf + + if allEmptyValues { + // Do not count empty values + return stateSizeIncrease + } + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + return stateSizeIncrease + } + if len(fields) == 1 { + // Fast path for a single column + if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 { + v := columns[idx].Values[rowIdx] + if v == "" { + // Do not count empty values + return stateSizeIncrease + } + if _, ok := m[v]; !ok { + vCopy := strings.Clone(v) + m[vCopy] = struct{}{} + stateSizeIncrease += len(vCopy) + int(unsafe.Sizeof(vCopy)) + } + } + return stateSizeIncrease + } + + // Slow path for multiple columns. + allEmptyValues := true + keyBuf := sup.keyBuf[:0] + for _, f := range fields { + v := "" + if idx := getBlockColumnIndex(columns, f); idx >= 0 { + v = columns[idx].Values[rowIdx] + } + if v != "" { + allEmptyValues = false + } + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + sup.keyBuf = keyBuf + + if allEmptyValues { + // Do not count empty values + return stateSizeIncrease + } + if _, ok := m[string(keyBuf)]; !ok { + m[string(keyBuf)] = struct{}{} + stateSizeIncrease += len(keyBuf) + int(unsafe.Sizeof("")) + } + return stateSizeIncrease +} + +func (sup *statsUniqProcessor) mergeState(sfp statsFuncProcessor) { + src := sfp.(*statsUniqProcessor) + m := sup.m + for k := range src.m { + m[k] = struct{}{} + } +} + +func (sup *statsUniqProcessor) finalizeStats() (string, string) { + n := uint64(len(sup.m)) + value := strconv.FormatUint(n, 10) + return sup.su.resultName, value +} + +func parseStatsUniq(lex *lexer) (*statsUniq, error) { + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err) + } + resultName, err := parseResultName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name: %w", err) + } + su := &statsUniq{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + resultName: resultName, + } + return su, nil +}