Aliaksandr Valialkin 2024-05-11 05:28:36 +02:00
parent d8026dad3c
commit ee30c1f81b
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 260 additions and 6 deletions

@@ -1341,6 +1341,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
- [`min`](#min-stats) calculates the minimum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`values`](#values-stats) returns all the values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
### avg stats
@@ -1390,6 +1391,7 @@ _time:5m | stats count(username, password) logs_with_username_or_password
See also:
- [`count_uniq`](#count_uniq-stats)
- [`count_empty`](#count_empty-stats)
- [`sum`](#sum-stats)
- [`avg`](#avg-stats)
@@ -1525,9 +1527,29 @@ _time:5m | stats uniq_values(ip) limit 100 as unique_ips_100
See also:
- [`values`](#values-stats)
- [`count_uniq`](#count_uniq-stats)
- [`count`](#count-stats)
### values stats
`values(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns all the values (including empty values)
for the mentioned [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
The returned values are encoded in a JSON array.
For example, the following query returns all the values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
over logs for the last 5 minutes:
```logsql
_time:5m | stats values(ip) ips
```
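
The number of returned values can be limited with the `limit N` suffix, in the same way as for [`uniq_values`](#uniq_values-stats) - the parser changes below accept it, and the implementation truncates the collected values to `N` without any ordering guarantees. For example, the following query returns up to 100 values for the `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) over logs for the last 5 minutes:

```logsql
_time:5m | stats values(ip) limit 100 as ips_100
```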
See also:
- [`uniq_values`](#uniq_values-stats)
- [`count`](#count-stats)
- [`count_empty`](#count_empty-stats)
## Stream context
LogsQL will support the ability to select the given number of surrounding log lines for the selected log lines

@@ -918,6 +918,14 @@ func TestParseQuerySuccess(t *testing.T) {
	f(`* | stats by(x) uniq_values() limit 1_000 AS y`, `* | stats by (x) uniq_values(*) limit 1000 as y`)
	f(`* | stats by(x) uniq_values(a,*,b) y`, `* | stats by (x) uniq_values(*) as y`)

	// stats pipe values
	f(`* | stats values(foo) bar`, `* | stats values(foo) as bar`)
	f(`* | stats values(foo) limit 10 bar`, `* | stats values(foo) limit 10 as bar`)
	f(`* | stats by(x, y) values(foo, bar) as baz`, `* | stats by (x, y) values(foo, bar) as baz`)
	f(`* | stats by(x) values(*) y`, `* | stats by (x) values(*) as y`)
	f(`* | stats by(x) values() limit 1_000 AS y`, `* | stats by (x) values(*) limit 1000 as y`)
	f(`* | stats by(x) values(a,*,b) y`, `* | stats by (x) values(*) as y`)

	// stats pipe multiple funcs
	f(`* | stats count() "foo.bar:baz", count_uniq(a) bar`, `* | stats count(*) as "foo.bar:baz", count_uniq(a) as bar`)
	f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`)
@@ -1250,6 +1258,14 @@ func TestParseQueryFailure(t *testing.T) {
	f(`foo | stats uniq_values(a) limit 0.5`)
	f(`foo | stats uniq_values(a) limit -1`)

	// invalid stats values
	f(`foo | stats values`)
	f(`foo | stats values()`)
	f(`foo | stats values() limit`)
	f(`foo | stats values(a) limit foo`)
	f(`foo | stats values(a) limit 0.5`)
	f(`foo | stats values(a) limit -1`)

	// invalid stats grouping fields
	f(`foo | stats by(foo:bar) count() baz`)
	f(`foo | stats by(foo:/bar) count() baz`)

@@ -522,6 +522,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
return nil, "", fmt.Errorf("cannot parse 'uniq_values' func: %w", err) return nil, "", fmt.Errorf("cannot parse 'uniq_values' func: %w", err)
} }
sf = sus sf = sus
case lex.isKeyword("values"):
svs, err := parseStatsValues(lex)
if err != nil {
return nil, "", fmt.Errorf("cannot parse 'values' func: %w", err)
}
sf = svs
default: default:
return nil, "", fmt.Errorf("unknown stats func %q", lex.token) return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
} }

@@ -213,8 +213,15 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string {
		items = items[:limit]
	}
	// Marshal items into JSON array.
	return marshalJSONArray(items)
}

func (sup *statsUniqValuesProcessor) limitReached() bool {
	limit := sup.su.limit
	return limit > 0 && uint64(len(sup.m)) >= limit
}

func marshalJSONArray(items []string) string {
	// Pre-allocate buffer for serialized items.
	// Assume that the items don't need quoting. Otherwise additional reallocations
	// of the allocated buffer are possible.

@@ -235,11 +242,6 @@ func (sup *statsUniqValuesProcessor) finalizeStats() string {
	return bytesutil.ToUnsafeString(b)
}

func (sup *statsUniqValuesProcessor) limitReached() bool {
	limit := sup.su.limit
	return limit > 0 && uint64(len(sup.m)) >= limit
}

func compareValues(a, b string) int {
	fA, okA := tryParseFloat64(a)
	fB, okB := tryParseFloat64(b)
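
Most of the body of `marshalJSONArray` is elided between the two hunks above. Below is a minimal sketch of what it is assumed to do, under an illustrative name (`marshalJSONArraySketch`) and with a simpler quoting strategy than the real pre-allocated-buffer implementation:

```go
package main

import (
	"fmt"
	"strconv"
)

// marshalJSONArraySketch is a hypothetical stand-in for marshalJSONArray:
// it serializes items into a JSON array string. The real function
// pre-allocates its buffer and skips quoting when items don't need it.
func marshalJSONArraySketch(items []string) string {
	b := append([]byte{}, '[')
	for i, item := range items {
		if i > 0 {
			b = append(b, ',')
		}
		// strconv.AppendQuote escapes quotes and control characters;
		// its output is valid JSON for ASCII input.
		b = strconv.AppendQuote(b, item)
	}
	b = append(b, ']')
	return string(b)
}

func main() {
	fmt.Println(marshalJSONArraySketch([]string{"1.2.3.4", "", `say "hi"`}))
	// ["1.2.3.4","","say \"hi\""]
}
```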

@@ -0,0 +1,208 @@
package logstorage

import (
	"fmt"
	"slices"
	"strings"
	"unsafe"
)

type statsValues struct {
	fields       []string
	containsStar bool
	limit        uint64
}

func (sv *statsValues) String() string {
	s := "values(" + fieldNamesString(sv.fields) + ")"
	if sv.limit > 0 {
		s += fmt.Sprintf(" limit %d", sv.limit)
	}
	return s
}

func (sv *statsValues) neededFields() []string {
	return sv.fields
}

func (sv *statsValues) newStatsProcessor() (statsProcessor, int) {
	svp := &statsValuesProcessor{
		sv: sv,
	}
	return svp, int(unsafe.Sizeof(*svp))
}

type statsValuesProcessor struct {
	sv *statsValues

	values []string
}
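
For orientation, `statsValuesProcessor` is assumed to implement the `statsProcessor` interface consumed by the `stats` pipe. The shape below is inferred from the methods in this file rather than copied from the actual lib/logstorage source:

```go
// Inferred sketch - the real interface is defined elsewhere in lib/logstorage.
type statsProcessor interface {
	// updateStatsForAllRows and updateStatsForRow return the increase
	// of the processor's state size in bytes.
	updateStatsForAllRows(br *blockResult) int
	updateStatsForRow(br *blockResult, rowIdx int) int
	mergeState(sfp statsProcessor)
	finalizeStats() string
}
```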
func (svp *statsValuesProcessor) updateStatsForAllRows(br *blockResult) int {
	if svp.limitReached() {
		// Limit on the number of collected values has been reached.
		return 0
	}

	stateSizeIncrease := 0
	if svp.sv.containsStar {
		for _, c := range br.getColumns() {
			stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br)
		}
	} else {
		for _, field := range svp.sv.fields {
			c := br.getColumnByName(field)
			stateSizeIncrease += svp.updateStatsForAllRowsColumn(c, br)
		}
	}
	return stateSizeIncrease
}
func (svp *statsValuesProcessor) updateStatsForAllRowsColumn(c *blockResultColumn, br *blockResult) int {
	stateSizeIncrease := 0
	if c.isConst {
		// Fast path for a const column: clone the value once and append it for every row.
		v := strings.Clone(c.encodedValues[0])
		stateSizeIncrease += len(v)

		values := svp.values
		for range br.timestamps {
			values = append(values, v)
		}
		svp.values = values

		stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0]))
		return stateSizeIncrease
	}
	if c.valueType == valueTypeDict {
		// Fast path for a dict column: clone every dict value once,
		// then append the decoded value for every row.
		dictValues := make([]string, len(c.dictValues))
		for i, v := range c.dictValues {
			dictValues[i] = strings.Clone(v)
			stateSizeIncrease += len(v)
		}

		values := svp.values
		for _, encodedValue := range c.encodedValues {
			idx := encodedValue[0]
			values = append(values, dictValues[idx])
		}
		svp.values = values

		stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0]))
		return stateSizeIncrease
	}

	// Slow path: clone each distinct value; reuse the previous clone for adjacent
	// duplicates, so no reference to the block's memory is retained.
	values := svp.values
	for _, v := range c.getValues(br) {
		if len(values) == 0 || values[len(values)-1] != v {
			v = strings.Clone(v)
			stateSizeIncrease += len(v)
		} else {
			v = values[len(values)-1]
		}
		values = append(values, v)
	}
	svp.values = values

	stateSizeIncrease += len(br.timestamps) * int(unsafe.Sizeof(values[0]))
	return stateSizeIncrease
}
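
The `strings.Clone` calls above matter because `blockResult` is assumed to hand out strings backed by reusable block buffers. The following self-contained snippet (illustrative names only; the buffer sharing is simulated with `unsafe.String`) shows the corruption that cloning prevents:

```go
package main

import (
	"fmt"
	"strings"
	"unsafe"
)

func main() {
	buf := []byte("first")
	// An unsafe string sharing memory with buf - this is how blockResult
	// values are assumed to be backed by reusable block buffers.
	v := unsafe.String(unsafe.SliceData(buf), len(buf))

	kept := v                // retains a reference into buf
	safe := strings.Clone(v) // owns its own copy

	copy(buf, "XXXXX") // the block buffer gets reused for the next block

	fmt.Println(kept) // "XXXXX" - silently corrupted
	fmt.Println(safe) // "first" - still correct
}
```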
func (svp *statsValuesProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
	if svp.limitReached() {
		// Limit on the number of collected values has been reached.
		return 0
	}

	stateSizeIncrease := 0
	if svp.sv.containsStar {
		for _, c := range br.getColumns() {
			stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx)
		}
	} else {
		for _, field := range svp.sv.fields {
			c := br.getColumnByName(field)
			stateSizeIncrease += svp.updateStatsForRowColumn(c, br, rowIdx)
		}
	}
	return stateSizeIncrease
}
func (svp *statsValuesProcessor) updateStatsForRowColumn(c *blockResultColumn, br *blockResult, rowIdx int) int {
	stateSizeIncrease := 0
	if c.isConst {
		// Fast path for a const column.
		v := strings.Clone(c.encodedValues[0])
		stateSizeIncrease += len(v)

		svp.values = append(svp.values, v)
		stateSizeIncrease += int(unsafe.Sizeof(svp.values[0]))
		return stateSizeIncrease
	}
	if c.valueType == valueTypeDict {
		// Fast path for a dict column: decode the value for the given rowIdx.
		dictIdx := c.encodedValues[rowIdx][0]
		v := strings.Clone(c.dictValues[dictIdx])
		stateSizeIncrease += len(v)

		svp.values = append(svp.values, v)
		stateSizeIncrease += int(unsafe.Sizeof(svp.values[0]))
		return stateSizeIncrease
	}

	// Slow path: collect the value for the given rowIdx.
	v := c.getValueAtRow(br, rowIdx)
	v = strings.Clone(v)
	stateSizeIncrease += len(v)

	svp.values = append(svp.values, v)
	stateSizeIncrease += int(unsafe.Sizeof(svp.values[0]))
	return stateSizeIncrease
}
func (svp *statsValuesProcessor) mergeState(sfp statsProcessor) {
	if svp.limitReached() {
		return
	}

	src := sfp.(*statsValuesProcessor)
	// Appending src.values may overshoot the limit; finalizeStats truncates the result.
	svp.values = append(svp.values, src.values...)
}

func (svp *statsValuesProcessor) finalizeStats() string {
	items := svp.values
	if len(items) == 0 {
		return "[]"
	}
	if limit := svp.sv.limit; limit > 0 && uint64(len(items)) > limit {
		items = items[:limit]
	}
	return marshalJSONArray(items)
}

func (svp *statsValuesProcessor) limitReached() bool {
	limit := svp.sv.limit
	return limit > 0 && uint64(len(svp.values)) >= limit
}
func parseStatsValues(lex *lexer) (*statsValues, error) {
	fields, err := parseFieldNamesForStatsFunc(lex, "values")
	if err != nil {
		return nil, err
	}
	sv := &statsValues{
		fields:       fields,
		containsStar: slices.Contains(fields, "*"),
	}
	if lex.isKeyword("limit") {
		lex.nextToken()
		n, ok := tryParseUint64(lex.token)
		if !ok {
			return nil, fmt.Errorf("cannot parse 'limit %s' for 'values'", lex.token)
		}
		lex.nextToken()
		sv.limit = n
	}
	return sv, nil
}
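
As a quick way to exercise the canonicalization shown in the parser tests above, the following sketch parses a `values` query and prints its canonical form. It assumes the exported `logstorage.ParseQuery` entry point and `Query`'s `Stringer` implementation:

```go
package main

import (
	"fmt"
	"log"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

func main() {
	// values(a,*,b) is expected to collapse to values(*), mirroring the test
	// f(`* | stats by(x) values(a,*,b) y`, `* | stats by (x) values(*) as y`).
	q, err := logstorage.ParseQuery("* | stats by(x) values(a,*,b) y")
	if err != nil {
		log.Fatalf("cannot parse query: %s", err)
	}
	fmt.Println(q) // * | stats by (x) values(*) as y
}
```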