This commit is contained in:
Aliaksandr Valialkin 2024-05-18 23:07:51 +02:00
parent b038dee7b1
commit ce34e93874
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
8 changed files with 203 additions and 30 deletions

View file

@ -0,0 +1,17 @@
{% stripspace %}
// FieldNamesResponse formats /select/logsql/field_names response
{% func FieldNamesResponse(names []string) %}
{
"names":[
{% if len(names) > 0 %}
{%q= names[0] %}
{% for _, v := range names[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,69 @@
// Code generated by qtc from "field_names_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// FieldNamesResponse formats /select/logsql/field_names response
//line app/vlselect/logsql/field_names_response.qtpl:4
package logsql
//line app/vlselect/logsql/field_names_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/field_names_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/field_names_response.qtpl:4
func StreamFieldNamesResponse(qw422016 *qt422016.Writer, names []string) {
//line app/vlselect/logsql/field_names_response.qtpl:4
qw422016.N().S(`{"names":[`)
//line app/vlselect/logsql/field_names_response.qtpl:7
if len(names) > 0 {
//line app/vlselect/logsql/field_names_response.qtpl:8
qw422016.N().Q(names[0])
//line app/vlselect/logsql/field_names_response.qtpl:9
for _, v := range names[1:] {
//line app/vlselect/logsql/field_names_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/field_names_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/field_names_response.qtpl:11
}
//line app/vlselect/logsql/field_names_response.qtpl:12
}
//line app/vlselect/logsql/field_names_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/field_names_response.qtpl:15
}
//line app/vlselect/logsql/field_names_response.qtpl:15
func WriteFieldNamesResponse(qq422016 qtio422016.Writer, names []string) {
//line app/vlselect/logsql/field_names_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/field_names_response.qtpl:15
StreamFieldNamesResponse(qw422016, names)
//line app/vlselect/logsql/field_names_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/field_names_response.qtpl:15
}
//line app/vlselect/logsql/field_names_response.qtpl:15
func FieldNamesResponse(names []string) string {
//line app/vlselect/logsql/field_names_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/field_names_response.qtpl:15
WriteFieldNamesResponse(qb422016, names)
//line app/vlselect/logsql/field_names_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/field_names_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/field_names_response.qtpl:15
return qs422016
//line app/vlselect/logsql/field_names_response.qtpl:15
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"math"
"net/http"
"slices"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
@ -15,6 +16,29 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
)
// ProcessFieldNamesRequest handles /select/logsql/field_names request.
func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Obtain field names for the given query
q.Optimize()
fieldNames, err := vlstorage.GetFieldNames(ctx, tenantIDs, q)
if err != nil {
httpserver.Errorf(w, r, "cannot obtain field names: %w", err)
return
}
slices.Sort(fieldNames)
// Write results
w.Header().Set("Content-Type", "application/json")
WriteFieldNamesResponse(w, fieldNames)
}
// ProcessFieldValuesRequest handles /select/logsql/field_values request.
func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
@ -42,12 +66,19 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
// Obtain unique values for the given field
q.Optimize()
values, err := vlstorage.GetUniqueFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit))
values, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit))
if err != nil {
httpserver.Errorf(w, r, "cannot obtain values for field %q: %s", fieldName, err)
return
}
if limit == 0 || len(values) < limit {
// Sort values only if their number is below the limit.
// Otherwise there is little sense in sorting, since the query may return
// different subset of values on every execution.
slices.Sort(values)
}
// Write results
w.Header().Set("Content-Type", "application/json")
WriteFieldValuesResponse(w, values)

View file

