diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 985bf1ebf..be7cbeb18 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1059,18 +1059,33 @@ LogsQL supports calculating the following stats:
 - `error | stats by (datacenter, namespace) count(trace_id, user_id) as errors_with_trace_and_user` returns the number of log messages containing the `error` [word](#word),
   which contain non-empty `trace_id` or `user_id` [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), grouped by `datacenter` and `namespace` fields.
 
+- The number of unique values for the given set of [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Examples:
+  - `error | stats uniq(client_ip) as unique_user_ips` returns the number of unique values for `client_ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+    across log messages with the `error` [word](#word).
+  - `error | stats by (app) uniq(path, host) as unique_path_hosts` returns the number of unique pairs of `path` and `host` [field values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+    across log messages with the `error` [word](#word), grouped by `app` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+
+Multiple stats can be calculated in a single query. For example, the following query calculates the number of log messages with the `error` [word](#word),
+the number of unique values for `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), and the number of unique values
+for `path` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model), grouped by `namespace` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
+
+```logsql
+error | stats by (namespace)
+  count() as errors_total,
+  uniq(ip) as unique_ips,
+  uniq(path) as unique_paths
+```
+
 LogsQL will support calculating the following additional stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
 and fields created by [transformations](#transformations):
 
-- The number of unique values for the given field.
 - The min, max, avg, and sum for the given field.
 - The median and [percentile](https://en.wikipedia.org/wiki/Percentile) for the given field.
 
 It will be possible specifying an optional condition [filter](#post-filters) when calculating the stats.
 For example, `sumIf(response_size, is_admin:true)` calculates the total response size for admins only.
 
-It will be possible to group stats by the specified [fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
-and by the specified time buckets.
+It will be possible to group stats by the specified time buckets.
 
 It is possible to perform stats calculations on the [selected log entries](#filters) at client side with `sort`, `uniq`, etc. Unix commands
 according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
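The combined-stats query documented above can be exercised end-to-end over the HTTP querying API. Below is a minimal Go sketch, assuming a VictoriaLogs instance on the default `localhost:9428` address and the `/select/logsql/query` endpoint described in the querying docs; adjust both for your deployment.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// The combined-stats LogsQL query from the docs above:
	// one count() plus two uniq() results per namespace.
	query := `error | stats by (namespace)
  count() as errors_total,
  uniq(ip) as unique_ips,
  uniq(path) as unique_paths`

	// Assumption: default VictoriaLogs address and the documented querying endpoint.
	endpoint := "http://localhost:9428/select/logsql/query"

	resp, err := http.Post(endpoint, "application/x-www-form-urlencoded",
		strings.NewReader(url.Values{"query": {query}}.Encode()))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// The endpoint returns one JSON-encoded result line per group.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(body))
}
```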
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index b3bca1482..d4b04f5c1 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -828,9 +828,17 @@ func TestParseQuerySuccess(t *testing.T) { // stats count pipe f(`* | Stats count() AS foo`, `* | stats count() as foo`) - f(`* | STATS bY (foo, b.a/r, "b az") count(*) as XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`) + f(`* | STATS bY (foo, b.a/r, "b az") count(*) XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`) f(`* | stats by() COUNT(x, 'a).b,c|d') as qwert`, `* | stats count(x, "a).b,c|d") as qwert`) + // stats uniq pipe + f(`* | stats uniq(foo) bar`, `* | stats uniq(foo) as bar`) + f(`* | stats by(x, y) uniq(foo,bar) as baz`, `* | stats by (x, y) uniq(foo, bar) as baz`) + + // stats pipe multiple funcs + f(`* | stats count() "foo.bar:baz", uniq(a) bar`, `* | stats count() as "foo.bar:baz", uniq(a) as bar`) + f(`* | stats by (x, y) count(*) foo, uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, uniq(a, b) as bar`) + // multiple different pipes f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`) f(`* | skip 100 | head 20 | skip 10`, `* | skip 100 | head 20 | skip 10`) @@ -1066,16 +1074,19 @@ func TestParseQueryFailure(t *testing.T) { // invalid stats f(`foo | stats bar`) - // invalid count + // invalid stats count f(`foo | stats count`) f(`foo | stats count(`) f(`foo | stats count bar`) f(`foo | stats count(bar`) f(`foo | stats count(bar)`) - f(`foo | stats count() bar`) f(`foo | stats count() as`) f(`foo | stats count() as |`) + // invalid stats uniq + f(`foo | stats uniq`) + f(`foo | stats uniq()`) + // invalid by clause f(`foo | stats by`) f(`foo | stats by bar`) diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index 3de5f4d0c..421fb96c7 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -307,12 +307,8 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col // Slow path - update per-row stats // Pre-calculate column indexes for byFields in order to speed up building group key in the loop below. 
-	columnIdxs := shard.columnIdxs[:0]
-	for _, f := range spp.sp.byFields {
-		idx := getBlockColumnIndex(columns, f)
-		columnIdxs = append(columnIdxs, idx)
-	}
-	shard.columnIdxs = columnIdxs
+	shard.columnIdxs = appendBlockColumnIndexes(shard.columnIdxs[:0], columns, spp.sp.byFields)
+	columnIdxs := shard.columnIdxs
 
 	keyBuf := shard.keyBuf
 	for i := range timestamps {
@@ -484,6 +480,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, error) {
 			return nil, fmt.Errorf("cannot parse 'count' func: %w", err)
 		}
 		return sfc, nil
+	case lex.isKeyword("uniq"):
+		sfu, err := parseStatsFuncUniq(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'uniq' func: %w", err)
+		}
+		return sfu, nil
 	default:
 		return nil, fmt.Errorf("unknown stats func %q", lex.token)
 	}
@@ -498,18 +500,19 @@ func (sfc *statsFuncCount) String() string {
 	return "count(" + fieldNamesString(sfc.fields) + ") as " + quoteTokenIfNeeded(sfc.resultName)
 }
 
+func (sfc *statsFuncCount) neededFields() []string {
+	return getFieldsIgnoreStar(sfc.fields)
+}
+
 func (sfc *statsFuncCount) newStatsFuncProcessor() statsFuncProcessor {
 	return &statsFuncCountProcessor{
 		sfc: sfc,
 	}
 }
 
-func (sfc *statsFuncCount) neededFields() []string {
-	return getFieldsIgnoreStar(sfc.fields)
-}
-
 type statsFuncCountProcessor struct {
-	sfc *statsFuncCount
+	sfc *statsFuncCount
+
 	rowsCount uint64
 }
 
@@ -569,24 +572,152 @@ func (sfcp *statsFuncCountProcessor) finalizeStats() (string, string) {
 	return sfcp.sfc.resultName, value
 }
 
+type statsFuncUniq struct {
+	fields     []string
+	resultName string
+}
+
+func (sfu *statsFuncUniq) String() string {
+	return "uniq(" + fieldNamesString(sfu.fields) + ") as " + quoteTokenIfNeeded(sfu.resultName)
+}
+
+func (sfu *statsFuncUniq) neededFields() []string {
+	return sfu.fields
+}
+
+func (sfu *statsFuncUniq) newStatsFuncProcessor() statsFuncProcessor {
+	return &statsFuncUniqProcessor{
+		sfu: sfu,
+
+		m: make(map[string]struct{}),
+	}
+}
+
+type statsFuncUniqProcessor struct {
+	sfu *statsFuncUniq
+
+	m map[string]struct{}
+
+	columnIdxs []int
+	keyBuf     []byte
+}
+
+func (sfup *statsFuncUniqProcessor) updateStatsForAllRows(timestamps []int64, columns []BlockColumn) {
+	fields := sfup.sfu.fields
+	m := sfup.m
+
+	if len(fields) == 1 {
+		// Fast path for a single column.
+		if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
+			for _, v := range columns[idx].Values {
+				if _, ok := m[v]; !ok {
+					vCopy := strings.Clone(v)
+					m[vCopy] = struct{}{}
+				}
+			}
+		}
+		return
+	}
+
+	// Slow path for multiple columns.
+
+	// Pre-calculate column indexes for the uniq() fields in order to speed up building the group key in the loop below.
+	sfup.columnIdxs = appendBlockColumnIndexes(sfup.columnIdxs[:0], columns, fields)
+	columnIdxs := sfup.columnIdxs
+
+	keyBuf := sfup.keyBuf
+	for i := range timestamps {
+		keyBuf = keyBuf[:0]
+		for _, idx := range columnIdxs {
+			v := ""
+			if idx >= 0 {
+				v = columns[idx].Values[i]
+			}
+			keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
+		}
+		if _, ok := m[string(keyBuf)]; !ok {
+			m[string(keyBuf)] = struct{}{}
+		}
+	}
+	sfup.keyBuf = keyBuf
+}
+
+func (sfup *statsFuncUniqProcessor) updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIdx int) {
+	fields := sfup.sfu.fields
+	m := sfup.m
+
+	if len(fields) == 1 {
+		// Fast path for a single column.
+		if idx := getBlockColumnIndex(columns, fields[0]); idx >= 0 {
+			v := columns[idx].Values[rowIdx]
+			if _, ok := m[v]; !ok {
+				vCopy := strings.Clone(v)
+				m[vCopy] = struct{}{}
+			}
+		}
+		return
+	}
+
+	// Slow path for multiple columns.
+	keyBuf := sfup.keyBuf[:0] // reset the reused buffer; otherwise keys from previous calls would be prepended
+	for _, f := range fields {
+		v := ""
+		if idx := getBlockColumnIndex(columns, f); idx >= 0 {
+			v = columns[idx].Values[rowIdx]
+		}
+		keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
+	}
+	sfup.keyBuf = keyBuf
+
+	if _, ok := m[string(keyBuf)]; !ok {
+		m[string(keyBuf)] = struct{}{}
+	}
+}
+
+func (sfup *statsFuncUniqProcessor) mergeState(sfp statsFuncProcessor) {
+	src := sfp.(*statsFuncUniqProcessor)
+	m := sfup.m
+	for k := range src.m {
+		m[k] = struct{}{}
+	}
+}
+
+func (sfup *statsFuncUniqProcessor) finalizeStats() (string, string) {
+	n := uint64(len(sfup.m))
+	value := strconv.FormatUint(n, 10)
+	return sfup.sfu.resultName, value
+}
+
+func parseStatsFuncUniq(lex *lexer) (*statsFuncUniq, error) {
+	lex.nextToken()
+	fields, err := parseFieldNamesInParens(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse 'uniq' args: %w", err)
+	}
+	if len(fields) == 0 {
+		return nil, fmt.Errorf("'uniq' must contain at least one arg")
+	}
+	resultName, err := parseResultName(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse result name: %w", err)
+	}
+	sfu := &statsFuncUniq{
+		fields:     fields,
+		resultName: resultName,
+	}
+	return sfu, nil
+}
+
 func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
 	lex.nextToken()
 	fields, err := parseFieldNamesInParens(lex)
 	if err != nil {
 		return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
 	}
-
-	if !lex.isKeyword("as") {
-		return nil, fmt.Errorf("missing 'as' keyword")
-	}
-	if !lex.mustNextToken() {
-		return nil, fmt.Errorf("missing token after 'as' keyword")
-	}
-	resultName, err := parseFieldName(lex)
+	resultName, err := parseResultName(lex)
 	if err != nil {
-		return nil, fmt.Errorf("cannot parse 'as' field name: %w", err)
+		return nil, fmt.Errorf("cannot parse result name: %w", err)
 	}
-
 	sfc := &statsFuncCount{
 		fields:     fields,
 		resultName: resultName,
@@ -594,6 +725,19 @@ func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
 	return sfc, nil
 }
 
+func parseResultName(lex *lexer) (string, error) {
+	if lex.isKeyword("as") {
+		if !lex.mustNextToken() {
+			return "", fmt.Errorf("missing token after 'as' keyword")
+		}
+	}
+	resultName, err := parseFieldName(lex)
+	if err != nil {
+		return "", fmt.Errorf("cannot parse result field name: %w", err)
+	}
+	return resultName, nil
+}
+
 type headPipe struct {
 	n uint64
 }
@@ -795,3 +939,11 @@ func getFieldsIgnoreStar(fields []string) []string {
 	}
 	return result
 }
+
+func appendBlockColumnIndexes(dst []int, columns []BlockColumn, fields []string) []int {
+	for _, f := range fields {
+		idx := getBlockColumnIndex(columns, f)
+		dst = append(dst, idx)
+	}
+	return dst
+}
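A note on the multi-column key encoding above: the uniq processor marshals each field value through `encoding.MarshalBytes` instead of plain concatenation, so that different value tuples can never collide on the same composite key; the `if _, ok := m[...]` lookup before insert also lets Go skip allocating the key string when it is already present, which is the common case for repeated values. Below is a self-contained sketch of the idea; it uses `binary.AppendUvarint` from the standard library as a stand-in for the VictoriaMetrics `encoding.MarshalBytes` helper (which, to my understanding, writes a varint length prefix followed by the bytes).

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendLenPrefixed writes a varint length prefix before the value, mirroring
// what encoding.MarshalBytes does in the diff above. Without the prefix the
// tuples ("a", "bc") and ("ab", "c") would both encode to "abc" and collide.
func appendLenPrefixed(dst []byte, v string) []byte {
	dst = binary.AppendUvarint(dst, uint64(len(v)))
	return append(dst, v...)
}

// uniqPairs counts unique (field1, field2) pairs the same way the slow path
// of statsFuncUniqProcessor does: composite key -> set membership.
func uniqPairs(rows [][2]string) int {
	m := make(map[string]struct{})
	var keyBuf []byte
	for _, row := range rows {
		keyBuf = keyBuf[:0] // reuse the buffer across rows, like sfup.keyBuf
		for _, v := range row {
			keyBuf = appendLenPrefixed(keyBuf, v)
		}
		// The map insert copies string(keyBuf), so reusing keyBuf is safe.
		m[string(keyBuf)] = struct{}{}
	}
	return len(m)
}

func main() {
	rows := [][2]string{
		{"a", "bc"},
		{"ab", "c"}, // distinct from {"a", "bc"} thanks to the length prefixes
		{"a", "bc"}, // duplicate of the first row
	}
	fmt.Println(uniqPairs(rows)) // prints 2
}
```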