VictoriaMetrics/lib/logstorage/stats_count.go

211 lines
5 KiB
Go
Raw Normal View History

2024-04-29 01:20:43 +00:00
package logstorage
import (
"slices"
"strconv"
"unsafe"
2024-04-30 21:03:34 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2024-04-29 01:20:43 +00:00
)
type statsCount struct {
2024-05-22 09:25:49 +00:00
fields []string
2024-04-29 01:20:43 +00:00
}
func (sc *statsCount) String() string {
2024-05-22 09:25:49 +00:00
return "count(" + statsFuncFieldsToString(sc.fields) + ")"
2024-04-29 01:20:43 +00:00
}
2024-05-17 02:11:10 +00:00
func (sc *statsCount) updateNeededFields(neededFields fieldsSet) {
2024-05-22 09:25:49 +00:00
if len(sc.fields) == 0 {
2024-05-04 22:28:01 +00:00
// There is no need in fetching any columns for count(*) - the number of matching rows can be calculated as len(blockResult.timestamps)
2024-05-17 02:11:10 +00:00
return
2024-05-04 22:28:01 +00:00
}
2024-05-17 09:32:27 +00:00
neededFields.addFields(sc.fields)
2024-04-29 01:20:43 +00:00
}
2024-04-29 01:23:41 +00:00
func (sc *statsCount) newStatsProcessor() (statsProcessor, int) {
2024-04-29 01:20:43 +00:00
scp := &statsCountProcessor{
sc: sc,
}
return scp, int(unsafe.Sizeof(*scp))
}
type statsCountProcessor struct {
sc *statsCount
rowsCount uint64
}
2024-04-30 21:03:34 +00:00
func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
2024-04-29 01:20:43 +00:00
fields := scp.sc.fields
2024-05-22 09:25:49 +00:00
if len(fields) == 0 {
2024-04-30 21:03:34 +00:00
// Fast path - unconditionally count all the columns.
scp.rowsCount += uint64(len(br.timestamps))
2024-04-29 01:20:43 +00:00
return 0
}
2024-04-30 21:03:34 +00:00
if len(fields) == 1 {
// Fast path for count(single_column)
c := br.getColumnByName(fields[0])
if c.isConst {
2024-05-15 20:19:21 +00:00
if c.valuesEncoded[0] != "" {
2024-04-30 21:03:34 +00:00
scp.rowsCount += uint64(len(br.timestamps))
}
return 0
}
if c.isTime {
scp.rowsCount += uint64(len(br.timestamps))
return 0
}
switch c.valueType {
case valueTypeString:
2024-05-15 20:19:21 +00:00
for _, v := range c.getValuesEncoded(br) {
2024-04-30 21:03:34 +00:00
if v != "" {
scp.rowsCount++
}
}
return 0
case valueTypeDict:
zeroDictIdx := slices.Index(c.dictValues, "")
if zeroDictIdx < 0 {
scp.rowsCount += uint64(len(br.timestamps))
return 0
}
2024-05-15 20:19:21 +00:00
for _, v := range c.getValuesEncoded(br) {
2024-04-30 21:03:34 +00:00
if int(v[0]) != zeroDictIdx {
scp.rowsCount++
}
}
return 0
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
scp.rowsCount += uint64(len(br.timestamps))
return 0
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return 0
}
}
2024-04-29 01:20:43 +00:00
// Slow path - count rows containing at least a single non-empty value for the fields enumerated inside count().
2024-04-30 21:03:34 +00:00
bm := getBitmap(len(br.timestamps))
2024-04-29 01:47:25 +00:00
defer putBitmap(bm)
2024-04-29 01:20:43 +00:00
bm.setBits()
for _, f := range fields {
2024-04-30 21:03:34 +00:00
c := br.getColumnByName(f)
if c.isConst {
2024-05-15 20:19:21 +00:00
if c.valuesEncoded[0] != "" {
2024-04-30 21:03:34 +00:00
scp.rowsCount += uint64(len(br.timestamps))
return 0
}
continue
}
if c.isTime {
scp.rowsCount += uint64(len(br.timestamps))
return 0
}
2024-05-15 20:19:21 +00:00
2024-04-30 21:03:34 +00:00
switch c.valueType {
case valueTypeString:
2024-05-15 20:19:21 +00:00
valuesEncoded := c.getValuesEncoded(br)
2024-04-30 21:03:34 +00:00
bm.forEachSetBit(func(i int) bool {
2024-05-15 20:19:21 +00:00
return valuesEncoded[i] == ""
2024-04-30 21:03:34 +00:00
})
case valueTypeDict:
if !slices.Contains(c.dictValues, "") {
scp.rowsCount += uint64(len(br.timestamps))
return 0
}
2024-05-15 20:19:21 +00:00
valuesEncoded := c.getValuesEncoded(br)
2024-04-29 01:20:43 +00:00
bm.forEachSetBit(func(i int) bool {
2024-05-15 20:19:21 +00:00
dictIdx := valuesEncoded[i][0]
2024-04-30 21:03:34 +00:00
return c.dictValues[dictIdx] == ""
2024-04-29 01:20:43 +00:00
})
2024-04-30 21:03:34 +00:00
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
scp.rowsCount += uint64(len(br.timestamps))
return 0
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return 0
2024-04-29 01:20:43 +00:00
}
}
2024-04-30 21:03:34 +00:00
scp.rowsCount += uint64(len(br.timestamps))
2024-05-05 01:48:29 +00:00
scp.rowsCount -= uint64(bm.onesCount())
2024-04-29 01:20:43 +00:00
return 0
}
2024-04-30 21:03:34 +00:00
func (scp *statsCountProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
2024-04-29 01:20:43 +00:00
fields := scp.sc.fields
2024-05-22 09:25:49 +00:00
if len(fields) == 0 {
2024-04-30 21:03:34 +00:00
// Fast path - unconditionally count the given column
2024-04-29 01:20:43 +00:00
scp.rowsCount++
return 0
}
2024-04-30 21:03:34 +00:00
if len(fields) == 1 {
// Fast path for count(single_column)
c := br.getColumnByName(fields[0])
if c.isConst {
2024-05-15 20:19:21 +00:00
if c.valuesEncoded[0] != "" {
2024-04-30 21:03:34 +00:00
scp.rowsCount++
}
return 0
}
if c.isTime {
scp.rowsCount++
return 0
}
switch c.valueType {
case valueTypeString:
2024-05-15 20:19:21 +00:00
valuesEncoded := c.getValuesEncoded(br)
if v := valuesEncoded[rowIdx]; v != "" {
2024-04-30 21:03:34 +00:00
scp.rowsCount++
}
return 0
case valueTypeDict:
2024-05-15 20:19:21 +00:00
valuesEncoded := c.getValuesEncoded(br)
dictIdx := valuesEncoded[rowIdx][0]
2024-04-30 21:03:34 +00:00
if v := c.dictValues[dictIdx]; v != "" {
scp.rowsCount++
}
return 0
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
scp.rowsCount++
return 0
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return 0
}
}
2024-04-29 01:20:43 +00:00
// Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty
for _, f := range fields {
2024-04-30 21:03:34 +00:00
c := br.getColumnByName(f)
if v := c.getValueAtRow(br, rowIdx); v != "" {
2024-04-29 01:20:43 +00:00
scp.rowsCount++
return 0
}
}
return 0
}
2024-04-29 01:23:41 +00:00
func (scp *statsCountProcessor) mergeState(sfp statsProcessor) {
2024-04-29 01:20:43 +00:00
src := sfp.(*statsCountProcessor)
scp.rowsCount += src.rowsCount
}
2024-04-30 23:19:22 +00:00
func (scp *statsCountProcessor) finalizeStats() string {
return strconv.FormatUint(scp.rowsCount, 10)
2024-04-29 01:20:43 +00:00
}
func parseStatsCount(lex *lexer) (*statsCount, error) {
2024-05-22 09:25:49 +00:00
fields, err := parseStatsFuncFields(lex, "count")
2024-04-29 01:20:43 +00:00
if err != nil {
2024-05-03 09:15:09 +00:00
return nil, err
2024-04-29 01:20:43 +00:00
}
sc := &statsCount{
2024-05-22 09:25:49 +00:00
fields: fields,
2024-04-29 01:20:43 +00:00
}
return sc, nil
}