@ -151,6 +151,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
httpserver.EnableCORS(w, r)
logsql.ProcessFieldValuesRequest(ctx, w, r)
return true
case "/logsql/field_names":
logsqlFieldNamesRequests.Inc()
httpserver.EnableCORS(w, r)
logsql.ProcessFieldNamesRequest(ctx, w, r)
return true
default:
return false
}
@ -172,4 +177,5 @@ func getMaxQueryDuration(r *http.Request) time.Duration {
var (
logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`)
logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`)
logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`)
)

View file

@ -111,11 +111,16 @@ func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorag
return strg.RunQuery(ctx, tenantIDs, q, writeBlock)
}
// GetUniqueFieldValues executes q and returns unique values for the given fieldName.
// GetFieldNames executes q and returns field names seen in results.
func GetFieldNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]string, error) {
return strg.GetFieldNames(ctx, tenantIDs, q)
}
// GetFieldValues executes q and returns unique values for the fieldName seen in results.
//
// If limit > 0, then up to limit unique values are returned.
func GetUniqueFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, fieldName string, limit uint64) ([]string, error) {
return strg.GetUniqueFieldValues(ctx, tenantIDs, q, fieldName, limit)
func GetFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, fieldName string, limit uint64) ([]string, error) {
return strg.GetFieldValues(ctx, tenantIDs, q, fieldName, limit)
}
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {

View file

@ -25,7 +25,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
* FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them.
* FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases.
* FEATURE: allow using more convenient syntax for [`range` filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter) if upper or lower bound isn't needed. For example, it is possible to write `response_size:>=10KiB` instead of `response_size:range[10KiB, inf)`, or `temperature:<42` instead of `temperature:range(-inf, 42)`.
* FEATURE: add `/select/logsql/field_values` HTTP endpoint for returning unique values for the given field when performing the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for details.
* FEATURE: add `/select/logsql/field_names` HTTP endpoint for returning [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for details.
* FEATURE: add `/select/logsql/field_values` HTTP endpoint for returning unique values for the given [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) obtained from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for details.
* BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`.

View file

@ -88,15 +88,34 @@ curl http://localhost:9428/select/logsql/query -H 'AccountID: 12' -H 'ProjectID:
The number of requests to `/select/logsql/query` can be [monitored](https://docs.victoriametrics.com/VictoriaLogs/#monitoring)
with `vl_http_requests_total{path="/select/logsql/query"}` metric.
### Querying field names
VictoriaLogs provides `/select/logsql/field_names?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns field names
from result of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given [`<start> ... <end>`] time range.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
For example, the following command returns field names across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
for the last 5 minutes:
```sh
curl http://localhost:9428/select/logsql/field_names -d 'query=error' -d 'start=5m'
```
See also:
- [Querying field values](#querying-field-values)
- [HTTP API](#http-api)
### Querying field values
VictoriaLogs provides `/select/logsql/field_values?query=<query>&field_name=<fieldName>&start=<start>&end=<end>` HTTP endpoint, which returns
unique values for the given `<fieldName>` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
for the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given [`<start> ... <end>`] time range.
The `<start>` and `<end>` args can contain values in [supported formats](https://docs.victoriametrics.com/#timestamp-formats).
from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given [`<start> ... <end>`] time range.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
For example, the following command returns unique the values for `host` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
across logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes:
across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes:
```sh
curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field_name=host' -d 'start=5m'
@ -105,6 +124,11 @@ curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field
The `/select/logsql/field_names` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned values to `N`.
The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of field values.
See also:
- [Querying field names](#querying-field-names)
- [HTTP API](#http-api)
## Web UI
VictoriaLogs provides a simple Web UI for logs [querying](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html) and exploration

View file

@ -136,11 +136,27 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
return errFlush
}
// GetUniqueFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs.
// GetFieldNames returns field names from q results for the given tenantIDs.
func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
// add `field_names ...` to the end of q.pipes
isFirstPipe := len(q.pipes) == 0
pipes := append([]pipe{}, q.pipes...)
pipes = append(pipes, &pipeFieldNames{
isFirstPipe: isFirstPipe,
})
q = &Query{
f: q.f,
pipes: pipes,
}
return s.runSingleColumnQuery(ctx, tenantIDs, q)
}
// GetFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs.
//
// If limit > 0, then up to limit unique values are returned. The values are returned in arbitrary order because of performance reasons.
// The caller may sort the returned values if needed.
func (s *Storage) GetUniqueFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) {
func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) {
// add 'uniq fieldName' to the end of q.pipes
if !endsWithPipeUniqSingleField(q.pipes, fieldName) {
pipes := append([]pipe{}, q.pipes...)
@ -154,6 +170,21 @@ func (s *Storage) GetUniqueFieldValues(ctx context.Context, tenantIDs []TenantID
}
}
return s.runSingleColumnQuery(ctx, tenantIDs, q)
}
func endsWithPipeUniqSingleField(pipes []pipe, fieldName string) bool {
if len(pipes) == 0 {
return false
}
pu, ok := pipes[len(pipes)-1].(*pipeUniq)
if !ok {
return false
}
return len(pu.byFields) == 1 && pu.byFields[0] == fieldName
}
func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
var values []string
var valuesLock sync.Mutex
writeBlock := func(workerID uint, timestamps []int64, columns []BlockColumn) {
@ -173,31 +204,20 @@ func (s *Storage) GetUniqueFieldValues(ctx context.Context, tenantIDs []TenantID
return values, nil
}
func endsWithPipeUniqSingleField(pipes []pipe, fieldName string) bool {
if len(pipes) == 0 {
return false
}
pu, ok := pipes[len(pipes)-1].(*pipeUniq)
if !ok {
return false
}
return len(pu.byFields) == 1 && pu.byFields[0] == fieldName
}
func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) {
if !hasFilterInWithQueryForFilter(q.f) && !hasFilterInWithQueryForPipes(q.pipes) {
return q, nil
}
getUniqueValues := func(q *Query, fieldName string) ([]string, error) {
return s.GetUniqueFieldValues(ctx, tenantIDs, q, fieldName, 0)
getFieldValues := func(q *Query, fieldName string) ([]string, error) {
return s.GetFieldValues(ctx, tenantIDs, q, fieldName, 0)
}
cache := make(map[string][]string)
fNew, err := initFilterInValuesForFilter(cache, q.f, getUniqueValues)
fNew, err := initFilterInValuesForFilter(cache, q.f, getFieldValues)
if err != nil {
return nil, err
}
pipesNew, err := initFilterInValuesForPipes(cache, q.pipes, getUniqueValues)
pipesNew, err := initFilterInValuesForPipes(cache, q.pipes, getFieldValues)
if err != nil {
return nil, err
}
@ -231,9 +251,9 @@ func hasFilterInWithQueryForPipes(pipes []pipe) bool {
return false
}
type getUniqueValuesFunc func(q *Query, fieldName string) ([]string, error)
type getFieldValuesFunc func(q *Query, fieldName string) ([]string, error)
func initFilterInValuesForFilter(cache map[string][]string, f filter, getUniqueValuesFunc getUniqueValuesFunc) (filter, error) {
func initFilterInValuesForFilter(cache map[string][]string, f filter, getFieldValuesFunc getFieldValuesFunc) (filter, error) {
visitFunc := func(f filter) bool {
fi, ok := f.(*filterIn)
return ok && fi.needExecuteQuery
@ -244,7 +264,7 @@ func initFilterInValuesForFilter(cache map[string][]string, f filter, getUniqueV
qStr := fi.q.String()
values, ok := cache[qStr]
if !ok {
vs, err := getUniqueValuesFunc(fi.q, fi.qFieldName)
vs, err := getFieldValuesFunc(fi.q, fi.qFieldName)
if err != nil {
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", fi, err)
}
@ -262,7 +282,7 @@ func initFilterInValuesForFilter(cache map[string][]string, f filter, getUniqueV
return copyFilter(f, visitFunc, copyFunc)
}
func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getUniqueValuesFunc getUniqueValuesFunc) ([]pipe, error) {
func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFieldValuesFunc getFieldValuesFunc) ([]pipe, error) {
pipesNew := make([]pipe, len(pipes))
for i, p := range pipes {
switch t := p.(type) {
@ -270,7 +290,7 @@ func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getUniq
funcsNew := make([]pipeStatsFunc, len(t.funcs))
for j, f := range t.funcs {
if f.iff != nil {
fNew, err := initFilterInValuesForFilter(cache, f.iff, getUniqueValuesFunc)
fNew, err := initFilterInValuesForFilter(cache, f.iff, getFieldValuesFunc)
if err != nil {
return nil, err
}