wip

commit b4fd20f17a (parent 54cc81602e)
4 changed files with 54 additions and 6 deletions
@@ -1414,6 +1414,15 @@ over the last 5 minutes:
 _time:5m | stats count_uniq(host, path) unique_host_path_pairs
 ```
 
+Every unique value is stored in memory during query execution. A big number of unique values may require a lot of memory.
+Sometimes it is needed to know whether the number of unique values reaches some limit. In this case, add `limit N` just after `count_uniq(...)`
+to limit the number of counted unique values to `N`, while also limiting the maximum memory usage. For example, the following query counts
+up to `1_000_000` unique values for the `ip` field:
+
+```logsql
+_time:5m | stats count_uniq(ip) limit 1_000_000 as ips_1_000_000
+```
+
 See also:
 
 - [`uniq_values`](#uniq_values-stats)
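The `limit N` option bounds memory by capping how many unique values the `count_uniq` processor keeps in memory; once the cap is hit, the reported count stops growing. Below is a minimal, hypothetical sketch of that idea in plain Go (not the VictoriaLogs implementation; the real check is the `limitReached()` method added further down in this commit):

```go
// Hypothetical illustration: a unique-value counter that stops tracking new
// values once it holds `limit` entries. With limit == 0 the counter is unbounded.
package main

import "fmt"

type boundedUniqCounter struct {
	seen  map[string]struct{}
	limit uint64 // 0 means "no limit"
}

func (c *boundedUniqCounter) add(v string) {
	if c.limit > 0 && uint64(len(c.seen)) >= c.limit {
		return // limit reached - stop storing new unique values
	}
	c.seen[v] = struct{}{}
}

func main() {
	c := &boundedUniqCounter{seen: map[string]struct{}{}, limit: 3}
	for _, ip := range []string{"10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"} {
		c.add(ip)
	}
	fmt.Println(len(c.seen)) // prints 3: memory and the count are capped at the limit
}
```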
@@ -1491,9 +1500,11 @@ over logs for the last 5 minutes:
 _time:5m | stats uniq_values(ip) unique_ips
 ```
 
-It is possible to specify the limit on the number of returned unique values by adding `limit N` just after `uniq_values()` and before the resulting column name.
+Every unique value is stored in memory during query execution. A big number of unique values may require a lot of memory. Sometimes it is enough to return
+only a subset of unique values. In this case, add `limit N` after `uniq_values(...)` in order to limit the number of returned unique values to `N`,
+while limiting the maximum memory usage.
 For example, the following query returns up to `100` unique values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
-over the logs for the last 5 minutes. Note that it may return arbitrary subset of unique `ip` values:
+over the logs for the last 5 minutes. Note that an arbitrary subset of unique `ip` values is returned every time:
 
 ```logsql
 _time:5m | stats uniq_values(ip) limit 100 as unique_ips_100
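The doc change above warns that `uniq_values(...) limit N` returns an arbitrary subset on every run. Assuming the unique values are held in a Go map (the `uniq_values` implementation is not part of this diff, so this is an assumption), that behavior follows directly from Go's randomized map iteration order; a small illustrative sketch:

```go
// Illustrative only: taking the "first N" keys of a Go map yields a different
// subset on different runs, because map iteration order is randomized.
package main

import "fmt"

func firstN(m map[string]struct{}, n int) []string {
	out := make([]string, 0, n)
	for k := range m {
		if len(out) >= n {
			break
		}
		out = append(out, k)
	}
	return out
}

func main() {
	ips := map[string]struct{}{
		"10.0.0.1": {}, "10.0.0.2": {}, "10.0.0.3": {}, "10.0.0.4": {}, "10.0.0.5": {},
	}
	// Running the program twice will usually print different subsets of size 2.
	fmt.Println(firstN(ips, 2))
	fmt.Println(firstN(ips, 2))
}
```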
@@ -905,7 +905,7 @@ func TestParseQuerySuccess(t *testing.T) {
 
 	// stats pipe count_uniq
 	f(`* | stats count_uniq(foo) bar`, `* | stats count_uniq(foo) as bar`)
-	f(`* | stats by(x, y) count_uniq(foo,bar) as baz`, `* | stats by (x, y) count_uniq(foo, bar) as baz`)
+	f(`* | stats by(x, y) count_uniq(foo,bar) LiMit 10 As baz`, `* | stats by (x, y) count_uniq(foo, bar) limit 10 as baz`)
 	f(`* | stats by(x) count_uniq(*) z`, `* | stats by (x) count_uniq(*) as z`)
 	f(`* | stats by(x) count_uniq() z`, `* | stats by (x) count_uniq(*) as z`)
 	f(`* | stats by(x) count_uniq(a,*,b) z`, `* | stats by (x) count_uniq(*) as z`)
@@ -1227,12 +1227,18 @@ func TestParseQueryFailure(t *testing.T) {
 	// invalid stats count_uniq
 	f(`foo | stats count_uniq`)
 	f(`foo | stats count_uniq()`)
+	f(`foo | stats count_uniq() limit`)
+	f(`foo | stats count_uniq() limit foo`)
+	f(`foo | stats count_uniq() limit 0.5`)
+	f(`foo | stats count_uniq() limit -1`)
 
 	// invalid stats uniq_values
 	f(`foo | stats uniq_values`)
 	f(`foo | stats uniq_values()`)
 	f(`foo | stats uniq_values() limit`)
 	f(`foo | stats uniq_values(a) limit foo`)
+	f(`foo | stats uniq_values(a) limit 0.5`)
+	f(`foo | stats uniq_values(a) limit -1`)
 
 	// invalid stats grouping fields
 	f(`foo | stats by(foo:bar) count() baz`)
@@ -517,7 +517,7 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
 
 	resultName, err := parseResultName(lex)
 	if err != nil {
-		return nil, "", fmt.Errorf("cannot parse result name: %w", err)
+		return nil, "", fmt.Errorf("cannot parse result name for %s: %w", sf, err)
 	}
 	return sf, resultName, nil
 }
@@ -528,7 +528,7 @@ func parseResultName(lex *lexer) (string, error) {
 	}
 	resultName, err := parseFieldName(lex)
 	if err != nil {
-		return "", fmt.Errorf("cannot parse 'as' field name: %w", err)
+		return "", err
 	}
 	return resultName, nil
 }
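The two parser hunks above move the error context to a single place: `parseResultName` now returns the underlying error as-is, while the caller `parseStatsFunc` wraps it once and names the stats function. A short illustrative example (the base error message is made up) of why this avoids stacked, redundant prefixes:

```go
// Illustrative only: single-level wrapping keeps error messages short while
// still naming the failing stats function.
package main

import (
	"errors"
	"fmt"
)

func main() {
	base := errors.New("missing field name after 'as'")

	// Roughly the old behavior: both callee and caller add a prefix.
	inner := fmt.Errorf("cannot parse 'as' field name: %w", base)
	verbose := fmt.Errorf("cannot parse result name: %w", inner)
	fmt.Println(verbose)

	// Roughly the new behavior: only the caller wraps, with more useful context.
	concise := fmt.Errorf("cannot parse result name for count_uniq(ip): %w", base)
	fmt.Println(concise)
}
```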
@@ -1,6 +1,7 @@
 package logstorage
 
 import (
+	"fmt"
 	"slices"
 	"strconv"
 	"unsafe"
@@ -12,10 +13,15 @@ import (
 type statsCountUniq struct {
 	fields       []string
 	containsStar bool
+	limit        uint64
 }
 
 func (su *statsCountUniq) String() string {
-	return "count_uniq(" + fieldNamesString(su.fields) + ")"
+	s := "count_uniq(" + fieldNamesString(su.fields) + ")"
+	if su.limit > 0 {
+		s += fmt.Sprintf(" limit %d", su.limit)
+	}
+	return s
 }
 
 func (su *statsCountUniq) neededFields() []string {
@@ -41,6 +47,10 @@ type statsCountUniqProcessor struct {
 }
 
 func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
+	if sup.limitReached() {
+		return 0
+	}
+
 	fields := sup.su.fields
 	m := sup.m
 
@@ -216,6 +226,10 @@ func (sup *statsCountUniqProcessor) updateStatsForAllRows(br *blockResult) int {
 }
 
 func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
+	if sup.limitReached() {
+		return 0
+	}
+
 	fields := sup.su.fields
 	m := sup.m
 
@@ -340,6 +354,10 @@ func (sup *statsCountUniqProcessor) updateStatsForRow(br *blockResult, rowIdx in
 }
 
 func (sup *statsCountUniqProcessor) mergeState(sfp statsProcessor) {
+	if sup.limitReached() {
+		return
+	}
+
 	src := sfp.(*statsCountUniqProcessor)
 	m := sup.m
 	for k := range src.m {
@@ -354,6 +372,10 @@ func (sup *statsCountUniqProcessor) finalizeStats() string {
 	return strconv.FormatUint(n, 10)
 }
 
+func (sup *statsCountUniqProcessor) limitReached() bool {
+	return sup.su.limit > 0 && uint64(len(sup.m)) >= sup.su.limit
+}
+
 func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) {
 	fields, err := parseFieldNamesForStatsFunc(lex, "count_uniq")
 	if err != nil {
@@ -363,5 +385,14 @@ func parseStatsCountUniq(lex *lexer) (*statsCountUniq, error) {
 		fields:       fields,
 		containsStar: slices.Contains(fields, "*"),
 	}
+	if lex.isKeyword("limit") {
+		lex.nextToken()
+		n, ok := tryParseUint64(lex.token)
+		if !ok {
+			return nil, fmt.Errorf("cannot parse 'limit %s' for 'count_uniq'", lex.token)
+		}
+		lex.nextToken()
+		su.limit = n
+	}
 	return su, nil
 }
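The new `limit` clause is parsed as an optional suffix after the field list: if the next token is the `limit` keyword, the token after it must be an unsigned integer (the new failure tests cover `limit`, `limit foo`, `limit 0.5` and `limit -1`). The docs example writes the value as `1_000_000`, so the numeric parser has to tolerate underscore separators. `tryParseUint64` itself is not part of this diff; a hypothetical stand-in with that behavior:

```go
// Hypothetical stand-in for tryParseUint64 (the real helper is not shown in this
// diff): strip underscore separators, then parse an unsigned decimal value.
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func parseLimit(token string) (uint64, bool) {
	n, err := strconv.ParseUint(strings.ReplaceAll(token, "_", ""), 10, 64)
	if err != nil {
		return 0, false
	}
	return n, true
}

func main() {
	fmt.Println(parseLimit("1_000_000")) // 1000000 true
	fmt.Println(parseLimit("foo"))       // 0 false
	fmt.Println(parseLimit("0.5"))       // 0 false - matches the new failure tests
	fmt.Println(parseLimit("-1"))        // 0 false
}
```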
|
Loading…
Reference in a new issue