mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
c716c1f074
commit
f2214f5073
8 changed files with 262 additions and 7 deletions
|
@ -21,7 +21,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
|
|||
|
||||
* FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default.
|
||||
* FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe).
|
||||
* FEATURE: add support for calculating `count()`, `uniq()`, `sum()`, `avg()`, `min()`, `max()` and `uniq_array()` over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for details.
|
||||
* FEATURE: add support for calculating various stats over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for details.
|
||||
* FEATURE: add support for sorting the returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe).
|
||||
* FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters).
|
||||
* FEATURE: add support for copying and renaming the selected log fields. See [these](https://docs.victoriametrics.com/victorialogs/logsql/#copy-pipe) and [these](https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe) docs.
|
||||
|
|
|
@ -1303,6 +1303,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
|
|||
|
||||
- [`avg`](#avg-stats) calculates the average value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`count`](#count-stats) calculates the number of log entries.
|
||||
- [`count_empty`](#count_empty-stats) calculates the number logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`max`](#max-stats) calcualtes the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`min`](#min-stats) calculates the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
@ -1329,6 +1330,22 @@ See also:
|
|||
- [`sum`](#sum-stats)
|
||||
- [`count`](#count-stats)
|
||||
|
||||
### count_empty stats
|
||||
|
||||
`count_empty(field1, ..., fieldN)` calculates the number of logs with empty `(field1, ..., fieldN)` tuples.
|
||||
|
||||
For example, the following query calculates the number of logs with empty `username` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||
during the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | stats count_empty(username) logs_with_missing_username
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [`count`](#count-stats)
|
||||
- [`uniq`](#uniq-stats)
|
||||
|
||||
### count stats
|
||||
|
||||
`count()` calculates the number of selected logs.
|
||||
|
@ -1356,6 +1373,7 @@ _time:5m | stats count(username, password) logs_with_username_or_password
|
|||
|
||||
See also:
|
||||
|
||||
- [`uniq`](#uniq-stats)
|
||||
- [`sum`](#sum-stats)
|
||||
- [`avg`](#avg-stats)
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"math/bits"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
|
@ -142,3 +143,11 @@ func (bm *bitmap) forEachSetBit(f func(idx int) bool) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bm *bitmap) onesCount() int {
|
||||
n := 0
|
||||
for _, word := range bm.a {
|
||||
n += bits.OnesCount64(word)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
|
|
@ -20,9 +20,16 @@ func TestBitmap(t *testing.T) {
|
|||
if i > 0 && bm.areAllBitsSet() {
|
||||
t.Fatalf("areAllBitsSet() must return false on new bitmap with %d bits; %#v", i, bm)
|
||||
}
|
||||
if n := bm.onesCount(); n != 0 {
|
||||
t.Fatalf("unexpected number of set bits; got %d; want %d", n, 0)
|
||||
}
|
||||
|
||||
bm.setBits()
|
||||
|
||||
if n := bm.onesCount(); n != i {
|
||||
t.Fatalf("unexpected number of set bits; got %d; want %d", n, i)
|
||||
}
|
||||
|
||||
// Make sure that all the bits are set.
|
||||
nextIdx := 0
|
||||
bm.forEachSetBit(func(idx int) bool {
|
||||
|
@ -81,6 +88,9 @@ func TestBitmap(t *testing.T) {
|
|||
if i > 0 && bm.areAllBitsSet() {
|
||||
t.Fatalf("areAllBitsSet() must return false for bitmap with %d bits", i)
|
||||
}
|
||||
if n := bm.onesCount(); n != 0 {
|
||||
t.Fatalf("unexpected number of set bits; got %d; want %d", n, 0)
|
||||
}
|
||||
|
||||
bitsCount := 0
|
||||
bm.forEachSetBit(func(_ int) bool {
|
||||
|
|
|
@ -866,6 +866,10 @@ func TestParseQuerySuccess(t *testing.T) {
|
|||
f(`* | stats count('') foo`, `* | stats count(_msg) as foo`)
|
||||
f(`* | stats count(foo) ''`, `* | stats count(foo) as _msg`)
|
||||
|
||||
// stats pipe count_empty
|
||||
f(`* | stats count_empty() x`, `* | stats count_empty(*) as x`)
|
||||
f(`* | stats by (x, y) count_empty(a,b,c) x`, `* | stats by (x, y) count_empty(a, b, c) as x`)
|
||||
|
||||
// stats pipe sum
|
||||
f(`* | stats Sum(foo) bar`, `* | stats sum(foo) as bar`)
|
||||
f(`* | stats BY(x, y, ) SUM(foo,bar,) bar`, `* | stats by (x, y) sum(foo, bar) as bar`)
|
||||
|
@ -1179,6 +1183,11 @@ func TestParseQueryFailure(t *testing.T) {
|
|||
f(`foo | stats count() as`)
|
||||
f(`foo | stats count() as |`)
|
||||
|
||||
// invalid stats count_empty
|
||||
f(`foo | stats count_empty`)
|
||||
f(`foo | stats count_empty() as`)
|
||||
f(`foo | stats count_empty() as |`)
|
||||
|
||||
// invalid stats sum
|
||||
f(`foo | stats sum`)
|
||||
f(`foo | stats sum()`)
|
||||
|
|
|
@ -444,6 +444,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
|
|||
return nil, "", fmt.Errorf("cannot parse 'count' func: %w", err)
|
||||
}
|
||||
sf = scs
|
||||
case lex.isKeyword("count_empty"):
|
||||
scs, err := parseStatsCountEmpty(lex)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("cannot parse 'count_empty' func: %w", err)
|
||||
}
|
||||
sf = scs
|
||||
case lex.isKeyword("uniq"):
|
||||
sus, err := parseStatsUniq(lex)
|
||||
if err != nil {
|
||||
|
@ -451,11 +457,11 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
|
|||
}
|
||||
sf = sus
|
||||
case lex.isKeyword("sum"):
|
||||
sfs, err := parseStatsSum(lex)
|
||||
sss, err := parseStatsSum(lex)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("cannot parse 'sum' func: %w", err)
|
||||
}
|
||||
sf = sfs
|
||||
sf = sss
|
||||
case lex.isKeyword("max"):
|
||||
sms, err := parseStatsMax(lex)
|
||||
if err != nil {
|
||||
|
|
|
@ -129,10 +129,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int {
|
|||
}
|
||||
|
||||
scp.rowsCount += uint64(len(br.timestamps))
|
||||
bm.forEachSetBit(func(i int) bool {
|
||||
scp.rowsCount--
|
||||
return true
|
||||
})
|
||||
scp.rowsCount -= uint64(bm.onesCount())
|
||||
return 0
|
||||
}
|
||||
|
||||
|
|
206
lib/logstorage/stats_count_empty.go
Normal file
206
lib/logstorage/stats_count_empty.go
Normal file
|
@ -0,0 +1,206 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"strconv"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
)
|
||||
|
||||
type statsCountEmpty struct {
|
||||
fields []string
|
||||
containsStar bool
|
||||
}
|
||||
|
||||
func (sc *statsCountEmpty) String() string {
|
||||
return "count_empty(" + fieldNamesString(sc.fields) + ")"
|
||||
}
|
||||
|
||||
func (sc *statsCountEmpty) neededFields() []string {
|
||||
return sc.fields
|
||||
}
|
||||
|
||||
func (sc *statsCountEmpty) newStatsProcessor() (statsProcessor, int) {
|
||||
scp := &statsCountEmptyProcessor{
|
||||
sc: sc,
|
||||
}
|
||||
return scp, int(unsafe.Sizeof(*scp))
|
||||
}
|
||||
|
||||
type statsCountEmptyProcessor struct {
|
||||
sc *statsCountEmpty
|
||||
|
||||
rowsCount uint64
|
||||
}
|
||||
|
||||
func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int {
|
||||
fields := scp.sc.fields
|
||||
if scp.sc.containsStar {
|
||||
bm := getBitmap(len(br.timestamps))
|
||||
bm.setBits()
|
||||
for _, c := range br.getColumns() {
|
||||
values := c.getValues(br)
|
||||
bm.forEachSetBit(func(idx int) bool {
|
||||
return values[idx] == ""
|
||||
})
|
||||
}
|
||||
scp.rowsCount += uint64(bm.onesCount())
|
||||
putBitmap(bm)
|
||||
return 0
|
||||
}
|
||||
if len(fields) == 1 {
|
||||
// Fast path for count_empty(single_column)
|
||||
c := br.getColumnByName(fields[0])
|
||||
if c.isConst {
|
||||
if c.encodedValues[0] == "" {
|
||||
scp.rowsCount += uint64(len(br.timestamps))
|
||||
}
|
||||
return 0
|
||||
}
|
||||
if c.isTime {
|
||||
return 0
|
||||
}
|
||||
switch c.valueType {
|
||||
case valueTypeString:
|
||||
for _, v := range c.encodedValues {
|
||||
if v == "" {
|
||||
scp.rowsCount++
|
||||
}
|
||||
}
|
||||
return 0
|
||||
case valueTypeDict:
|
||||
zeroDictIdx := slices.Index(c.dictValues, "")
|
||||
if zeroDictIdx < 0 {
|
||||
return 0
|
||||
}
|
||||
for _, v := range c.encodedValues {
|
||||
if int(v[0]) == zeroDictIdx {
|
||||
scp.rowsCount++
|
||||
}
|
||||
}
|
||||
return 0
|
||||
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
|
||||
return 0
|
||||
default:
|
||||
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path - count rows containing empty value for all the fields enumerated inside count_empty().
|
||||
bm := getBitmap(len(br.timestamps))
|
||||
defer putBitmap(bm)
|
||||
|
||||
bm.setBits()
|
||||
for _, f := range fields {
|
||||
c := br.getColumnByName(f)
|
||||
if c.isConst {
|
||||
if c.encodedValues[0] == "" {
|
||||
scp.rowsCount += uint64(len(br.timestamps))
|
||||
return 0
|
||||
}
|
||||
continue
|
||||
}
|
||||
if c.isTime {
|
||||
return 0
|
||||
}
|
||||
switch c.valueType {
|
||||
case valueTypeString:
|
||||
bm.forEachSetBit(func(i int) bool {
|
||||
return c.encodedValues[i] == ""
|
||||
})
|
||||
case valueTypeDict:
|
||||
if !slices.Contains(c.dictValues, "") {
|
||||
return 0
|
||||
}
|
||||
bm.forEachSetBit(func(i int) bool {
|
||||
dictIdx := c.encodedValues[i][0]
|
||||
return c.dictValues[dictIdx] == ""
|
||||
})
|
||||
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
|
||||
return 0
|
||||
default:
|
||||
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
scp.rowsCount += uint64(bm.onesCount())
|
||||
return 0
|
||||
}
|
||||
|
||||
func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
|
||||
fields := scp.sc.fields
|
||||
if scp.sc.containsStar {
|
||||
for _, c := range br.getColumns() {
|
||||
if v := c.getValueAtRow(br, rowIdx); v != "" {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
scp.rowsCount++
|
||||
return 0
|
||||
}
|
||||
if len(fields) == 1 {
|
||||
// Fast path for count_empty(single_column)
|
||||
c := br.getColumnByName(fields[0])
|
||||
if c.isConst {
|
||||
if c.encodedValues[0] == "" {
|
||||
scp.rowsCount++
|
||||
}
|
||||
return 0
|
||||
}
|
||||
if c.isTime {
|
||||
return 0
|
||||
}
|
||||
switch c.valueType {
|
||||
case valueTypeString:
|
||||
if v := c.encodedValues[rowIdx]; v == "" {
|
||||
scp.rowsCount++
|
||||
}
|
||||
return 0
|
||||
case valueTypeDict:
|
||||
dictIdx := c.encodedValues[rowIdx][0]
|
||||
if v := c.dictValues[dictIdx]; v == "" {
|
||||
scp.rowsCount++
|
||||
}
|
||||
return 0
|
||||
case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
|
||||
return 0
|
||||
default:
|
||||
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty
|
||||
for _, f := range fields {
|
||||
c := br.getColumnByName(f)
|
||||
if v := c.getValueAtRow(br, rowIdx); v != "" {
|
||||
return 0
|
||||
}
|
||||
}
|
||||
scp.rowsCount++
|
||||
return 0
|
||||
}
|
||||
|
||||
func (scp *statsCountEmptyProcessor) mergeState(sfp statsProcessor) {
|
||||
src := sfp.(*statsCountEmptyProcessor)
|
||||
scp.rowsCount += src.rowsCount
|
||||
}
|
||||
|
||||
func (scp *statsCountEmptyProcessor) finalizeStats() string {
|
||||
return strconv.FormatUint(scp.rowsCount, 10)
|
||||
}
|
||||
|
||||
func parseStatsCountEmpty(lex *lexer) (*statsCountEmpty, error) {
|
||||
fields, err := parseFieldNamesForStatsFunc(lex, "count_empty")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sc := &statsCountEmpty{
|
||||
fields: fields,
|
||||
containsStar: slices.Contains(fields, "*"),
|
||||
}
|
||||
return sc, nil
|
||||
}
|
Loading…
Reference in a new issue