From f2214f50734ceedc5ee59053fe2f83b7b0297ec9 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 5 May 2024 03:48:29 +0200 Subject: [PATCH] wip --- docs/VictoriaLogs/CHANGELOG.md | 2 +- docs/VictoriaLogs/LogsQL.md | 18 +++ lib/logstorage/bitmap.go | 9 ++ lib/logstorage/bitmap_test.go | 10 ++ lib/logstorage/parser_test.go | 9 ++ lib/logstorage/pipe_stats.go | 10 +- lib/logstorage/stats_count.go | 5 +- lib/logstorage/stats_count_empty.go | 206 ++++++++++++++++++++++++++++ 8 files changed, 262 insertions(+), 7 deletions(-) create mode 100644 lib/logstorage/stats_count_empty.go diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index d55b2a441..2b7eedbba 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -21,7 +21,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default. * FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe). -* FEATURE: add support for calculating `count()`, `uniq()`, `sum()`, `avg()`, `min()`, `max()` and `uniq_array()` over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for details. +* FEATURE: add support for calculating various stats over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for details. * FEATURE: add support for sorting the returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). * FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters). * FEATURE: add support for copying and renaming the selected log fields. See [these](https://docs.victoriametrics.com/victorialogs/logsql/#copy-pipe) and [these](https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe) docs. diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index d99c05b4a..452a928c6 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1303,6 +1303,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe): - [`avg`](#avg-stats) calculates the average value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`count`](#count-stats) calculates the number of log entries. +- [`count_empty`](#count_empty-stats) calculates the number logs with empty [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`max`](#max-stats) calcualtes the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`min`](#min-stats) calculates the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). - [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). @@ -1329,6 +1330,22 @@ See also: - [`sum`](#sum-stats) - [`count`](#count-stats) +### count_empty stats + +`count_empty(field1, ..., fieldN)` calculates the number of logs with empty `(field1, ..., fieldN)` tuples. + +For example, the following query calculates the number of logs with empty `username` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) +during the last 5 minutes: + +```logsql +_time:5m | stats count_empty(username) logs_with_missing_username +``` + +See also: + +- [`count`](#count-stats) +- [`uniq`](#uniq-stats) + ### count stats `count()` calculates the number of selected logs. @@ -1356,6 +1373,7 @@ _time:5m | stats count(username, password) logs_with_username_or_password See also: +- [`uniq`](#uniq-stats) - [`sum`](#sum-stats) - [`avg`](#avg-stats) diff --git a/lib/logstorage/bitmap.go b/lib/logstorage/bitmap.go index 51bd77789..56c4f6049 100644 --- a/lib/logstorage/bitmap.go +++ b/lib/logstorage/bitmap.go @@ -1,6 +1,7 @@ package logstorage import ( + "math/bits" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -142,3 +143,11 @@ func (bm *bitmap) forEachSetBit(f func(idx int) bool) { } } } + +func (bm *bitmap) onesCount() int { + n := 0 + for _, word := range bm.a { + n += bits.OnesCount64(word) + } + return n +} diff --git a/lib/logstorage/bitmap_test.go b/lib/logstorage/bitmap_test.go index 90ed0fffc..4627e2055 100644 --- a/lib/logstorage/bitmap_test.go +++ b/lib/logstorage/bitmap_test.go @@ -20,9 +20,16 @@ func TestBitmap(t *testing.T) { if i > 0 && bm.areAllBitsSet() { t.Fatalf("areAllBitsSet() must return false on new bitmap with %d bits; %#v", i, bm) } + if n := bm.onesCount(); n != 0 { + t.Fatalf("unexpected number of set bits; got %d; want %d", n, 0) + } bm.setBits() + if n := bm.onesCount(); n != i { + t.Fatalf("unexpected number of set bits; got %d; want %d", n, i) + } + // Make sure that all the bits are set. nextIdx := 0 bm.forEachSetBit(func(idx int) bool { @@ -81,6 +88,9 @@ func TestBitmap(t *testing.T) { if i > 0 && bm.areAllBitsSet() { t.Fatalf("areAllBitsSet() must return false for bitmap with %d bits", i) } + if n := bm.onesCount(); n != 0 { + t.Fatalf("unexpected number of set bits; got %d; want %d", n, 0) + } bitsCount := 0 bm.forEachSetBit(func(_ int) bool { diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 359e14d4a..0bbb80b99 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -866,6 +866,10 @@ func TestParseQuerySuccess(t *testing.T) { f(`* | stats count('') foo`, `* | stats count(_msg) as foo`) f(`* | stats count(foo) ''`, `* | stats count(foo) as _msg`) + // stats pipe count_empty + f(`* | stats count_empty() x`, `* | stats count_empty(*) as x`) + f(`* | stats by (x, y) count_empty(a,b,c) x`, `* | stats by (x, y) count_empty(a, b, c) as x`) + // stats pipe sum f(`* | stats Sum(foo) bar`, `* | stats sum(foo) as bar`) f(`* | stats BY(x, y, ) SUM(foo,bar,) bar`, `* | stats by (x, y) sum(foo, bar) as bar`) @@ -1179,6 +1183,11 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | stats count() as`) f(`foo | stats count() as |`) + // invalid stats count_empty + f(`foo | stats count_empty`) + f(`foo | stats count_empty() as`) + f(`foo | stats count_empty() as |`) + // invalid stats sum f(`foo | stats sum`) f(`foo | stats sum()`) diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 56ed93158..240d88174 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -444,6 +444,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) { return nil, "", fmt.Errorf("cannot parse 'count' func: %w", err) } sf = scs + case lex.isKeyword("count_empty"): + scs, err := parseStatsCountEmpty(lex) + if err != nil { + return nil, "", fmt.Errorf("cannot parse 'count_empty' func: %w", err) + } + sf = scs case lex.isKeyword("uniq"): sus, err := parseStatsUniq(lex) if err != nil { @@ -451,11 +457,11 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) { } sf = sus case lex.isKeyword("sum"): - sfs, err := parseStatsSum(lex) + sss, err := parseStatsSum(lex) if err != nil { return nil, "", fmt.Errorf("cannot parse 'sum' func: %w", err) } - sf = sfs + sf = sss case lex.isKeyword("max"): sms, err := parseStatsMax(lex) if err != nil { diff --git a/lib/logstorage/stats_count.go b/lib/logstorage/stats_count.go index 7a6d14890..fc7219211 100644 --- a/lib/logstorage/stats_count.go +++ b/lib/logstorage/stats_count.go @@ -129,10 +129,7 @@ func (scp *statsCountProcessor) updateStatsForAllRows(br *blockResult) int { } scp.rowsCount += uint64(len(br.timestamps)) - bm.forEachSetBit(func(i int) bool { - scp.rowsCount-- - return true - }) + scp.rowsCount -= uint64(bm.onesCount()) return 0 } diff --git a/lib/logstorage/stats_count_empty.go b/lib/logstorage/stats_count_empty.go new file mode 100644 index 000000000..62581d817 --- /dev/null +++ b/lib/logstorage/stats_count_empty.go @@ -0,0 +1,206 @@ +package logstorage + +import ( + "slices" + "strconv" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +type statsCountEmpty struct { + fields []string + containsStar bool +} + +func (sc *statsCountEmpty) String() string { + return "count_empty(" + fieldNamesString(sc.fields) + ")" +} + +func (sc *statsCountEmpty) neededFields() []string { + return sc.fields +} + +func (sc *statsCountEmpty) newStatsProcessor() (statsProcessor, int) { + scp := &statsCountEmptyProcessor{ + sc: sc, + } + return scp, int(unsafe.Sizeof(*scp)) +} + +type statsCountEmptyProcessor struct { + sc *statsCountEmpty + + rowsCount uint64 +} + +func (scp *statsCountEmptyProcessor) updateStatsForAllRows(br *blockResult) int { + fields := scp.sc.fields + if scp.sc.containsStar { + bm := getBitmap(len(br.timestamps)) + bm.setBits() + for _, c := range br.getColumns() { + values := c.getValues(br) + bm.forEachSetBit(func(idx int) bool { + return values[idx] == "" + }) + } + scp.rowsCount += uint64(bm.onesCount()) + putBitmap(bm) + return 0 + } + if len(fields) == 1 { + // Fast path for count_empty(single_column) + c := br.getColumnByName(fields[0]) + if c.isConst { + if c.encodedValues[0] == "" { + scp.rowsCount += uint64(len(br.timestamps)) + } + return 0 + } + if c.isTime { + return 0 + } + switch c.valueType { + case valueTypeString: + for _, v := range c.encodedValues { + if v == "" { + scp.rowsCount++ + } + } + return 0 + case valueTypeDict: + zeroDictIdx := slices.Index(c.dictValues, "") + if zeroDictIdx < 0 { + return 0 + } + for _, v := range c.encodedValues { + if int(v[0]) == zeroDictIdx { + scp.rowsCount++ + } + } + return 0 + case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: + return 0 + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } + } + + // Slow path - count rows containing empty value for all the fields enumerated inside count_empty(). + bm := getBitmap(len(br.timestamps)) + defer putBitmap(bm) + + bm.setBits() + for _, f := range fields { + c := br.getColumnByName(f) + if c.isConst { + if c.encodedValues[0] == "" { + scp.rowsCount += uint64(len(br.timestamps)) + return 0 + } + continue + } + if c.isTime { + return 0 + } + switch c.valueType { + case valueTypeString: + bm.forEachSetBit(func(i int) bool { + return c.encodedValues[i] == "" + }) + case valueTypeDict: + if !slices.Contains(c.dictValues, "") { + return 0 + } + bm.forEachSetBit(func(i int) bool { + dictIdx := c.encodedValues[i][0] + return c.dictValues[dictIdx] == "" + }) + case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: + return 0 + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } + } + + scp.rowsCount += uint64(bm.onesCount()) + return 0 +} + +func (scp *statsCountEmptyProcessor) updateStatsForRow(br *blockResult, rowIdx int) int { + fields := scp.sc.fields + if scp.sc.containsStar { + for _, c := range br.getColumns() { + if v := c.getValueAtRow(br, rowIdx); v != "" { + return 0 + } + } + scp.rowsCount++ + return 0 + } + if len(fields) == 1 { + // Fast path for count_empty(single_column) + c := br.getColumnByName(fields[0]) + if c.isConst { + if c.encodedValues[0] == "" { + scp.rowsCount++ + } + return 0 + } + if c.isTime { + return 0 + } + switch c.valueType { + case valueTypeString: + if v := c.encodedValues[rowIdx]; v == "" { + scp.rowsCount++ + } + return 0 + case valueTypeDict: + dictIdx := c.encodedValues[rowIdx][0] + if v := c.dictValues[dictIdx]; v == "" { + scp.rowsCount++ + } + return 0 + case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601: + return 0 + default: + logger.Panicf("BUG: unknown valueType=%d", c.valueType) + return 0 + } + } + + // Slow path - count the row at rowIdx if at least a single field enumerated inside count() is non-empty + for _, f := range fields { + c := br.getColumnByName(f) + if v := c.getValueAtRow(br, rowIdx); v != "" { + return 0 + } + } + scp.rowsCount++ + return 0 +} + +func (scp *statsCountEmptyProcessor) mergeState(sfp statsProcessor) { + src := sfp.(*statsCountEmptyProcessor) + scp.rowsCount += src.rowsCount +} + +func (scp *statsCountEmptyProcessor) finalizeStats() string { + return strconv.FormatUint(scp.rowsCount, 10) +} + +func parseStatsCountEmpty(lex *lexer) (*statsCountEmpty, error) { + fields, err := parseFieldNamesForStatsFunc(lex, "count_empty") + if err != nil { + return nil, err + } + sc := &statsCountEmpty{ + fields: fields, + containsStar: slices.Contains(fields, "*"), + } + return sc, nil +}