This commit is contained in:
Aliaksandr Valialkin 2024-05-14 19:35:08 +02:00
parent 388b608a6e
commit 447a7f0bdf
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
5 changed files with 172 additions and 0 deletions

View file

@ -1373,6 +1373,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
- [`max`](#max-stats) calcualtes the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`min`](#min-stats) calculates the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`sum_len`](#sum_len-stats) calculates the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`values`](#values-stats) returns all the values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -1535,6 +1536,22 @@ See also:
- [`max`](#max-stats)
- [`min`](#min-stats)
### sum_len stats
`sum_len(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the sum of lengths of all the values
for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
For example, the following query returns the sum of lengths of [`_msg` fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
across all the logs for the last 5 minutes:
```logsql
_time:5m | stats sum_len(_msg) messages_len
```
See also:
- [`count`](#count-stats)
### uniq_values stats
`uniq_values(field1, ..., fieldN)` [stats pipe](#stats-pipe) returns the unique non-empty values across

View file

@ -1738,6 +1738,55 @@ func (c *blockResultColumn) getMinValue() float64 {
}
}
func (c *blockResultColumn) sumLenValues(br *blockResult) uint64 {
if c.isConst {
v := c.encodedValues[0]
return uint64(len(v)) * uint64(len(br.timestamps))
}
if c.isTime {
return uint64(len(time.RFC3339Nano)) * uint64(len(br.timestamps))
}
switch c.valueType {
case valueTypeString:
return c.sumLenStringValues(br)
case valueTypeDict:
n := uint64(0)
dictValues := c.dictValues
for _, v := range c.encodedValues {
idx := v[0]
v := dictValues[idx]
n += uint64(len(v))
}
return n
case valueTypeUint8:
return c.sumLenStringValues(br)
case valueTypeUint16:
return c.sumLenStringValues(br)
case valueTypeUint32:
return c.sumLenStringValues(br)
case valueTypeUint64:
return c.sumLenStringValues(br)
case valueTypeFloat64:
return c.sumLenStringValues(br)
case valueTypeIPv4:
return c.sumLenStringValues(br)
case valueTypeTimestampISO8601:
return uint64(len(iso8601Timestamp)) * uint64(len(br.timestamps))
default:
logger.Panicf("BUG: unknown valueType=%d", c.valueType)
return 0
}
}
func (c *blockResultColumn) sumLenStringValues(br *blockResult) uint64 {
n := uint64(0)
for _, v := range c.getValues(br) {
n += uint64(len(v))
}
return n
}
func (c *blockResultColumn) sumValues(br *blockResult) (float64, int) {
if c.isConst {
v := c.encodedValues[0]

View file

@ -926,6 +926,13 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | stats by(x) values() limit 1_000 AS y`, `* | stats by (x) values(*) limit 1000 as y`)
f(`* | stats by(x) values(a,*,b) y`, `* | stats by (x) values(*) as y`)
// stats pipe sum_len
f(`* | stats Sum_len(foo) bar`, `* | stats sum_len(foo) as bar`)
f(`* | stats BY(x, y, ) SUM_Len(foo,bar,) bar`, `* | stats by (x, y) sum_len(foo, bar) as bar`)
f(`* | stats sum_len() x`, `* | stats sum_len(*) as x`)
f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`)
f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`)
// stats pipe multiple funcs
f(`* | stats count() "foo.bar:baz", count_uniq(a) bar`, `* | stats count(*) as "foo.bar:baz", count_uniq(a) as bar`)
f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`)
@ -1275,6 +1282,10 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | stats values(a) limit 0.5`)
f(`foo | stats values(a) limit -1`)
// invalid stats sum_len
f(`foo | stats sum_len`)
f(`foo | stats sum_len()`)
// invalid stats grouping fields
f(`foo | stats by(foo:bar) count() baz`)
f(`foo | stats by(foo:/bar) count() baz`)

View file

@ -534,6 +534,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
return nil, "", fmt.Errorf("cannot parse 'values' func: %w", err)
}
sf = svs
case lex.isKeyword("sum_len"):
sss, err := parseStatsSumLen(lex)
if err != nil {
return nil, "", fmt.Errorf("cannot parse 'sum_len' func: %w", err)
}
sf = sss
default:
return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
}

View file

@ -0,0 +1,89 @@
package logstorage
import (
"slices"
"strconv"
"unsafe"
)
type statsSumLen struct {
fields []string
containsStar bool
}
func (ss *statsSumLen) String() string {
return "sum_len(" + fieldNamesString(ss.fields) + ")"
}
func (ss *statsSumLen) neededFields() []string {
return ss.fields
}
func (ss *statsSumLen) newStatsProcessor() (statsProcessor, int) {
ssp := &statsSumLenProcessor{
ss: ss,
sumLen: 0,
}
return ssp, int(unsafe.Sizeof(*ssp))
}
type statsSumLenProcessor struct {
ss *statsSumLen
sumLen uint64
}
func (ssp *statsSumLenProcessor) updateStatsForAllRows(br *blockResult) int {
if ssp.ss.containsStar {
// Sum all the columns
for _, c := range br.getColumns() {
ssp.sumLen += c.sumLenValues(br)
}
} else {
// Sum the requested columns
for _, field := range ssp.ss.fields {
c := br.getColumnByName(field)
ssp.sumLen += c.sumLenValues(br)
}
}
return 0
}
func (ssp *statsSumLenProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
if ssp.ss.containsStar {
// Sum all the fields for the given row
for _, c := range br.getColumns() {
v := c.getValueAtRow(br, rowIdx)
ssp.sumLen += uint64(len(v))
}
} else {
// Sum only the given fields for the given row
for _, field := range ssp.ss.fields {
c := br.getColumnByName(field)
v := c.getValueAtRow(br, rowIdx)
ssp.sumLen += uint64(len(v))
}
}
return 0
}
func (ssp *statsSumLenProcessor) mergeState(sfp statsProcessor) {
src := sfp.(*statsSumLenProcessor)
ssp.sumLen += src.sumLen
}
func (ssp *statsSumLenProcessor) finalizeStats() string {
return strconv.FormatUint(ssp.sumLen, 10)
}
func parseStatsSumLen(lex *lexer) (*statsSumLen, error) {
fields, err := parseFieldNamesForStatsFunc(lex, "sum_len")
if err != nil {
return nil, err
}
ss := &statsSumLen{
fields: fields,
containsStar: slices.Contains(fields, "*"),
}
return ss, nil
}