This commit is contained in:
Aliaksandr Valialkin 2024-04-29 03:08:35 +02:00
parent cb42a1a6fc
commit 449eade980
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 175 additions and 31 deletions

View file

@ -1072,30 +1072,39 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo
LogsQL supports calculating the following stats:
- The number of matching log entries. Examples:
- `error | stats count() as errors_total` returns the number of log messages containing the `error` [word](#word).
- `error | stats by (_stream) count() as errors_by_stream` returns the number of log messages containing the `error` [word](#word)
grouped by [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
- `error | stats by (datacenter, namespace) count(trace_id, user_id) as errors_with_trace_and_user` returns the number of log messages containing the `error` [word](#word),
- `error | stats count() as errors_total` returns the number of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word).
- `error | stats by (_stream) count() as errors_by_stream` returns the number of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
with the `error` [word](#word) grouped by [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
- `error | stats by (datacenter, namespace) count(trace_id, user_id) as errors_with_trace_and_user` returns the number
of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) containing the `error` [word](#word),
which contain non-empty `trace_id` or `user_id` [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), grouped by `datacenter` and `namespace` fields.
- The number of unique values for the given set of [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Examples:
- `error | stats uniq(client_ip) as unique_user_ips` returns the number of unique values for `client_ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across log messages with the `error` [word](#word).
across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word).
- `error | stats by (app) uniq(path, host) as unique_path_hosts` - returns the number of unique `(path, host)` pairs
for [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across log messages with the `error` [word](#word),
grouped by `app` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- `error | fields path, host | stats uniq(*)` - returns the number of unique `(path, host)` pairs
for [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across log messages with the `error` [word](#word).
for [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
with the `error` [word](#word), grouped by `app` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- `error | fields path, host | stats uniq(*) unique_path_hosts` - returns the number of unique `(path, host)` pairs
for [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
with the `error` [word](#word).
Stats' calculation can be combined in a single query. For example, the following query calculates the number of log messages with the `error` [word](#word),
the number of unique values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) and the number of unique values
for `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), grouped by `namespace` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
- Sum for the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Examples:
- `error | stats sum(duration) duration_total` - returns the sum of `duration` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values
across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `error` [word](#word).
- `GET | stats by (path) sum(response_size)` - returns the sum of `response_size` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values
across [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with the `GET` [word](#word), grouped
by `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) value.
Stats calculations can be combined. For example, the following query calculates the number of log messages with the `error` [word](#word),
the number of unique values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) and the sum of `duration`
[field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), grouped by `namespace` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
```logsql
error | stats by (namespace)
count() as errors_total,
uniq(ip) as unique_ips,
uniq(path) as unique_paths
sum(duration) as duration_sum
```
LogsQL will support calculating the following additional stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)

View file

@ -838,6 +838,10 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | STATS bY (foo, b.a/r, "b az") count(*) XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`)
f(`* | stats by() COUNT(x, 'a).b,c|d') as qwert`, `* | stats count(x, "a).b,c|d") as qwert`)
// stats pipe sum
f(`* | stats Sum(foo) bar`, `* | stats sum(foo) as bar`)
f(`* | stats BY(x, y, ) SUM(foo,bar,) bar`, `* | stats by (x, y) sum(foo, bar) as bar`)
// stats pipe uniq
f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`)
f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`)
@ -1090,9 +1094,15 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats count() as`)
f(`foo | stats count() as |`)
// invalid stats sum
f(`foo | stats sum`)
f(`foo | stats sum()`)
f(`foo | stats sum() as abc`)
// invalid stats uniq
f(`foo | stats uniq`)
f(`foo | stats uniq()`)
f(`foo | stats uniq() as abc`)
// invalid by clause
f(`foo | stats by`)

View file

@ -2,6 +2,7 @@ package logstorage
import (
"fmt"
"math"
"slices"
"strconv"
"strings"
@ -603,6 +604,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
return nil, fmt.Errorf("cannot parse 'uniq' func: %w", err)
}
return sfu, nil
case lex.isKeyword("sum"):
sfs, err := parseStatsFuncSum(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum' func: %w", err)
}
return sfs, nil
default:
return nil, fmt.Errorf("unknown stats func %q", lex.token)
}
@ -696,6 +703,142 @@ func (sfcp *statsFuncCountProcessor) finalizeStats() (string, string) {
return sfcp.sfc.resultName, value
}
func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
sfc := &statsFuncCount{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return sfc, nil
}
type statsFuncSum struct {
fields []string
containsStar bool
resultName string
}
func (sfs *statsFuncSum) String() string {
return "sum(" + fieldNamesString(sfs.fields) + ") as " + quoteTokenIfNeeded(sfs.resultName)
}
func (sfs *statsFuncSum) neededFields() []string {
return sfs.fields
}
func (sfs *statsFuncSum) newStatsFuncProcessor() (statsFuncProcessor, int) {
sfsp := &statsFuncSumProcessor{
sfs: sfs,
}
return sfsp, int(unsafe.Sizeof(*sfsp))
}
type statsFuncSumProcessor struct {
sfs *statsFuncSum
sum float64
}
func (sfsp *statsFuncSumProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) int {
if sfsp.sfs.containsStar {
// Sum all the columns
for _, c := range columns {
sfsp.sum += sumValues(c.Values)
}
return 0
}
// Sum the requested columns
for _, field := range sfsp.sfs.fields {
if idx := getBlockColumnIndex(columns, field); idx >= 0 {
sfsp.sum += sumValues(columns[idx].Values)
}
}
return 0
}
func sumValues(values []string) float64 {
sum := float64(0)
f := float64(0)
for i, v := range values {
if i == 0 || values[i-1] != v {
f, _ = tryParseFloat64(v)
if math.IsNaN(f) {
// Ignore NaN values, since this is the expected behaviour by most users.
f = 0
}
}
sum += f
}
return sum
}
func (sfsp *statsFuncSumProcessor) updateStatsForRow(_ []int64, columns []BlockColumn, rowIdx int) int {
if sfsp.sfs.containsStar {
// Sum all the fields for the given row
for _, c := range columns {
v := c.Values[rowIdx]
f, _ := tryParseFloat64(v)
if !math.IsNaN(f) {
sfsp.sum += f
}
}
return 0
}
// Sum only the given fields for the given row
for _, field := range sfsp.sfs.fields {
if idx := getBlockColumnIndex(columns, field); idx >= 0 {
v := columns[idx].Values[rowIdx]
f, _ := tryParseFloat64(v)
if !math.IsNaN(f) {
sfsp.sum += f
}
}
}
return 0
}
func (sfsp *statsFuncSumProcessor) mergeState(sfp statsFuncProcessor) {
src := sfp.(*statsFuncSumProcessor)
sfsp.sum += src.sum
}
func (sfsp *statsFuncSumProcessor) finalizeStats() (string, string) {
value := strconv.FormatFloat(sfsp.sum, 'g', -1, 64)
return sfsp.sfs.resultName, value
}
func parseStatsFuncSum(lex *lexer) (*statsFuncSum, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'sum' args: %w", err)
}
if len(fields) == 0 {
return nil, fmt.Errorf("'sum' must contain at least one arg")
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
sfs := &statsFuncSum{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return sfs, nil
}
type statsFuncUniq struct {
fields []string
containsStar bool
@ -942,24 +1085,6 @@ func parseStatsFuncUniq(lex *lexer) (*statsFuncUniq, error) {
return sfu, nil
}
func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
lex.nextToken()
fields, err := parseFieldNamesInParens(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
}
resultName, err := parseResultName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name: %w", err)
}
sfc := &statsFuncCount{
fields: fields,
containsStar: slices.Contains(fields, "*"),
resultName: resultName,
}
return sfc, nil
}
func parseResultName(lex *lexer) (string, error) {
if lex.isKeyword("as") {
if !lex.mustNextToken() {