From 02f30898e1e28932a5459f4f18da4c917749fc40 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 21 May 2024 22:47:57 +0200 Subject: [PATCH] wip --- app/vlselect/logsql/logsql.go | 63 +++++++++- .../logsql/stream_label_names_response.qtpl | 17 +++ .../stream_label_names_response.qtpl.go | 69 +++++++++++ .../logsql/stream_label_values_response.qtpl | 17 +++ .../stream_label_values_response.qtpl.go | 69 +++++++++++ app/vlselect/main.go | 20 +++- app/vlstorage/main.go | 19 +++ docs/VictoriaLogs/CHANGELOG.md | 2 + docs/VictoriaLogs/querying/README.md | 108 +++++++++++++++-- lib/logstorage/storage_search.go | 113 ++++++++++++++++++ lib/logstorage/storage_search_test.go | 20 ++++ 11 files changed, 503 insertions(+), 14 deletions(-) create mode 100644 app/vlselect/logsql/stream_label_names_response.qtpl create mode 100644 app/vlselect/logsql/stream_label_names_response.qtpl.go create mode 100644 app/vlselect/logsql/stream_label_values_response.qtpl create mode 100644 app/vlselect/logsql/stream_label_values_response.qtpl.go diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index c4bdf51f8..a0d669756 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -189,6 +189,67 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht WriteFieldValuesResponse(w, values) } +// ProcessStreamLabelNamesRequest processes /select/logsql/stream_label_names request. +// +// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-label-names +func ProcessStreamLabelNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { + q, tenantIDs, err := parseCommonArgs(r) + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + + // Obtain stream label names for the given query + q.Optimize() + names, err := vlstorage.GetStreamLabelNames(ctx, tenantIDs, q) + if err != nil { + httpserver.Errorf(w, r, "cannot obtain stream label names: %s", err) + } + + // Write results + w.Header().Set("Content-Type", "application/json") + WriteStreamLabelNamesResponse(w, names) +} + +// ProcessStreamLabelValuesRequest processes /select/logsql/stream_label_values request. +// +// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-label-values +func ProcessStreamLabelValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { + q, tenantIDs, err := parseCommonArgs(r) + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + + // Parse labelName query arg + labelName := r.FormValue("label") + if labelName == "" { + httpserver.Errorf(w, r, "missing 'label' query arg") + return + } + + // Parse limit query arg + limit, err := httputils.GetInt(r, "limit") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + if limit < 0 { + limit = 0 + } + + // Obtain stream label names for the given query + q.Optimize() + values, err := vlstorage.GetStreamLabelValues(ctx, tenantIDs, q, labelName, uint64(limit)) + if err != nil { + httpserver.Errorf(w, r, "cannot obtain stream label values: %s", err) + } + + // Write results + w.Header().Set("Content-Type", "application/json") + WriteStreamLabelValuesResponse(w, values) +} + // ProcessStreamsRequest processes /select/logsql/streams request. // // See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams @@ -211,7 +272,7 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R // Obtain streams for the given query q.Optimize() - streams, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, "_stream", uint64(limit)) + streams, err := vlstorage.GetStreams(ctx, tenantIDs, q, uint64(limit)) if err != nil { httpserver.Errorf(w, r, "cannot obtain streams: %s", err) } diff --git a/app/vlselect/logsql/stream_label_names_response.qtpl b/app/vlselect/logsql/stream_label_names_response.qtpl new file mode 100644 index 000000000..2e476a79a --- /dev/null +++ b/app/vlselect/logsql/stream_label_names_response.qtpl @@ -0,0 +1,17 @@ +{% stripspace %} + +// StreamLabelNamesResponse formats /select/logsql/stream_label_names response +{% func StreamLabelNamesResponse(names []string) %} +{ + "names":[ + {% if len(names) > 0 %} + {%q= names[0] %} + {% for _, v := range names[1:] %} + ,{%q= v %} + {% endfor %} + {% endif %} + ] +} +{% endfunc %} + +{% endstripspace %} diff --git a/app/vlselect/logsql/stream_label_names_response.qtpl.go b/app/vlselect/logsql/stream_label_names_response.qtpl.go new file mode 100644 index 000000000..fa7555656 --- /dev/null +++ b/app/vlselect/logsql/stream_label_names_response.qtpl.go @@ -0,0 +1,69 @@ +// Code generated by qtc from "stream_label_names_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +// StreamLabelNamesResponse formats /select/logsql/stream_label_names response + +//line app/vlselect/logsql/stream_label_names_response.qtpl:4 +package logsql + +//line app/vlselect/logsql/stream_label_names_response.qtpl:4 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/stream_label_names_response.qtpl:4 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/stream_label_names_response.qtpl:4 +func StreamStreamLabelNamesResponse(qw422016 *qt422016.Writer, names []string) { +//line app/vlselect/logsql/stream_label_names_response.qtpl:4 + qw422016.N().S(`{"names":[`) +//line app/vlselect/logsql/stream_label_names_response.qtpl:7 + if len(names) > 0 { +//line app/vlselect/logsql/stream_label_names_response.qtpl:8 + qw422016.N().Q(names[0]) +//line app/vlselect/logsql/stream_label_names_response.qtpl:9 + for _, v := range names[1:] { +//line app/vlselect/logsql/stream_label_names_response.qtpl:9 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stream_label_names_response.qtpl:10 + qw422016.N().Q(v) +//line app/vlselect/logsql/stream_label_names_response.qtpl:11 + } +//line app/vlselect/logsql/stream_label_names_response.qtpl:12 + } +//line app/vlselect/logsql/stream_label_names_response.qtpl:12 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 +} + +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 +func WriteStreamLabelNamesResponse(qq422016 qtio422016.Writer, names []string) { +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 + StreamStreamLabelNamesResponse(qw422016, names) +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 +} + +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 +func StreamLabelNamesResponse(names []string) string { +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 + WriteStreamLabelNamesResponse(qb422016, names) +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 + return qs422016 +//line app/vlselect/logsql/stream_label_names_response.qtpl:15 +} diff --git a/app/vlselect/logsql/stream_label_values_response.qtpl b/app/vlselect/logsql/stream_label_values_response.qtpl new file mode 100644 index 000000000..49c1695df --- /dev/null +++ b/app/vlselect/logsql/stream_label_values_response.qtpl @@ -0,0 +1,17 @@ +{% stripspace %} + +// StreamLabelValuesResponse formats /select/logsql/stream_label_values response +{% func StreamLabelValuesResponse(values []string) %} +{ + "values":[ + {% if len(values) > 0 %} + {%q= values[0] %} + {% for _, v := range values[1:] %} + ,{%q= v %} + {% endfor %} + {% endif %} + ] +} +{% endfunc %} + +{% endstripspace %} diff --git a/app/vlselect/logsql/stream_label_values_response.qtpl.go b/app/vlselect/logsql/stream_label_values_response.qtpl.go new file mode 100644 index 000000000..7e385c60f --- /dev/null +++ b/app/vlselect/logsql/stream_label_values_response.qtpl.go @@ -0,0 +1,69 @@ +// Code generated by qtc from "stream_label_values_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +// StreamLabelValuesResponse formats /select/logsql/stream_label_values response + +//line app/vlselect/logsql/stream_label_values_response.qtpl:4 +package logsql + +//line app/vlselect/logsql/stream_label_values_response.qtpl:4 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/stream_label_values_response.qtpl:4 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/stream_label_values_response.qtpl:4 +func StreamStreamLabelValuesResponse(qw422016 *qt422016.Writer, values []string) { +//line app/vlselect/logsql/stream_label_values_response.qtpl:4 + qw422016.N().S(`{"values":[`) +//line app/vlselect/logsql/stream_label_values_response.qtpl:7 + if len(values) > 0 { +//line app/vlselect/logsql/stream_label_values_response.qtpl:8 + qw422016.N().Q(values[0]) +//line app/vlselect/logsql/stream_label_values_response.qtpl:9 + for _, v := range values[1:] { +//line app/vlselect/logsql/stream_label_values_response.qtpl:9 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stream_label_values_response.qtpl:10 + qw422016.N().Q(v) +//line app/vlselect/logsql/stream_label_values_response.qtpl:11 + } +//line app/vlselect/logsql/stream_label_values_response.qtpl:12 + } +//line app/vlselect/logsql/stream_label_values_response.qtpl:12 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 +} + +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 +func WriteStreamLabelValuesResponse(qq422016 qtio422016.Writer, values []string) { +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 + StreamStreamLabelValuesResponse(qw422016, values) +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 +} + +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 +func StreamLabelValuesResponse(values []string) string { +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 + WriteStreamLabelValuesResponse(qb422016, values) +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 + return qs422016 +//line app/vlselect/logsql/stream_label_values_response.qtpl:15 +} diff --git a/app/vlselect/main.go b/app/vlselect/main.go index a42236f30..a45cf26c3 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -157,6 +157,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { logsqlQueryRequests.Inc() logsql.ProcessQueryRequest(ctx, w, r) return true + case "/select/logsql/stream_label_names": + logsqlStreamLabelNamesRequests.Inc() + logsql.ProcessStreamLabelNamesRequest(ctx, w, r) + return true + case "/select/logsql/stream_label_values": + logsqlStreamLabelValuesRequests.Inc() + logsql.ProcessStreamLabelValuesRequest(ctx, w, r) + return true case "/select/logsql/streams": logsqlStreamsRequests.Inc() logsql.ProcessStreamsRequest(ctx, w, r) @@ -180,9 +188,11 @@ func getMaxQueryDuration(r *http.Request) time.Duration { } var ( - logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`) - logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`) - logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`) - logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) - logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`) + logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`) + logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`) + logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`) + logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) + logsqlStreamLabelNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_label_names"}`) + logsqlStreamLabelValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_label_values"}`) + logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`) ) diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index 1d46a3ed3..91a1aa2c3 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -123,6 +123,25 @@ func GetFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *log return strg.GetFieldValues(ctx, tenantIDs, q, fieldName, limit) } +// GetStreamLabelNames executes q and returns stream labels names seen in results. +func GetStreamLabelNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]string, error) { + return strg.GetStreamLabelNames(ctx, tenantIDs, q) +} + +// GetStreamLabelValues executes q and returns stream label values for the given labelName seen in results. +// +// If limit > 0, then up to limit unique stream label values are returned. +func GetStreamLabelValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, labelName string, limit uint64) ([]string, error) { + return strg.GetStreamLabelValues(ctx, tenantIDs, q, labelName, limit) +} + +// GetStreams executes q and returns streams seen in query results. +// +// If limit > 0, then up to limit unique streams are returned. +func GetStreams(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit uint64) ([]string, error) { + return strg.GetStreams(ctx, tenantIDs, q, limit) +} + func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) { var ss logstorage.StorageStats strg.UpdateStats(&ss) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index d82db492c..f14ae7ffe 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -24,6 +24,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt). * FEATURE: add [`fields_min`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_min-stats) and [`fields_max`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field. * FEATURE: add `/select/logsql/streams` HTTP endpoint for returning [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) for details. +* FEATURE: add `/select/logsql/stream_label_names` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-label-names) for details. +* FEATURE: add `/select/logsql/stream_label_values` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label values for the given label from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-label-values) for details. ## [v0.8.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.8.0-victorialogs) diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index 1c815aca3..44d52fc64 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -23,7 +23,19 @@ via the following ways: ## HTTP API -VictoriaLogs can be queried at the `/select/logsql/query` HTTP endpoint. +VictoriaLogs provides the following HTTP endpoints: + +- [`/select/logsql/query`](#querying-logs) for querying logs +- [`/select/logsql/hits`](#querying-hits-stats) for querying log hits stats over the given time range +- [`/select/logsql/streams`](#querying-streams) for querying [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) +- [`/select/logsql/stream_label_names`](#querying-stream-label-names) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label names +- [`/select/logsql/stream_label_values`](#querying-stream-label-values) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label values +- [`/select/logsql/field_names`](#querying-field-names) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names. +- [`/select/logsql/field_values`](#querying-field-values) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values. + +### Querying logs + +Logs stored in VictoriaLogs can be queried at the `/select/logsql/query` HTTP endpoint. The [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html) query must be passed via `query` argument. For example, the following query returns all the log entries with the `error` word: @@ -88,6 +100,10 @@ curl http://localhost:9428/select/logsql/query -H 'AccountID: 12' -H 'ProjectID: The number of requests to `/select/logsql/query` can be [monitored](https://docs.victoriametrics.com/VictoriaLogs/#monitoring) with `vl_http_requests_total{path="/select/logsql/query"}` metric. +- [Querying hits stats](#querying-hits-stats) +- [Querying streams](#querying-streams) +- [HTTP API](#http-api) + ### Querying hits stats VictoriaMetrics provides `/select/logsql/hits?query=&start=&end=&step=` HTTP endpoint, which returns the number @@ -187,9 +203,8 @@ The grouped fields are put inside `"fields"` object: See also: +- [Querying logs](#querying-logs) - [Querying streams](#querying-streams) -- [Querying field names](#querying-field-names) -- [Querying field values](#querying-field-values) - [HTTP API](#http-api) ### Querying streams @@ -216,7 +231,7 @@ Below is an example JSON output returned from this endpoint: "{host=\"1.2.3.4\",app=\"foo\"}", "{host=\"1.2.3.4\",app=\"bar\"}", "{host=\"10.2.3.4\",app=\"foo\"}", - "{host=\"10.2.3.5\",app=\"baz\"}", + "{host=\"10.2.3.5\",app=\"baz\"}" ] } ``` @@ -226,11 +241,88 @@ The endpoint returns arbitrary subset of values if their number exceeds `N`, so See also: -- [Querying field names](#querying-field-names) -- [Querying field values](#querying-field-values) +- [Querying logs](#querying-logs) - [Querying hits stats](#querying-hits-stats) - [HTTP API](#http-api) +### Querying stream label names + +VictoriaLogs provides `/select/logsql/stream_label_names?query=&start=&end=` HTTP endpoint, which returns +[log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label names from results +of the given `` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[ ... ]` time range. + +The `` and `` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). +If `` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs. +If `` is missing, then it equals to the maximum timestamp across logs stored in VictoriaLogs. + +For example, the following command returns stream label names across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) +for the last 5 minutes: + +```sh +curl http://localhost:9428/select/logsql/stream_label_names -d 'query=error' -d 'start=5m' +``` + +Below is an example JSON output returned from this endpoint: + +```json +{ + "names": [ + "app", + "container", + "datacenter", + "host", + "namespace" + ] +} +``` + +See also: + +- [Querying stream label names](#querying-stream-label-names) +- [Querying field values](#querying-field-values) +- [Querying streams](#querying-streams) +- [HTTP API](#http-api) + +### Querying stream label values + +VictoriaLogs provides `/select/logsql/stream_label_values?query=&start=&&label=` HTTP endpoint, +which returns [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label values for the label with the given `` name +from results of the given `` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[ ... ]` time range. + +The `` and `` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). +If `` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs. +If `` is missing, then it equals to the maximum timestamp across logs stored in VictoriaLogs. + +For example, the following command returns values for the stream label `host` across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) +for the last 5 minutes: + +```sh +curl http://localhost:9428/select/logsql/stream_label_values -d 'query=error' -d 'start=5m' -d 'label=host' +``` + +Below is an example JSON output returned from this endpoint: + +```json +{ + "values": [ + "host-0", + "host-1", + "host-2", + "host-3" + ] +} +``` + +The `/select/logsql/stream_label_names` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned values to `N`. +The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of field values. + +See also: + +- [Querying stream label values](#querying-stream-label-values) +- [Querying field names](#querying-field-names) +- [Querying streams](#querying-streams) +- [HTTP API](#http-api) + ### Querying field names VictoriaLogs provides `/select/logsql/field_names?query=&start=&end=` HTTP endpoint, which returns field names @@ -264,9 +356,9 @@ Below is an example JSON output returned from this endpoint: See also: +- [Querying stream label names](#querying-stream-label-names) - [Querying field values](#querying-field-values) - [Querying streams](#querying-streams) -- [Querying hits stats](#querying-hits-stats) - [HTTP API](#http-api) ### Querying field values @@ -305,9 +397,9 @@ The endpoint returns arbitrary subset of values if their number exceeds `N`, so See also: +- [Querying stream label values](#querying-stream-label-values) - [Querying field names](#querying-field-names) - [Querying streams](#querying-streams) -- [Querying hits stats](#querying-hits-stats) - [HTTP API](#http-api) diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index d5c51c5f2..765444487 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -216,6 +216,63 @@ func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Q return s.runSingleColumnQuery(ctx, tenantIDs, q) } +// GetStreamLabelNames returns stream label names from q results for the given tenantIDs. +func (s *Storage) GetStreamLabelNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { + streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64) + if err != nil { + return nil, err + } + + var names []string + m := make(map[string]struct{}) + forEachStreamLabel(streams, func(label Field) { + if _, ok := m[label.Name]; !ok { + nameCopy := strings.Clone(label.Name) + names = append(names, nameCopy) + m[nameCopy] = struct{}{} + } + }) + sortStrings(names) + + return names, nil +} + +// GetStreamLabelValues returns stream label values for the given labelName from q results for the given tenantIDs. +// +// If limit > 9, then up to limit unique label values are returned. +func (s *Storage) GetStreamLabelValues(ctx context.Context, tenantIDs []TenantID, q *Query, labelName string, limit uint64) ([]string, error) { + streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64) + if err != nil { + return nil, err + } + + var values []string + m := make(map[string]struct{}) + forEachStreamLabel(streams, func(label Field) { + if label.Name != labelName { + return + } + if _, ok := m[label.Value]; !ok { + valueCopy := strings.Clone(label.Value) + values = append(values, valueCopy) + m[valueCopy] = struct{}{} + } + }) + if uint64(len(values)) > limit { + values = values[:limit] + } + sortStrings(values) + + return values, nil +} + +// GetStreams returns streams from q results for the given tenantIDs. +// +// If limit > 0, then up to limit unique streams are returned. +func (s *Storage) GetStreams(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]string, error) { + return s.GetFieldValues(ctx, tenantIDs, q, "_stream", limit) +} + func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { var values []string var valuesLock sync.Mutex @@ -937,3 +994,59 @@ func getFilterTimeRange(f filter) (int64, int64) { } return math.MinInt64, math.MaxInt64 } + +func forEachStreamLabel(streams []string, f func(label Field)) { + var labels []Field + for _, stream := range streams { + var err error + labels, err = parseStreamLabels(labels[:0], stream) + if err != nil { + continue + } + for i := range labels { + f(labels[i]) + } + } +} + +func parseStreamLabels(dst []Field, s string) ([]Field, error) { + if len(s) == 0 || s[0] != '{' { + return dst, fmt.Errorf("missing '{' at the beginning of stream name") + } + s = s[1:] + if len(s) == 0 || s[len(s)-1] != '}' { + return dst, fmt.Errorf("missing '}' at the end of stream name") + } + s = s[:len(s)-1] + if len(s) == 0 { + return dst, nil + } + + for { + n := strings.Index(s, `="`) + if n < 0 { + return dst, fmt.Errorf("cannot find label value in double quotes at [%s]", s) + } + name := s[:n] + s = s[n+1:] + + value, nOffset := tryUnquoteString(s) + if nOffset < 0 { + return dst, fmt.Errorf("cannot find parse label value in double quotes at [%s]", s) + } + s = s[nOffset:] + + dst = append(dst, Field{ + Name: name, + Value: value, + }) + + if len(s) == 0 { + return dst, nil + } + if s[0] != ',' { + return dst, fmt.Errorf("missing ',' after %s=%q", name, value) + } + s = s[1:] + } +} diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 7e2dd9461..fe1cb5392 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -650,3 +650,23 @@ func TestStorageSearch(t *testing.T) { s.MustClose() fs.MustRemoveAll(path) } + +func TestParseStreamLabelsSuccess(t *testing.T) { + f := func(s, resultExpected string) { + t.Helper() + + labels, err := parseStreamLabels(nil, s) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + result := marshalFieldsToJSON(nil, labels) + if string(result) != resultExpected { + t.Fatalf("unexpected result\ngot\n%s\nwant\n%s", result, resultExpected) + } + } + + f(`{}`, `{}`) + f(`{foo="bar"}`, `{"foo":"bar"}`) + f(`{a="b",c="d"}`, `{"a":"b","c":"d"}`) + f(`{a="a=,b\"c}",b="d"}`, `{"a":"a=,b\"c}","b":"d"}`) +}