From ce34e93874ca678f241c4cb59eb5556a920eed25 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 18 May 2024 23:07:51 +0200 Subject: [PATCH] wip --- app/vlselect/logsql/field_names_response.qtpl | 17 +++++ .../logsql/field_names_response.qtpl.go | 69 +++++++++++++++++++ app/vlselect/logsql/logsql.go | 33 ++++++++- app/vlselect/main.go | 6 ++ app/vlstorage/main.go | 11 ++- docs/VictoriaLogs/CHANGELOG.md | 3 +- docs/VictoriaLogs/querying/README.md | 30 +++++++- lib/logstorage/storage_search.go | 64 +++++++++++------ 8 files changed, 203 insertions(+), 30 deletions(-) create mode 100644 app/vlselect/logsql/field_names_response.qtpl create mode 100644 app/vlselect/logsql/field_names_response.qtpl.go diff --git a/app/vlselect/logsql/field_names_response.qtpl b/app/vlselect/logsql/field_names_response.qtpl new file mode 100644 index 000000000..cbb276e0a --- /dev/null +++ b/app/vlselect/logsql/field_names_response.qtpl @@ -0,0 +1,17 @@ +{% stripspace %} + +// FieldNamesResponse formats /select/logsql/field_names response +{% func FieldNamesResponse(names []string) %} +{ + "names":[ + {% if len(names) > 0 %} + {%q= names[0] %} + {% for _, v := range names[1:] %} + ,{%q= v %} + {% endfor %} + {% endif %} + ] +} +{% endfunc %} + +{% endstripspace %} diff --git a/app/vlselect/logsql/field_names_response.qtpl.go b/app/vlselect/logsql/field_names_response.qtpl.go new file mode 100644 index 000000000..97d1bf011 --- /dev/null +++ b/app/vlselect/logsql/field_names_response.qtpl.go @@ -0,0 +1,69 @@ +// Code generated by qtc from "field_names_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. 
+ +// FieldNamesResponse formats /select/logsql/field_names response + +//line app/vlselect/logsql/field_names_response.qtpl:4 +package logsql + +//line app/vlselect/logsql/field_names_response.qtpl:4 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/field_names_response.qtpl:4 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/field_names_response.qtpl:4 +func StreamFieldNamesResponse(qw422016 *qt422016.Writer, names []string) { +//line app/vlselect/logsql/field_names_response.qtpl:4 + qw422016.N().S(`{"names":[`) +//line app/vlselect/logsql/field_names_response.qtpl:7 + if len(names) > 0 { +//line app/vlselect/logsql/field_names_response.qtpl:8 + qw422016.N().Q(names[0]) +//line app/vlselect/logsql/field_names_response.qtpl:9 + for _, v := range names[1:] { +//line app/vlselect/logsql/field_names_response.qtpl:9 + qw422016.N().S(`,`) +//line app/vlselect/logsql/field_names_response.qtpl:10 + qw422016.N().Q(v) +//line app/vlselect/logsql/field_names_response.qtpl:11 + } +//line app/vlselect/logsql/field_names_response.qtpl:12 + } +//line app/vlselect/logsql/field_names_response.qtpl:12 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/field_names_response.qtpl:15 +} + +//line app/vlselect/logsql/field_names_response.qtpl:15 +func WriteFieldNamesResponse(qq422016 qtio422016.Writer, names []string) { +//line app/vlselect/logsql/field_names_response.qtpl:15 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/field_names_response.qtpl:15 + StreamFieldNamesResponse(qw422016, names) +//line app/vlselect/logsql/field_names_response.qtpl:15 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/field_names_response.qtpl:15 +} + +//line app/vlselect/logsql/field_names_response.qtpl:15 +func FieldNamesResponse(names []string) string { +//line app/vlselect/logsql/field_names_response.qtpl:15 + qb422016 := qt422016.AcquireByteBuffer() +//line 
app/vlselect/logsql/field_names_response.qtpl:15 + WriteFieldNamesResponse(qb422016, names) +//line app/vlselect/logsql/field_names_response.qtpl:15 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/field_names_response.qtpl:15 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/field_names_response.qtpl:15 + return qs422016 +//line app/vlselect/logsql/field_names_response.qtpl:15 +} diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index 351a43f57..b078179a8 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "net/http" + "slices" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" @@ -15,6 +16,29 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) +// ProcessFieldNamesRequest handles /select/logsql/field_names request. +func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { + q, tenantIDs, err := parseCommonArgs(r) + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + + // Obtain field names for the given query + q.Optimize() + fieldNames, err := vlstorage.GetFieldNames(ctx, tenantIDs, q) + if err != nil { + httpserver.Errorf(w, r, "cannot obtain field names: %s", err) + return + } + + slices.Sort(fieldNames) + + // Write results + w.Header().Set("Content-Type", "application/json") + WriteFieldNamesResponse(w, fieldNames) +} + // ProcessFieldValuesRequest handles /select/logsql/field_values request. 
func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { q, tenantIDs, err := parseCommonArgs(r) @@ -42,12 +66,19 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht // Obtain unique values for the given field q.Optimize() - values, err := vlstorage.GetUniqueFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit)) + values, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, fieldName, uint64(limit)) if err != nil { httpserver.Errorf(w, r, "cannot obtain values for field %q: %s", fieldName, err) return } + if limit == 0 || len(values) < limit { + // Sort values only if their number is below the limit. + // Otherwise there is little sense in sorting, since the query may return + // different subset of values on every execution. + slices.Sort(values) + } + // Write results w.Header().Set("Content-Type", "application/json") WriteFieldValuesResponse(w, values) diff --git a/app/vlselect/main.go b/app/vlselect/main.go index 8e418142e..ee2d84534 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -151,6 +151,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) logsql.ProcessFieldValuesRequest(ctx, w, r) return true + case "/logsql/field_names": + logsqlFieldNamesRequests.Inc() + httpserver.EnableCORS(w, r) + logsql.ProcessFieldNamesRequest(ctx, w, r) + return true default: return false } @@ -172,4 +177,5 @@ func getMaxQueryDuration(r *http.Request) time.Duration { var ( logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`) + logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`) ) diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index 6d88a0f71..1d46a3ed3 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -111,11 +111,16 
@@ func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorag return strg.RunQuery(ctx, tenantIDs, q, writeBlock) } -// GetUniqueFieldValues executes q and returns unique values for the given fieldName. +// GetFieldNames executes q and returns field names seen in results. +func GetFieldNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]string, error) { + return strg.GetFieldNames(ctx, tenantIDs, q) +} + +// GetFieldValues executes q and returns unique values for the fieldName seen in results. // // If limit > 0, then up to limit unique values are returned. -func GetUniqueFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, fieldName string, limit uint64) ([]string, error) { - return strg.GetUniqueFieldValues(ctx, tenantIDs, q, fieldName, limit) +func GetFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, fieldName string, limit uint64) ([]string, error) { + return strg.GetFieldValues(ctx, tenantIDs, q, fieldName, limit) } func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) { diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 70d765a2a..91b26aa73 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -25,7 +25,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them. * FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases. * FEATURE: allow using more convenient syntax for [`range` filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter) if upper or lower bound isn't needed. 
For example, it is possible to write `response_size:>=10KiB` instead of `response_size:range[10KiB, inf)`, or `temperature:<42` instead of `temperature:range(-inf, 42)`. -* FEATURE: add `/select/logsql/field_values` HTTP endpoint for returning unique values for the given field when performing the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for details. +* FEATURE: add `/select/logsql/field_names` HTTP endpoint for returning [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-names) for details. +* FEATURE: add `/select/logsql/field_values` HTTP endpoint for returning unique values for the given [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) obtained from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for details. * BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`. diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index cde9dd8d0..c3e3feaa5 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -88,15 +88,34 @@ curl http://localhost:9428/select/logsql/query -H 'AccountID: 12' -H 'ProjectID: The number of requests to `/select/logsql/query` can be [monitored](https://docs.victoriametrics.com/VictoriaLogs/#monitoring) with `vl_http_requests_total{path="/select/logsql/query"}` metric. 
+### Querying field names + +VictoriaLogs provides `/select/logsql/field_names?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns field names +from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given [`<start> ... <end>`] time range. +The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). + +For example, the following command returns field names across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) +for the last 5 minutes: + +```sh +curl http://localhost:9428/select/logsql/field_names -d 'query=error' -d 'start=5m' +``` + +See also: + +- [Querying field values](#querying-field-values) +- [HTTP API](#http-api) + + ### Querying field values VictoriaLogs provides `/select/logsql/field_values?query=<query>&field_name=<fieldName>&start=<start>&end=<end>` HTTP endpoint, which returns unique values for the given `<fieldName>` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -for the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given [`<start> ... <end>`] time range. -The `<start>` and `<end>` args can contain values in [supported formats](https://docs.victoriametrics.com/#timestamp-formats). +from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given [`<start> ... <end>`] time range. +The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). 
For example, the following command returns unique the values for `host` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -across logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes: +across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes: ```sh curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field_name=host' -d 'start=5m' @@ -105,6 +124,11 @@ curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field The `/select/logsql/field_names` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned values to `N`. The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of field values. +See also: + +- [Querying field names](#querying-field-names) +- [HTTP API](#http-api) + ## Web UI VictoriaLogs provides a simple Web UI for logs [querying](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html) and exploration diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index b23051850..70dc7be04 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -136,11 +136,27 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, return errFlush } -// GetUniqueFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs. +// GetFieldNames returns field names from q results for the given tenantIDs. +func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { + // add `field_names ...` to the end of q.pipes + isFirstPipe := len(q.pipes) == 0 + pipes := append([]pipe{}, q.pipes...) 
+ pipes = append(pipes, &pipeFieldNames{ + isFirstPipe: isFirstPipe, + }) + q = &Query{ + f: q.f, + pipes: pipes, + } + + return s.runSingleColumnQuery(ctx, tenantIDs, q) +} + +// GetFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs. // // If limit > 0, then up to limit unique values are returned. The values are returned in arbitrary order because of performance reasons. // The caller may sort the returned values if needed. -func (s *Storage) GetUniqueFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) { +func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) { // add 'uniq fieldName' to the end of q.pipes if !endsWithPipeUniqSingleField(q.pipes, fieldName) { pipes := append([]pipe{}, q.pipes...) @@ -154,6 +170,21 @@ func (s *Storage) GetUniqueFieldValues(ctx context.Context, tenantIDs []TenantID } } + return s.runSingleColumnQuery(ctx, tenantIDs, q) +} + +func endsWithPipeUniqSingleField(pipes []pipe, fieldName string) bool { + if len(pipes) == 0 { + return false + } + pu, ok := pipes[len(pipes)-1].(*pipeUniq) + if !ok { + return false + } + return len(pu.byFields) == 1 && pu.byFields[0] == fieldName +} + +func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { var values []string var valuesLock sync.Mutex writeBlock := func(workerID uint, timestamps []int64, columns []BlockColumn) { @@ -173,31 +204,20 @@ func (s *Storage) GetUniqueFieldValues(ctx context.Context, tenantIDs []TenantID return values, nil } -func endsWithPipeUniqSingleField(pipes []pipe, fieldName string) bool { - if len(pipes) == 0 { - return false - } - pu, ok := pipes[len(pipes)-1].(*pipeUniq) - if !ok { - return false - } - return len(pu.byFields) == 1 && pu.byFields[0] == fieldName -} - func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs 
[]TenantID, q *Query) (*Query, error) { if !hasFilterInWithQueryForFilter(q.f) && !hasFilterInWithQueryForPipes(q.pipes) { return q, nil } - getUniqueValues := func(q *Query, fieldName string) ([]string, error) { - return s.GetUniqueFieldValues(ctx, tenantIDs, q, fieldName, 0) + getFieldValues := func(q *Query, fieldName string) ([]string, error) { + return s.GetFieldValues(ctx, tenantIDs, q, fieldName, 0) } cache := make(map[string][]string) - fNew, err := initFilterInValuesForFilter(cache, q.f, getUniqueValues) + fNew, err := initFilterInValuesForFilter(cache, q.f, getFieldValues) if err != nil { return nil, err } - pipesNew, err := initFilterInValuesForPipes(cache, q.pipes, getUniqueValues) + pipesNew, err := initFilterInValuesForPipes(cache, q.pipes, getFieldValues) if err != nil { return nil, err } @@ -231,9 +251,9 @@ func hasFilterInWithQueryForPipes(pipes []pipe) bool { return false } -type getUniqueValuesFunc func(q *Query, fieldName string) ([]string, error) +type getFieldValuesFunc func(q *Query, fieldName string) ([]string, error) -func initFilterInValuesForFilter(cache map[string][]string, f filter, getUniqueValuesFunc getUniqueValuesFunc) (filter, error) { +func initFilterInValuesForFilter(cache map[string][]string, f filter, getFieldValuesFunc getFieldValuesFunc) (filter, error) { visitFunc := func(f filter) bool { fi, ok := f.(*filterIn) return ok && fi.needExecuteQuery @@ -244,7 +264,7 @@ func initFilterInValuesForFilter(cache map[string][]string, f filter, getUniqueV qStr := fi.q.String() values, ok := cache[qStr] if !ok { - vs, err := getUniqueValuesFunc(fi.q, fi.qFieldName) + vs, err := getFieldValuesFunc(fi.q, fi.qFieldName) if err != nil { return nil, fmt.Errorf("cannot obtain unique values for %s: %w", fi, err) } @@ -262,7 +282,7 @@ func initFilterInValuesForFilter(cache map[string][]string, f filter, getUniqueV return copyFilter(f, visitFunc, copyFunc) } -func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, 
getUniqueValuesFunc getUniqueValuesFunc) ([]pipe, error) { +func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFieldValuesFunc getFieldValuesFunc) ([]pipe, error) { pipesNew := make([]pipe, len(pipes)) for i, p := range pipes { switch t := p.(type) { @@ -270,7 +290,7 @@ func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getUniq funcsNew := make([]pipeStatsFunc, len(t.funcs)) for j, f := range t.funcs { if f.iff != nil { - fNew, err := initFilterInValuesForFilter(cache, f.iff, getUniqueValuesFunc) + fNew, err := initFilterInValuesForFilter(cache, f.iff, getFieldValuesFunc) if err != nil { return nil, err }