2024-05-12 14:33:29 +00:00
|
|
|
package logstorage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"slices"
|
|
|
|
"strconv"
|
|
|
|
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
)
|
|
|
|
|
|
|
|
// statsCountEmpty is the parsed form of the `count_empty(...)` stats function.
type statsCountEmpty struct {
	// fields holds the field names listed inside count_empty(...).
	//
	// When the list is empty, the processor inspects every column
	// of the block instead of a fixed set of fields.
	fields []string
}
|
|
|
|
|
|
|
|
func (sc *statsCountEmpty) String() string {
|
2024-05-22 19:01:20 +00:00
|
|
|
return "count_empty(" + statsFuncFieldsToString(sc.fields) + ")"
|
2024-05-12 14:33:29 +00:00
|
|
|
}
|
|
|
|
|
2024-05-20 02:08:30 +00:00
|
|
|
func (sc *statsCountEmpty) updateNeededFields(neededFields fieldsSet) {
|
2024-05-22 19:01:20 +00:00
|
|
|
updateNeededFieldsForStatsFunc(neededFields, sc.fields)
|
2024-05-12 14:33:29 +00:00
|
|
|
}
|
|
|
|
|
2024-12-22 01:09:36 +00:00
|
|
|
func (sc *statsCountEmpty) newStatsProcessor(a *chunkedAllocator) statsProcessor {
|
2025-01-13 03:48:19 +00:00
|
|
|
return a.newStatsCountEmptyProcessor()
|
2024-05-12 14:33:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// statsCountEmptyProcessor accumulates the result of count_empty().
type statsCountEmptyProcessor struct {
	// rowsCount is the number of rows seen so far for which all the
	// inspected fields were empty. It is the value emitted by finalizeStats.
	rowsCount uint64
}
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
func (scp *statsCountEmptyProcessor) updateStatsForAllRows(sf statsFunc, br *blockResult) int {
|
|
|
|
sc := sf.(*statsCountEmpty)
|
|
|
|
fields := sc.fields
|
2024-05-22 19:01:20 +00:00
|
|
|
if len(fields) == 0 {
|
2024-09-25 14:16:53 +00:00
|
|
|
bm := getBitmap(br.rowsLen)
|
2024-05-12 14:33:29 +00:00
|
|
|
bm.setBits()
|
|
|
|
for _, c := range br.getColumns() {
|
|
|
|
values := c.getValues(br)
|
|
|
|
bm.forEachSetBit(func(idx int) bool {
|
|
|
|
return values[idx] == ""
|
|
|
|
})
|
|
|
|
}
|
|
|
|
scp.rowsCount += uint64(bm.onesCount())
|
|
|
|
putBitmap(bm)
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
if len(fields) == 1 {
|
|
|
|
// Fast path for count_empty(single_column)
|
|
|
|
c := br.getColumnByName(fields[0])
|
|
|
|
if c.isConst {
|
2024-05-20 02:08:30 +00:00
|
|
|
if c.valuesEncoded[0] == "" {
|
2024-09-25 14:16:53 +00:00
|
|
|
scp.rowsCount += uint64(br.rowsLen)
|
2024-05-12 14:33:29 +00:00
|
|
|
}
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
if c.isTime {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
switch c.valueType {
|
|
|
|
case valueTypeString:
|
2024-05-20 02:08:30 +00:00
|
|
|
for _, v := range c.getValuesEncoded(br) {
|
2024-05-12 14:33:29 +00:00
|
|
|
if v == "" {
|
|
|
|
scp.rowsCount++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return 0
|
|
|
|
case valueTypeDict:
|
|
|
|
zeroDictIdx := slices.Index(c.dictValues, "")
|
|
|
|
if zeroDictIdx < 0 {
|
|
|
|
return 0
|
|
|
|
}
|
2024-05-20 02:08:30 +00:00
|
|
|
for _, v := range c.getValuesEncoded(br) {
|
2024-05-12 14:33:29 +00:00
|
|
|
if int(v[0]) == zeroDictIdx {
|
|
|
|
scp.rowsCount++
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return 0
|
2025-01-11 22:15:00 +00:00
|
|
|
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeInt64,
|
|
|
|
valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
|
2024-05-12 14:33:29 +00:00
|
|
|
return 0
|
|
|
|
default:
|
|
|
|
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Slow path - count rows containing empty value for all the fields enumerated inside count_empty().
|
2024-09-25 14:16:53 +00:00
|
|
|
bm := getBitmap(br.rowsLen)
|
2024-05-12 14:33:29 +00:00
|
|
|
defer putBitmap(bm)
|
|
|
|
|
|
|
|
bm.setBits()
|
|
|
|
for _, f := range fields {
|
|
|
|
c := br.getColumnByName(f)
|
|
|
|
if c.isConst {
|
2024-05-22 19:01:20 +00:00
|
|
|
if c.valuesEncoded[0] != "" {
|
2024-05-12 14:33:29 +00:00
|
|
|
return 0
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if c.isTime {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
switch c.valueType {
|
|
|
|
case valueTypeString:
|
2024-05-20 02:08:30 +00:00
|
|
|
valuesEncoded := c.getValuesEncoded(br)
|
2024-05-12 14:33:29 +00:00
|
|
|
bm.forEachSetBit(func(i int) bool {
|
2024-05-20 02:08:30 +00:00
|
|
|
return valuesEncoded[i] == ""
|
2024-05-12 14:33:29 +00:00
|
|
|
})
|
|
|
|
case valueTypeDict:
|
|
|
|
if !slices.Contains(c.dictValues, "") {
|
|
|
|
return 0
|
|
|
|
}
|
2024-05-20 02:08:30 +00:00
|
|
|
valuesEncoded := c.getValuesEncoded(br)
|
2024-05-12 14:33:29 +00:00
|
|
|
bm.forEachSetBit(func(i int) bool {
|
2024-05-20 02:08:30 +00:00
|
|
|
dictIdx := valuesEncoded[i][0]
|
2024-05-12 14:33:29 +00:00
|
|
|
return c.dictValues[dictIdx] == ""
|
|
|
|
})
|
2025-01-11 22:15:00 +00:00
|
|
|
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeInt64,
|
|
|
|
valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
|
2024-05-12 14:33:29 +00:00
|
|
|
return 0
|
|
|
|
default:
|
|
|
|
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
scp.rowsCount += uint64(bm.onesCount())
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
func (scp *statsCountEmptyProcessor) updateStatsForRow(sf statsFunc, br *blockResult, rowIdx int) int {
|
|
|
|
sc := sf.(*statsCountEmpty)
|
|
|
|
fields := sc.fields
|
2024-05-22 19:01:20 +00:00
|
|
|
if len(fields) == 0 {
|
2024-05-12 14:33:29 +00:00
|
|
|
for _, c := range br.getColumns() {
|
|
|
|
if v := c.getValueAtRow(br, rowIdx); v != "" {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
scp.rowsCount++
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
if len(fields) == 1 {
|
|
|
|
// Fast path for count_empty(single_column)
|
|
|
|
c := br.getColumnByName(fields[0])
|
|
|
|
if c.isConst {
|
2024-05-20 02:08:30 +00:00
|
|
|
if c.valuesEncoded[0] == "" {
|
2024-05-12 14:33:29 +00:00
|
|
|
scp.rowsCount++
|
|
|
|
}
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
if c.isTime {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
switch c.valueType {
|
|
|
|
case valueTypeString:
|
2024-05-20 02:08:30 +00:00
|
|
|
valuesEncoded := c.getValuesEncoded(br)
|
|
|
|
if v := valuesEncoded[rowIdx]; v == "" {
|
2024-05-12 14:33:29 +00:00
|
|
|
scp.rowsCount++
|
|
|
|
}
|
|
|
|
return 0
|
|
|
|
case valueTypeDict:
|
2024-05-20 02:08:30 +00:00
|
|
|
valuesEncoded := c.getValuesEncoded(br)
|
|
|
|
dictIdx := valuesEncoded[rowIdx][0]
|
2024-05-12 14:33:29 +00:00
|
|
|
if v := c.dictValues[dictIdx]; v == "" {
|
|
|
|
scp.rowsCount++
|
|
|
|
}
|
|
|
|
return 0
|
2025-01-11 22:15:00 +00:00
|
|
|
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeInt64,
|
|
|
|
valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
|
2024-05-12 14:33:29 +00:00
|
|
|
return 0
|
|
|
|
default:
|
|
|
|
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty
|
|
|
|
for _, f := range fields {
|
|
|
|
c := br.getColumnByName(f)
|
|
|
|
if v := c.getValueAtRow(br, rowIdx); v != "" {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
scp.rowsCount++
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
2025-02-14 14:30:03 +00:00
|
|
|
func (scp *statsCountEmptyProcessor) mergeState(_ *chunkedAllocator, _ statsFunc, sfp statsProcessor) {
|
2024-05-12 14:33:29 +00:00
|
|
|
src := sfp.(*statsCountEmptyProcessor)
|
|
|
|
scp.rowsCount += src.rowsCount
|
|
|
|
}
|
|
|
|
|
2025-01-13 03:48:19 +00:00
|
|
|
func (scp *statsCountEmptyProcessor) finalizeStats(_ statsFunc, dst []byte, _ <-chan struct{}) []byte {
|
2024-12-17 14:16:03 +00:00
|
|
|
return strconv.AppendUint(dst, scp.rowsCount, 10)
|
2024-05-12 14:33:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func parseStatsCountEmpty(lex *lexer) (*statsCountEmpty, error) {
|
2024-05-22 19:01:20 +00:00
|
|
|
fields, err := parseStatsFuncFields(lex, "count_empty")
|
2024-05-12 14:33:29 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
sc := &statsCountEmpty{
|
2024-05-22 19:01:20 +00:00
|
|
|
fields: fields,
|
2024-05-12 14:33:29 +00:00
|
|
|
}
|
|
|
|
return sc, nil
|
|
|
|
}
|