Aliaksandr Valialkin 2024-05-14 22:11:51 +02:00
parent 447a7f0bdf
commit f26d593c7b
5 changed files with 317 additions and 0 deletions


@ -1372,6 +1372,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
- [`count_uniq`](#count_uniq-stats) calculates the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`max`](#max-stats) calculates the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`min`](#min-stats) calculates the minimum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`quantile`](#quantile-stats) calculates the given quantile for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`sum_len`](#sum_len-stats) calculates the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@ -1394,6 +1395,7 @@ See also:
- [`min`](#min-stats)
- [`max`](#max-stats)
- [`quantile`](#quantile-stats)
- [`sum`](#sum-stats)
- [`count`](#count-stats)
@ -1493,6 +1495,7 @@ _time:5m | stats max(duration) max_duration
See also:
- [`min`](#min-stats)
- [`quantile`](#quantile-stats)
- [`avg`](#avg-stats)
- [`sum`](#sum-stats)
- [`count`](#count-stats)
@ -1513,10 +1516,33 @@ _time:5m | stats min(duration) min_duration
See also:
- [`max`](#max-stats)
- [`quantile`](#quantile-stats)
- [`avg`](#avg-stats)
- [`sum`](#sum-stats)
- [`count`](#count-stats)

### quantile stats

`quantile(phi, field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the `phi` [percentile](https://en.wikipedia.org/wiki/Percentile) over numeric values
for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). The `phi` value must be in the range `0 ... 1`, where `0` means the `0th` percentile,
while `1` means the `100th` percentile.

For example, the following query calculates the `50th`, `90th` and `99th` percentiles for the `request_duration_seconds` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
over logs for the last 5 minutes:

```logsql
_time:5m | stats
  quantile(0.5, request_duration_seconds) p50,
  quantile(0.9, request_duration_seconds) p90,
  quantile(0.99, request_duration_seconds) p99
```
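
Like other stats functions, `quantile` can be combined with `by(...)` grouping. A hypothetical example (the `host` field name is an assumption) computing a per-host `95th` percentile:

```logsql
_time:5m | stats by (host) quantile(0.95, request_duration_seconds) p95
```
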
See also:
- [`min`](#min-stats)
- [`max`](#max-stats)
- [`avg`](#avg-stats)
### sum stats
`sum(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the sum of numeric values across


@ -933,6 +933,13 @@ func TestParseQuerySuccess(t *testing.T) {
	f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`)
	f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`)

	// stats pipe quantile
	f(`* | stats quantile(0, foo) bar`, `* | stats quantile(0, foo) as bar`)
	f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`)
	f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`)
	f(`* | stats quantile(0.99, *) bar`, `* | stats quantile(0.99, *) as bar`)
	f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99, *) as bar`)

	// stats pipe multiple funcs
	f(`* | stats count() "foo.bar:baz", count_uniq(a) bar`, `* | stats count(*) as "foo.bar:baz", count_uniq(a) as bar`)
	f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`)
@ -1286,6 +1293,14 @@ func TestParseQueryFailure(t *testing.T) {
	f(`foo | stats sum_len`)
	f(`foo | stats sum_len()`)

	// invalid stats quantile
	f(`foo | stats quantile`)
	f(`foo | stats quantile() foo`)
	f(`foo | stats quantile(bar, baz) foo`)
	f(`foo | stats quantile(0.5) foo`)
	f(`foo | stats quantile(-1, x) foo`)
	f(`foo | stats quantile(10, x) foo`)

	// invalid stats grouping fields
	f(`foo | stats by(foo:bar) count() baz`)
	f(`foo | stats by(foo:/bar) count() baz`)


@ -540,6 +540,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
return nil, "", fmt.Errorf("cannot parse 'sum_len' func: %w", err)
}
sf = sss
case lex.isKeyword("quantile"):
sqs, err := parseStatsQuantile(lex)
if err != nil {
return nil, "", fmt.Errorf("cannot parse 'quantile' func: %w", err)
}
sf = sqs
default:
return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
}


@ -0,0 +1,215 @@
package logstorage

import (
	"fmt"
	"math"
	"slices"
	"strconv"
	"unsafe"

	"github.com/valyala/fastrand"
)

type statsQuantile struct {
	fields       []string
	containsStar bool
	phi          float64
}

func (sq *statsQuantile) String() string {
	return fmt.Sprintf("quantile(%g, %s)", sq.phi, fieldNamesString(sq.fields))
}

func (sq *statsQuantile) neededFields() []string {
	return sq.fields
}

func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) {
	sqp := &statsQuantileProcessor{
		sq: sq,
	}
	return sqp, int(unsafe.Sizeof(*sqp))
}

type statsQuantileProcessor struct {
	sq *statsQuantile
	h  histogram
}

func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int {
	h := &sqp.h
	stateSizeIncrease := 0

	if sqp.sq.containsStar {
		for _, c := range br.getColumns() {
			for _, v := range c.getValues(br) {
				f, ok := tryParseFloat64(v)
				if ok {
					stateSizeIncrease += h.update(f)
				}
			}
		}
	} else {
		for _, field := range sqp.sq.fields {
			c := br.getColumnByName(field)
			for _, v := range c.getValues(br) {
				f, ok := tryParseFloat64(v)
				if ok {
					stateSizeIncrease += h.update(f)
				}
			}
		}
	}

	return stateSizeIncrease
}

func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
	h := &sqp.h
	stateSizeIncrease := 0

	if sqp.sq.containsStar {
		for _, c := range br.getColumns() {
			f := c.getFloatValueAtRow(rowIdx)
			if !math.IsNaN(f) {
				stateSizeIncrease += h.update(f)
			}
		}
	} else {
		for _, field := range sqp.sq.fields {
			c := br.getColumnByName(field)
			f := c.getFloatValueAtRow(rowIdx)
			if !math.IsNaN(f) {
				stateSizeIncrease += h.update(f)
			}
		}
	}

	return stateSizeIncrease
}

func (sqp *statsQuantileProcessor) mergeState(sfp statsProcessor) {
	src := sfp.(*statsQuantileProcessor)
	sqp.h.mergeState(&src.h)
}

func (sqp *statsQuantileProcessor) finalizeStats() string {
	q := sqp.h.quantile(sqp.sq.phi)
	return strconv.FormatFloat(q, 'f', -1, 64)
}

func parseStatsQuantile(lex *lexer) (*statsQuantile, error) {
	if !lex.isKeyword("quantile") {
		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "quantile")
	}
	lex.nextToken()

	fields, err := parseFieldNamesInParens(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'quantile' args: %w", err)
	}
	if len(fields) < 2 {
		return nil, fmt.Errorf("'quantile' must have at least two args: phi and field name")
	}

	// Parse phi
	phi, ok := tryParseFloat64(fields[0])
	if !ok {
		return nil, fmt.Errorf("phi arg in 'quantile' must be floating point number; got %q", fields[0])
	}
	if phi < 0 || phi > 1 {
		return nil, fmt.Errorf("phi arg in 'quantile' must be in the range [0..1]; got %q", fields[0])
	}

	// Parse fields
	fields = fields[1:]
	if slices.Contains(fields, "*") {
		fields = []string{"*"}
	}

	sq := &statsQuantile{
		fields:       fields,
		containsStar: slices.Contains(fields, "*"),
		phi:          phi,
	}
	return sq, nil
}

// histogram holds a bounded sample of the ingested values (maintained via reservoir sampling)
// together with the exact min, max and count across all the ingested values.
type histogram struct {
	a     []float64
	min   float64
	max   float64
	count uint64
	rng   fastrand.RNG
}

func (h *histogram) update(f float64) int {
	if h.count == 0 || f < h.min {
		h.min = f
	}
	if h.count == 0 || f > h.max {
		h.max = f
	}
	h.count++

	if len(h.a) < maxHistogramSamples {
		h.a = append(h.a, f)
		return int(unsafe.Sizeof(f))
	}

	// The sample buffer is full - overwrite a random sample, so every ingested value
	// has an equal chance of being kept (reservoir sampling).
	if n := h.rng.Uint32n(uint32(h.count)); n < uint32(len(h.a)) {
		h.a[n] = f
	}
	return 0
}

// maxHistogramSamples limits the memory needed for quantile estimation;
// once this limit is reached, update() switches to reservoir sampling.
const maxHistogramSamples = 100_000

func (h *histogram) mergeState(src *histogram) {
	if src.count == 0 {
		// Nothing to merge
		return
	}
	if h.count == 0 {
		h.a = append(h.a, src.a...)
		h.min = src.min
		h.max = src.max
		h.count = src.count
		return
	}

	h.a = append(h.a, src.a...)
	if src.min < h.min {
		h.min = src.min
	}
	if src.max > h.max {
		h.max = src.max
	}
	h.count += src.count
}

// quantile returns the phi quantile estimated from the collected samples.
// The exact min and max are returned for phi <= 0 and phi >= 1 respectively.
func (h *histogram) quantile(phi float64) float64 {
	if len(h.a) == 0 {
		return nan
	}
	if len(h.a) == 1 {
		return h.a[0]
	}
	if phi <= 0 {
		return h.min
	}
	if phi >= 1 {
		return h.max
	}

	slices.Sort(h.a)
	idx := int(phi * float64(len(h.a)))
	if idx == len(h.a) {
		return h.max
	}
	return h.a[idx]
}
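
For reference on the sampling approach used above: `histogram` stores at most `maxHistogramSamples` values and, once that cap is reached, overwrites a randomly chosen stored sample so that every observed value has the same chance of being retained; the quantile is then taken from the sorted sample, while the exact `min`/`max` are returned for `phi <= 0` and `phi >= 1`. The following standalone sketch illustrates the same reservoir-sampling idea outside the `logstorage` package; it uses `math/rand/v2` instead of `github.com/valyala/fastrand` and a hypothetical `reservoir` type, so it is an illustration rather than the code shipped in this commit:

```go
package main

import (
	"fmt"
	"math"
	"math/rand/v2"
	"slices"
)

// reservoir keeps at most cap(samples) values chosen uniformly from the whole stream.
type reservoir struct {
	samples []float64
	seen    uint64
}

func newReservoir(maxSamples int) *reservoir {
	return &reservoir{samples: make([]float64, 0, maxSamples)}
}

// update either stores f directly (while there is room) or replaces a random
// stored sample with probability len(samples)/seen - classic reservoir sampling.
func (r *reservoir) update(f float64) {
	r.seen++
	if len(r.samples) < cap(r.samples) {
		r.samples = append(r.samples, f)
		return
	}
	if n := rand.Uint64N(r.seen); n < uint64(len(r.samples)) {
		r.samples[n] = f
	}
}

// quantile estimates the phi quantile from the sorted retained samples.
func (r *reservoir) quantile(phi float64) float64 {
	if len(r.samples) == 0 {
		return math.NaN()
	}
	a := slices.Clone(r.samples)
	slices.Sort(a)
	idx := int(phi * float64(len(a)))
	if idx >= len(a) {
		idx = len(a) - 1
	}
	return a[idx]
}

func main() {
	r := newReservoir(1000)
	for i := 1; i <= 100_000; i++ {
		r.update(float64(i))
	}
	// Only 1000 of the 100000 values are kept, so the estimate is approximate,
	// but it should land near 50000.
	fmt.Printf("approximate p50: %v\n", r.quantile(0.5))
}
```

Note that `update` in the commit also returns the state-size increase for each stored sample so the query engine can account for memory usage; the sketch omits that bookkeeping.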


@ -0,0 +1,55 @@
package logstorage

import (
	"math"
	"testing"
)

func TestHistogramQuantile(t *testing.T) {
	f := func(a []float64, phi, qExpected float64) {
		t.Helper()

		var h histogram
		for _, f := range a {
			h.update(f)
		}
		q := h.quantile(phi)
		if math.IsNaN(qExpected) {
			if !math.IsNaN(q) {
				t.Fatalf("unexpected result for q=%v, phi=%v; got %v; want %v", a, phi, q, qExpected)
			}
		} else if q != qExpected {
			t.Fatalf("unexpected result for q=%v, phi=%v; got %v; want %v", a, phi, q, qExpected)
		}
	}

	f(nil, -1, nan)
	f(nil, 0, nan)
	f(nil, 0.5, nan)
	f(nil, 1, nan)
	f(nil, 10, nan)

	f([]float64{123}, -1, 123)
	f([]float64{123}, 0, 123)
	f([]float64{123}, 0.5, 123)
	f([]float64{123}, 1, 123)
	f([]float64{123}, 10, 123)

	f([]float64{5, 1}, -1, 1)
	f([]float64{5, 1}, 0, 1)
	f([]float64{5, 1}, 0.5-1e-5, 1)
	f([]float64{5, 1}, 0.5, 5)
	f([]float64{5, 1}, 1, 5)
	f([]float64{5, 1}, 10, 5)

	f([]float64{5, 1, 3}, -1, 1)
	f([]float64{5, 1, 3}, 0, 1)
	f([]float64{5, 1, 3}, 1.0/3-1e-5, 1)
	f([]float64{5, 1, 3}, 1.0/3, 3)
	f([]float64{5, 1, 3}, 2.0/3-1e-5, 3)
	f([]float64{5, 1, 3}, 2.0/3, 5)
	f([]float64{5, 1, 3}, 1-1e-5, 5)
	f([]float64{5, 1, 3}, 1, 5)
	f([]float64{5, 1, 3}, 10, 5)
}
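
The table-driven test above covers `update` and `quantile` only; a possible additional test for `histogram.mergeState` (an illustrative sketch, not part of this commit) could look like this:

```go
func TestHistogramMergeState(t *testing.T) {
	var a, b histogram
	for _, f := range []float64{1, 5} {
		a.update(f)
	}
	for _, f := range []float64{3, 7} {
		b.update(f)
	}
	a.mergeState(&b)

	// After merging, min, max and median must be taken across both histograms.
	if got := a.quantile(0); got != 1 {
		t.Fatalf("unexpected min; got %v; want 1", got)
	}
	if got := a.quantile(1); got != 7 {
		t.Fatalf("unexpected max; got %v; want 7", got)
	}
	if got := a.quantile(0.5); got != 5 {
		t.Fatalf("unexpected median; got %v; want 5", got)
	}
}
```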