From f26d593c7b8abdeb694f8dbf5ccfe7462ebf4c8f Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Tue, 14 May 2024 22:11:51 +0200
Subject: [PATCH] wip

---
 docs/VictoriaLogs/LogsQL.md           |  26 +++
 lib/logstorage/parser_test.go         |  15 ++
 lib/logstorage/pipe_stats.go          |   6 +
 lib/logstorage/stats_quantile.go      | 249 ++++++++++++++++++++++++++
 lib/logstorage/stats_quantile_test.go |  55 ++++++
 5 files changed, 351 insertions(+)
 create mode 100644 lib/logstorage/stats_quantile.go
 create mode 100644 lib/logstorage/stats_quantile_test.go

diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index a9af1e1d8..330183be6 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1372,6 +1372,7 @@ LogsQL supports the following functions for [`stats` pipe](#stats-pipe):
 - [`count_uniq`](#count_uniq-stats) calculates the number of unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`max`](#max-stats) calcualtes the maximum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`min`](#min-stats) calculates the minumum value over the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`quantile`](#quantile-stats) calculates the given quantile for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`sum`](#sum-stats) calculates the sum for the given numeric [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`sum_len`](#sum_len-stats) calculates the sum of lengths for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`uniq_values`](#uniq_values-stats) returns unique non-empty values for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -1394,6 +1395,7 @@ See also:
 
 - [`min`](#min-stats)
 - [`max`](#max-stats)
+- [`quantile`](#quantile-stats)
 - [`sum`](#sum-stats)
 - [`count`](#count-stats)
 
@@ -1493,6 +1495,7 @@ _time:5m | stats max(duration) max_duration
 See also:
 
 - [`min`](#min-stats)
+- [`quantile`](#quantile-stats)
 - [`avg`](#avg-stats)
 - [`sum`](#sum-stats)
 - [`count`](#count-stats)
@@ -1513,10 +1516,33 @@ _time:5m | stats min(duration) min_duration
 See also:
 
 - [`max`](#max-stats)
+- [`quantile`](#quantile-stats)
 - [`avg`](#avg-stats)
 - [`sum`](#sum-stats)
 - [`count`](#count-stats)
 
+### quantile stats
+
+`quantile(phi, field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates `phi` [percentile](https://en.wikipedia.org/wiki/Percentile) over numeric values
+for the given [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). The `phi` must be in the range `0 ... 1`, where `0` means `0th` percentile,
+while `1` means `100th` percentile.
+
+For example, the following query calculates `50th`, `90th` and `99th` percentiles for the `request_duration_seconds` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+over logs for the last 5 minutes:
+
+```logsql
+_time:5m | stats
+  quantile(0.5, request_duration_seconds) p50,
+  quantile(0.9, request_duration_seconds) p90,
+  quantile(0.99, request_duration_seconds) p99
+```
+
+See also:
+
+- [`min`](#min-stats)
+- [`max`](#max-stats)
+- [`avg`](#avg-stats)
+
 ### sum stats
 
 `sum(field1, ..., fieldN)` [stats pipe](#stats-pipe) calculates the sum of numeric values across
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index fe4dbd9c2..78046ec20 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -933,6 +933,13 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | stats sum_len(*) x`, `* | stats sum_len(*) as x`)
 	f(`* | stats sum_len(foo,*,bar) x`, `* | stats sum_len(*) as x`)
 
+	// stats pipe quantile
+	f(`* | stats quantile(0, foo) bar`, `* | stats quantile(0, foo) as bar`)
+	f(`* | stats quantile(1, foo) bar`, `* | stats quantile(1, foo) as bar`)
+	f(`* | stats quantile(0.5, a, b, c) bar`, `* | stats quantile(0.5, a, b, c) as bar`)
+	f(`* | stats quantile(0.99, *) bar`, `* | stats quantile(0.99, *) as bar`)
+	f(`* | stats quantile(0.99, a, *, b) bar`, `* | stats quantile(0.99, *) as bar`)
+
 	// stats pipe multiple funcs
 	f(`* | stats count() "foo.bar:baz", count_uniq(a) bar`, `* | stats count(*) as "foo.bar:baz", count_uniq(a) as bar`)
 	f(`* | stats by (x, y) count(*) foo, count_uniq(a,b) bar`, `* | stats by (x, y) count(*) as foo, count_uniq(a, b) as bar`)
@@ -1286,6 +1293,14 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | stats sum_len`)
 	f(`foo | stats sum_len()`)
 
+	// invalid stats quantile
+	f(`foo | stats quantile`)
+	f(`foo | stats quantile() foo`)
+	f(`foo | stats quantile(bar, baz) foo`)
+	f(`foo | stats quantile(0.5) foo`)
+	f(`foo | stats quantile(-1, x) foo`)
+	f(`foo | stats quantile(10, x) foo`)
+
 	// invalid stats grouping fields
 	f(`foo | stats by(foo:bar) count() baz`)
 	f(`foo | stats by(foo:/bar) count() baz`)
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index fcdd43a5a..bcb2a4388 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -540,6 +540,12 @@ func parseStatsFunc(lex *lexer) (statsFunc, string, error) {
 			return nil, "", fmt.Errorf("cannot parse 'sum_len' func: %w", err)
 		}
 		sf = sss
+	case lex.isKeyword("quantile"):
+		sqs, err := parseStatsQuantile(lex)
+		if err != nil {
+			return nil, "", fmt.Errorf("cannot parse 'quantile' func: %w", err)
+		}
+		sf = sqs
 	default:
 		return nil, "", fmt.Errorf("unknown stats func %q", lex.token)
 	}
diff --git a/lib/logstorage/stats_quantile.go b/lib/logstorage/stats_quantile.go
new file mode 100644
index 000000000..cbdcfc07c
--- /dev/null
+++ b/lib/logstorage/stats_quantile.go
@@ -0,0 +1,249 @@
+package logstorage
+
+import (
+	"fmt"
+	"math"
+	"slices"
+	"strconv"
+	"unsafe"
+
+	"github.com/valyala/fastrand"
+)
+
+// statsQuantile holds the parsed args of the `quantile(phi, field1, ..., fieldN)` stats func.
+type statsQuantile struct {
+	fields       []string
+	containsStar bool
+
+	phi float64
+}
+
+// String returns the string representation of sq in the form `quantile(phi, fields)`.
+func (sq *statsQuantile) String() string {
+	return fmt.Sprintf("quantile(%g, %s)", sq.phi, fieldNamesString(sq.fields))
+}
+
+// neededFields returns the field names sq operates on.
+func (sq *statsQuantile) neededFields() []string {
+	return sq.fields
+}
+
+// newStatsProcessor returns a new statsQuantileProcessor for sq and the size of the processor struct in bytes.
+func (sq *statsQuantile) newStatsProcessor() (statsProcessor, int) {
+	sqp := &statsQuantileProcessor{
+		sq: sq,
+	}
+	return sqp, int(unsafe.Sizeof(*sqp))
+}
+
+// statsQuantileProcessor collects float values for sq into the histogram h.
+type statsQuantileProcessor struct {
+	sq *statsQuantile
+
+	h histogram
+}
+
+// updateStatsForAllRows adds every value from the referenced columns of br that parses as a float
+// to the histogram. It returns the state size increase reported by the histogram.
+func (sqp *statsQuantileProcessor) updateStatsForAllRows(br *blockResult) int {
+	h := &sqp.h
+	stateSizeIncrease := 0
+
+	if sqp.sq.containsStar {
+		for _, c := range br.getColumns() {
+			for _, v := range c.getValues(br) {
+				f, ok := tryParseFloat64(v)
+				if ok {
+					stateSizeIncrease += h.update(f)
+				}
+			}
+		}
+	} else {
+		for _, field := range sqp.sq.fields {
+			c := br.getColumnByName(field)
+			for _, v := range c.getValues(br) {
+				f, ok := tryParseFloat64(v)
+				if ok {
+					stateSizeIncrease += h.update(f)
+				}
+			}
+		}
+	}
+
+	return stateSizeIncrease
+}
+
+// updateStatsForRow adds the float values at rowIdx from the referenced columns of br to the histogram,
+// skipping NaNs. It returns the state size increase reported by the histogram.
+func (sqp *statsQuantileProcessor) updateStatsForRow(br *blockResult, rowIdx int) int {
+	h := &sqp.h
+	stateSizeIncrease := 0
+
+	if sqp.sq.containsStar {
+		for _, c := range br.getColumns() {
+			f := c.getFloatValueAtRow(rowIdx)
+			if !math.IsNaN(f) {
+				stateSizeIncrease += h.update(f)
+			}
+		}
+	} else {
+		for _, field := range sqp.sq.fields {
+			c := br.getColumnByName(field)
+			f := c.getFloatValueAtRow(rowIdx)
+			if !math.IsNaN(f) {
+				stateSizeIncrease += h.update(f)
+			}
+		}
+	}
+
+	return stateSizeIncrease
+}
+
+// mergeState merges the histogram from sfp, which must be *statsQuantileProcessor, into sqp.
+func (sqp *statsQuantileProcessor) mergeState(sfp statsProcessor) {
+	src := sfp.(*statsQuantileProcessor)
+	sqp.h.mergeState(&src.h)
+}
+
+// finalizeStats returns the phi quantile over the collected values formatted as a decimal string.
+func (sqp *statsQuantileProcessor) finalizeStats() string {
+	q := sqp.h.quantile(sqp.sq.phi)
+	return strconv.FormatFloat(q, 'f', -1, 64)
+}
+
+// parseStatsQuantile parses `quantile(phi, field1, ..., fieldN)` starting at the current token of lex.
+//
+// phi must be a number in the range [0..1]. The field list is collapsed to `*` if it contains `*`.
+func parseStatsQuantile(lex *lexer) (*statsQuantile, error) {
+	if !lex.isKeyword("quantile") {
+		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "quantile")
+	}
+	lex.nextToken()
+
+	fields, err := parseFieldNamesInParens(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse 'quantile' args: %w", err)
+	}
+	if len(fields) < 2 {
+		return nil, fmt.Errorf("'quantile' must have at least two args: phi and field name")
+	}
+
+	// Parse phi
+	phi, ok := tryParseFloat64(fields[0])
+	if !ok {
+		return nil, fmt.Errorf("phi arg in 'quantile' must be floating point number; got %q", fields[0])
+	}
+	if phi < 0 || phi > 1 {
+		return nil, fmt.Errorf("phi arg in 'quantile' must be in the range [0..1]; got %q", fields[0])
+	}
+
+	// Parse fields
+	fields = fields[1:]
+	containsStar := slices.Contains(fields, "*")
+	if containsStar {
+		// `*` replaces all the other listed fields
+		fields = []string{"*"}
+	}
+
+	sq := &statsQuantile{
+		fields:       fields,
+		containsStar: containsStar,
+
+		phi: phi,
+	}
+	return sq, nil
+}
+
+// histogram holds a sample of the values passed to update
+// together with the min, max and count over all of them.
+type histogram struct {
+	a     []float64
+	min   float64
+	max   float64
+	count uint64
+
+	rng fastrand.RNG
+}
+
+// update registers f in h and returns the number of bytes added to h.a.
+func (h *histogram) update(f float64) int {
+	if h.count == 0 || f < h.min {
+		h.min = f
+	}
+	if h.count == 0 || f > h.max {
+		h.max = f
+	}
+
+	h.count++
+	if len(h.a) < maxHistogramSamples {
+		h.a = append(h.a, f)
+		return int(unsafe.Sizeof(f))
+	}
+
+	// h.a is full - overwrite a random sample with probability len(h.a)/h.count.
+	// Clamp the bound, since h.count is uint64, while Uint32n accepts uint32;
+	// plain uint32(h.count) wraps around after 2^32 values and skews the sample.
+	maxN := uint32(math.MaxUint32)
+	if h.count < math.MaxUint32 {
+		maxN = uint32(h.count)
+	}
+	if n := h.rng.Uint32n(maxN); n < uint32(len(h.a)) {
+		h.a[n] = f
+	}
+	return 0
+}
+
+// maxHistogramSamples is the number of samples histogram.update stores before it starts overwriting them.
+const maxHistogramSamples = 100_000
+
+// mergeState merges src into h.
+//
+// The samples from src are appended as is, so len(h.a) may exceed maxHistogramSamples after the merge.
+func (h *histogram) mergeState(src *histogram) {
+	if src.count == 0 {
+		// Nothing to merge
+		return
+	}
+	if h.count == 0 {
+		h.a = append(h.a, src.a...)
+		h.min = src.min
+		h.max = src.max
+		h.count = src.count
+		return
+	}
+
+	h.a = append(h.a, src.a...)
+	if src.min < h.min {
+		h.min = src.min
+	}
+	if src.max > h.max {
+		h.max = src.max
+	}
+	h.count += src.count
+}
+
+// quantile returns the phi quantile over the samples stored in h.
+//
+// It returns nan if h has no samples, h.min for phi <= 0 and h.max for phi >= 1.
+// It sorts h.a in place.
+func (h *histogram) quantile(phi float64) float64 {
+	if len(h.a) == 0 {
+		return nan
+	}
+	if len(h.a) == 1 {
+		return h.a[0]
+	}
+	if phi <= 0 {
+		return h.min
+	}
+	if phi >= 1 {
+		return h.max
+	}
+
+	slices.Sort(h.a)
+	idx := int(phi * float64(len(h.a)))
+	if idx == len(h.a) {
+		return h.max
+	}
+	return h.a[idx]
+}
diff --git a/lib/logstorage/stats_quantile_test.go b/lib/logstorage/stats_quantile_test.go
new file mode 100644
index 000000000..f497258ad
--- /dev/null
+++ b/lib/logstorage/stats_quantile_test.go
@@ -0,0 +1,55 @@
+package logstorage
+
+import (
+	"math"
+	"testing"
+)
+
+func TestHistogramQuantile(t *testing.T) {
+	f := func(a []float64, phi, qExpected float64) {
+		t.Helper()
+
+		var h histogram
+		for _, f := range a {
+			h.update(f)
+		}
+		q := h.quantile(phi)
+
+		if math.IsNaN(qExpected) {
+			if !math.IsNaN(q) {
+				t.Fatalf("unexpected result for a=%v, phi=%v; got %v; want %v", a, phi, q, qExpected)
+			}
+		} else if q != qExpected {
+			t.Fatalf("unexpected result for a=%v, phi=%v; got %v; want %v", a, phi, q, qExpected)
+		}
+	}
+
+	f(nil, -1, nan)
+	f(nil, 0, nan)
+	f(nil, 0.5, nan)
+	f(nil, 1, nan)
+	f(nil, 10, nan)
+
+	f([]float64{123}, -1, 123)
+	f([]float64{123}, 0, 123)
+	f([]float64{123}, 0.5, 123)
+	f([]float64{123}, 1, 123)
+	f([]float64{123}, 10, 123)
+
+	f([]float64{5, 1}, -1, 1)
+	f([]float64{5, 1}, 0, 1)
+	f([]float64{5, 1}, 0.5-1e-5, 1)
+	f([]float64{5, 1}, 0.5, 5)
+	f([]float64{5, 1}, 1, 5)
+	f([]float64{5, 1}, 10, 5)
+
+	f([]float64{5, 1, 3}, -1, 1)
+	f([]float64{5, 1, 3}, 0, 1)
+	f([]float64{5, 1, 3}, 1.0/3-1e-5, 1)
+	f([]float64{5, 1, 3}, 1.0/3, 3)
+	f([]float64{5, 1, 3}, 2.0/3-1e-5, 3)
+	f([]float64{5, 1, 3}, 2.0/3, 5)
+	f([]float64{5, 1, 3}, 1-1e-5, 5)
+	f([]float64{5, 1, 3}, 1, 5)
+	f([]float64{5, 1, 3}, 10, 5)
+}