diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index f7c761b36..f51d6fae6 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1341,6 +1341,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
 - [`min`](#min-stats) calculates the minimum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`values`](#values-stats) returns all the values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 
 ### avg stats
 
@@ -1390,6 +1391,7 @@ _time:5m | stats count(username, password) logs_with_username_or_password
 See also:
 
 - [`count_uniq`](#count_uniq-stats)
+- [`count_empty`](#count_empty-stats)
 - [`sum`](#sum-stats)
 - [`avg`](#avg-stats)
 
@@ -1525,9 +1527,29 @@ _time:5m | stats uniq_values(ip) limit 100 as unique_ips_100
 See also:
 
+- [`values`](#values-stats)
 - [`count_uniq`](#count_uniq-stats)
 - [`count`](#count-stats)
 
+### values stats
+
+`values(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns all the values (including empty values)
+for the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+The returned values are encoded in a JSON array.
+
+For example, the following query returns all the values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+over logs for the last 5 minutes:
+
+```logsql
+_time:5m | stats values(ip) ips
+```
+
+See also:
+
+- [`uniq_values`](#uniq_values-stats)
+- [`count`](#count-stats)
+- [`count_empty`](#count_empty-stats)
+
 ## Stream context
 
 LogsQL will support the ability to select the given number of surrounding log lines for the selected log lines
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 5b535d750..68ac9a639 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -918,6 +918,14 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats by(x) uniq_values() limit 1_000 AS y`, `* | stats by (x) uniq_values(*) limit 1000 as y`)
 	f(`* | stats by(x) uniq_values(a,*,b) y`, `* | stats by (x) uniq_values(*) as y`)
 
+	// stats pipe values
+	f(`* | stats values(foo) bar`, `* | stats values(foo) as bar`)
+	f(`* | stats values(foo) limit 10 bar`, `* | stats values(foo) limit 10 as bar`)
+	f(`* | stats by(x, y) values(foo, bar) as baz`, `* | stats by (x, y) values(foo, bar) as baz`)
+	f(`* | stats by(x) values(*) y`, `* | stats by (x) values(*) as y`)
+	f(`* | stats by(x) values() limit 1_000 AS y`, `* | stats by (x) values(*) limit 1000 as y`)
+	f(`* | stats by(x) values(a,*,b) y`, `* | stats by (x) values(*) as y`)
+
 	// stats pipe multiple funcs
 	f(`* | stats count() "foo.bar:baz", count_uniq(a) bar`, `* | stats count(*) as "foo.bar:baz", count_uniq(a) as bar`)
 	f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`)
@@ -1250,6 +1258,14 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | stats uniq_values(a) limit 0.5`)
 	f(`foo | stats uniq_values(a) limit -1`)
 
+	// invalid stats values
+	f(`foo | stats values`)
+	f(`foo | stats values()`)
+	f(`foo | stats values() limit`)
+	f(`foo | stats values(a) limit foo`)
+	f(`foo | stats values(a) limit 0.5`)
+	f(`foo | stats values(a) limit -1`)
+
 	// invalid stats grouping fields
 	f(`foo | stats by(foo:bar) count() baz`)
 	f(`foo | stats by(foo:/bar) count() baz`)
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index e1355c265..0f81f6cfb 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -522,6 +522,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
 			return nil, "", fmt.Errorf("cannot parse 'uniq_values' func: %w", err)
 		}
 		sf = sus
+	case lex.isKeyword("values"):
+		svs, err := parseStatsValues(lex)
+		if err != nil {
+			return nil, "", fmt.Errorf("cannot parse 'values' func: %w", err)
+		}
+		sf = svs
 	default:
 		return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
 	}
diff --git a/lib/logstorage/stats_uniq_values.go b/lib/logstorage/stats_uniq_values.go
index e13991945..df0e561ae 100644
--- a/lib/logstorage/stats_uniq_values.go
+++ b/lib/logstorage/stats_uniq_values.go
@@ -213,8 +213,15 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string {
 		items = items[:limit]
 	}
 
-	// Marshal items into JSON array.
+	return marshalJSONArray(items)
+}
+
+func (sup *statsUniqValuesProcessor) limitReached() bool {
+	limit := sup.su.limit
+	return limit > 0 && uint64(len(sup.m)) >= limit
+}
+
+func marshalJSONArray(items []string) string {
 	// Pre-allocate buffer for serialized items.
 	// Assume that there is no need in quoting items. Otherwise additional reallocations
 	// for the allocated buffer are possible.
@@ -235,11 +242,6 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string {
 	return bytesutil.ToUnsafeString(b)
 }
 
-func (sup *statsUniqValuesProcessor) limitReached() bool {
-	limit := sup.su.limit
-	return limit > 0 && uint64(len(sup.m)) >= limit
-}
-
 func compareValues(a, b string) int {
 	fA, okA := tryParseFloat64(a)
 	fB, okB := tryParseFloat64(b)
diff --git a/lib/logstorage/stats_values.go b/lib/logstorage/stats_values.go
new file mode 100644
index 000000000..2dd62f2a9
--- /dev/null
+++ b/lib/logstorage/stats_values.go
@@ -0,0 +1,208 @@
+package logstorage
+
+import (
+	"fmt"
+	"slices"
+	"strings"
+	"unsafe"
+)
+
+type statsValues struct {
+	fields       []string
+	containsStar bool
+	limit        uint64
+}
+
+func (sv *statsValues) String() string {
+	s := "values(" + fieldNamesString(sv.fields) + ")"
+	if sv.limit > 0 {
+		s += fmt.Sprintf(" limit %d", sv.limit)
+	}
+	return s
+}
+
+func (sv *statsValues) neededFields() []string {
+	return sv.fields
+}
+
+func (sv *statsValues) newStatsProcessor() (statsProcessor, int) {
+	svp := &statsValuesProcessor{
+		sv: sv,
+	}
+	return svp, int(unsafe.Sizeof(*svp))
+}
+
+type statsValuesProcessor struct {
+	sv *statsValues
+
+	values []string
+}
+
+func (svp *statsValuesProcessor) updateStatsForAllRows(br *blockResult) int {
+	if svp.limitReached() {
+		// Limit on the number of collected values has been reached
+		return 0
+	}
+
+	stateSizeIncrease := 0
+	if svp.sv.containsStar {
+		for _, c := range br.getColumns() {
+			stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br)
+		}
+	} else {
+		for _, field := range svp.sv.fields {
+			c := br.getColumnByName(field)
+			stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br)
+		}
+	}
+	return stateSizeIncrease
+}
+
+func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int {
+	stateSizeIncrease := 0
+	if c.isConst {
+		v := strings.Clone(c.encodedValues[0])
+		stateSizeIncrease += len(v)
+
+		values := svp.values
+		for range br.timestamps {
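+			// Append the same cloned const value once per row in the block.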
+			values = append(values, v)
+		}
+		svp.values = values
+
+		stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0]))
+		return stateSizeIncrease
+	}
+	if c.valueType == valueTypeDict {
+		dictValues := make([]string, len(c.dictValues))
+		for i, v := range c.dictValues {
+			dictValues[i] = strings.Clone(v)
+			stateSizeIncrease += len(v)
+		}
+
+		values := svp.values
+		for _, encodedValue := range c.encodedValues {
+			idx := encodedValue[0]
+			values = append(values, dictValues[idx])
+		}
+		svp.values = values
+
+		stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0]))
+		return stateSizeIncrease
+	}
+
+	values := svp.values
+	for _, v := range c.getValues(br) {
+		if len(values) == 0 || values[len(values)-1] != v {
+			v = strings.Clone(v)
+			stateSizeIncrease += len(v)
+		} else {
+			// Reuse the previously cloned value instead of storing another
+			// reference to the block-local string returned by getValues.
+			v = values[len(values)-1]
+		}
+		values = append(values, v)
+	}
+	svp.values = values
+
+	stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0]))
+	return stateSizeIncrease
+}
+
+func (svp *statsValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
+	if svp.limitReached() {
+		// Limit on the number of collected values has been reached
+		return 0
+	}
+
+	stateSizeIncrease := 0
+	if svp.sv.containsStar {
+		for _, c := range br.getColumns() {
+			stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx)
+		}
+	} else {
+		for _, field := range svp.sv.fields {
+			c := br.getColumnByName(field)
+			stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx)
+		}
+	}
+	return stateSizeIncrease
+}
+
+func (svp *statsValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int {
+	stateSizeIncrease := 0
+	if c.isConst {
+		v := strings.Clone(c.encodedValues[0])
+		stateSizeIncrease += len(v)
+
+		svp.values = append(svp.values, v)
+		stateSizeIncrease += int(unsafe.Sizeof(svp.values[0]))
+
+		return stateSizeIncrease
+	}
+	if c.valueType == valueTypeDict {
+		// collect the dictionary value for the given rowIdx
+		dictIdx := c.encodedValues[rowIdx][0]
+		v := strings.Clone(c.dictValues[dictIdx])
+		stateSizeIncrease += len(v)
+
+		svp.values = append(svp.values, v)
+		stateSizeIncrease += int(unsafe.Sizeof(svp.values[0]))
+
+		return stateSizeIncrease
+	}
+
+	// collect the value for the given rowIdx
+	v := c.getValueAtRow(br, rowIdx)
+	v = strings.Clone(v)
+	stateSizeIncrease += len(v)
+
+	svp.values = append(svp.values, v)
+	stateSizeIncrease += int(unsafe.Sizeof(svp.values[0]))
+
+	return stateSizeIncrease
+}
+
+func (svp *statsValuesProcessor) mergeState(sfp statsProcessor) {
+	if svp.limitReached() {
+		return
+	}
+
+	src := sfp.(*statsValuesProcessor)
+	svp.values = append(svp.values, src.values...)
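+	// The merged slice may exceed sv.limit here; finalizeStats truncates it to the limit.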
+}
+
+func (svp *statsValuesProcessor) finalizeStats() string {
+	items := svp.values
+	if len(items) == 0 {
+		return "[]"
+	}
+
+	if limit := svp.sv.limit; limit > 0 && uint64(len(items)) > limit {
+		items = items[:limit]
+	}
+
+	return marshalJSONArray(items)
+}
+
+func (svp *statsValuesProcessor) limitReached() bool {
+	limit := svp.sv.limit
+	return limit > 0 && uint64(len(svp.values)) >= limit
+}
+
+func parseStatsValues(lex *lexer) (*statsValues, error) {
+	fields, err := parseFieldNamesForStatsFunc(lex, "values")
+	if err != nil {
+		return nil, err
+	}
+	sv := &statsValues{
+		fields:       fields,
+		containsStar: slices.Contains(fields, "*"),
+	}
+	if lex.isKeyword("limit") {
+		lex.nextToken()
+		n, ok := tryParseUint64(lex.token)
+		if !ok {
+			// Note: err is nil here, so it must not be wrapped with %w.
+			return nil, fmt.Errorf("cannot parse 'limit %s' for 'values'", lex.token)
+		}
+		lex.nextToken()
+		sv.limit = n
+	}
+	return sv, nil
+}
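
Below is a minimal standalone sketch (not part of the diff) of the semantics implemented above: `values` keeps duplicates and empty values, an optional `limit` truncates the collected slice exactly as `finalizeStats` does, and the result is rendered as a JSON array. The `valuesStats` helper and its `fieldValues` input are illustrative names only, and `strconv.Quote` is a simplified stand-in for the `marshalJSONArray` helper factored out in `stats_uniq_values.go`.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// valuesStats mirrors the values(...) stats function: duplicates and empty
// values are kept, limit > 0 truncates the slice, and the result is a JSON array.
func valuesStats(fieldValues []string, limit uint64) string {
	items := fieldValues
	if len(items) == 0 {
		return "[]"
	}
	if limit > 0 && uint64(len(items)) > limit {
		items = items[:limit]
	}
	var b strings.Builder
	b.WriteByte('[')
	for i, item := range items {
		if i > 0 {
			b.WriteByte(',')
		}
		// strconv.Quote approximates the JSON quoting done by marshalJSONArray.
		b.WriteString(strconv.Quote(item))
	}
	b.WriteByte(']')
	return b.String()
}

func main() {
	// Roughly what `_time:5m | stats values(ip) ips` would collect for the ip field.
	ips := []string{"10.0.0.1", "", "10.0.0.1", "10.0.0.2"}
	fmt.Println(valuesStats(ips, 0)) // ["10.0.0.1","","10.0.0.1","10.0.0.2"]
	fmt.Println(valuesStats(ips, 2)) // ["10.0.0.1",""]
}
```

For comparison, `uniq_values(ip)` over the same input would drop the empty string and deduplicate, yielding `["10.0.0.1","10.0.0.2"]`.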