This commit is contained in:
Aliaksandr Valialkin 2024-05-21 22:47:57 +02:00
parent ed46683fee
commit 02f30898e1
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
11 changed files with 503 additions and 14 deletions

View file

@ -189,6 +189,67 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
WriteFieldValuesResponse(w, values)
}
// ProcessStreamLabelNamesRequest processes /select/logsql/stream_label_names request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-label-names
func ProcessStreamLabelNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Obtain stream label names for the given query
q.Optimize()
names, err := vlstorage.GetStreamLabelNames(ctx, tenantIDs, q)
if err != nil {
httpserver.Errorf(w, r, "cannot obtain stream label names: %s", err)
}
// Write results
w.Header().Set("Content-Type", "application/json")
WriteStreamLabelNamesResponse(w, names)
}
// ProcessStreamLabelValuesRequest processes /select/logsql/stream_label_values request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-label-values
func ProcessStreamLabelValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Parse labelName query arg
labelName := r.FormValue("label")
if labelName == "" {
httpserver.Errorf(w, r, "missing 'label' query arg")
return
}
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if limit < 0 {
limit = 0
}
// Obtain stream label names for the given query
q.Optimize()
values, err := vlstorage.GetStreamLabelValues(ctx, tenantIDs, q, labelName, uint64(limit))
if err != nil {
httpserver.Errorf(w, r, "cannot obtain stream label values: %s", err)
}
// Write results
w.Header().Set("Content-Type", "application/json")
WriteStreamLabelValuesResponse(w, values)
}
// ProcessStreamsRequest processes /select/logsql/streams request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams
@ -211,7 +272,7 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R
// Obtain streams for the given query
q.Optimize()
streams, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, "_stream", uint64(limit))
streams, err := vlstorage.GetStreams(ctx, tenantIDs, q, uint64(limit))
if err != nil {
httpserver.Errorf(w, r, "cannot obtain streams: %s", err)
}

View file

@ -0,0 +1,17 @@
{% stripspace %}
// StreamLabelNamesResponse formats /select/logsql/stream_label_names response
{% func StreamLabelNamesResponse(names []string) %}
{
"names":[
{% if len(names) > 0 %}
{%q= names[0] %}
{% for _, v := range names[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,69 @@
// Code generated by qtc from "stream_label_names_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// StreamLabelNamesResponse formats /select/logsql/stream_label_names response
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
package logsql
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
func StreamStreamLabelNamesResponse(qw422016 *qt422016.Writer, names []string) {
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
qw422016.N().S(`{"names":[`)
//line app/vlselect/logsql/stream_label_names_response.qtpl:7
if len(names) > 0 {
//line app/vlselect/logsql/stream_label_names_response.qtpl:8
qw422016.N().Q(names[0])
//line app/vlselect/logsql/stream_label_names_response.qtpl:9
for _, v := range names[1:] {
//line app/vlselect/logsql/stream_label_names_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/stream_label_names_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/stream_label_names_response.qtpl:11
}
//line app/vlselect/logsql/stream_label_names_response.qtpl:12
}
//line app/vlselect/logsql/stream_label_names_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
}
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
func WriteStreamLabelNamesResponse(qq422016 qtio422016.Writer, names []string) {
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
StreamStreamLabelNamesResponse(qw422016, names)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
}
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
func StreamLabelNamesResponse(names []string) string {
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
WriteStreamLabelNamesResponse(qb422016, names)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
return qs422016
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
}

View file

@ -0,0 +1,17 @@
{% stripspace %}
// StreamLabelValuesResponse formats /select/logsql/stream_label_values response
{% func StreamLabelValuesResponse(values []string) %}
{
"values":[
{% if len(values) > 0 %}
{%q= values[0] %}
{% for _, v := range values[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,69 @@
// Code generated by qtc from "stream_label_values_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// StreamLabelValuesResponse formats /select/logsql/stream_label_values response
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
package logsql
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
func StreamStreamLabelValuesResponse(qw422016 *qt422016.Writer, values []string) {
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
qw422016.N().S(`{"values":[`)
//line app/vlselect/logsql/stream_label_values_response.qtpl:7
if len(values) > 0 {
//line app/vlselect/logsql/stream_label_values_response.qtpl:8
qw422016.N().Q(values[0])
//line app/vlselect/logsql/stream_label_values_response.qtpl:9
for _, v := range values[1:] {
//line app/vlselect/logsql/stream_label_values_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/stream_label_values_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/stream_label_values_response.qtpl:11
}
//line app/vlselect/logsql/stream_label_values_response.qtpl:12
}
//line app/vlselect/logsql/stream_label_values_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
}
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
func WriteStreamLabelValuesResponse(qq422016 qtio422016.Writer, values []string) {
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
StreamStreamLabelValuesResponse(qw422016, values)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
}
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
func StreamLabelValuesResponse(values []string) string {
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
WriteStreamLabelValuesResponse(qb422016, values)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
return qs422016
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
}

View file

@ -157,6 +157,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
logsqlQueryRequests.Inc()
logsql.ProcessQueryRequest(ctx, w, r)
return true
case "/select/logsql/stream_label_names":
logsqlStreamLabelNamesRequests.Inc()
logsql.ProcessStreamLabelNamesRequest(ctx, w, r)
return true
case "/select/logsql/stream_label_values":
logsqlStreamLabelValuesRequests.Inc()
logsql.ProcessStreamLabelValuesRequest(ctx, w, r)
return true
case "/select/logsql/streams":
logsqlStreamsRequests.Inc()
logsql.ProcessStreamsRequest(ctx, w, r)
@ -180,9 +188,11 @@ func getMaxQueryDuration(r *http.Request) time.Duration {
}
var (
logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`)
logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`)
logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`)
logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`)
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`)
logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`)
logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`)
logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`)
logsqlStreamLabelNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_label_names"}`)
logsqlStreamLabelValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_label_values"}`)
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
)

View file

@ -123,6 +123,25 @@ func GetFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *log
return strg.GetFieldValues(ctx, tenantIDs, q, fieldName, limit)
}
// GetStreamLabelNames executes q and returns stream labels names seen in results.
func GetStreamLabelNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]string, error) {
return strg.GetStreamLabelNames(ctx, tenantIDs, q)
}
// GetStreamLabelValues executes q and returns stream label values for the given labelName seen in results.
//
// If limit > 0, then up to limit unique stream label values are returned.
func GetStreamLabelValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, labelName string, limit uint64) ([]string, error) {
return strg.GetStreamLabelValues(ctx, tenantIDs, q, labelName, limit)
}
// GetStreams executes q and returns streams seen in query results.
//
// If limit > 0, then up to limit unique streams are returned.
func GetStreams(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit uint64) ([]string, error) {
return strg.GetStreams(ctx, tenantIDs, q, limit)
}
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)

View file

@ -24,6 +24,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
* FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt).
* FEATURE: add [`fields_min`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_min-stats) and [`fields_max`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field.
* FEATURE: add `/select/logsql/streams` HTTP endpoint for returning [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) for details.
* FEATURE: add `/select/logsql/stream_label_names` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-label-names) for details.
* FEATURE: add `/select/logsql/stream_label_values` HTTP endpoint for returning [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label values for the given label from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream-label-values) for details.
## [v0.8.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.8.0-victorialogs)

View file

@ -23,7 +23,19 @@ via the following ways:
## HTTP API
VictoriaLogs can be queried at the `/select/logsql/query` HTTP endpoint.
VictoriaLogs provides the following HTTP endpoints:
- [`/select/logsql/query`](#querying-logs) for querying logs
- [`/select/logsql/hits`](#querying-hits-stats) for querying log hits stats over the given time range
- [`/select/logsql/streams`](#querying-streams) for querying [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
- [`/select/logsql/stream_label_names`](#querying-stream-label-names) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label names
- [`/select/logsql/stream_label_values`](#querying-stream-label-values) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label values
- [`/select/logsql/field_names`](#querying-field-names) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names.
- [`/select/logsql/field_values`](#querying-field-values) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values.
### Querying logs
Logs stored in VictoriaLogs can be queried at the `/select/logsql/query` HTTP endpoint.
The [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html) query must be passed via `query` argument.
For example, the following query returns all the log entries with the `error` word:
@ -88,6 +100,10 @@ curl http://localhost:9428/select/logsql/query -H 'AccountID: 12' -H 'ProjectID:
The number of requests to `/select/logsql/query` can be [monitored](https://docs.victoriametrics.com/VictoriaLogs/#monitoring)
with `vl_http_requests_total{path="/select/logsql/query"}` metric.
- [Querying hits stats](#querying-hits-stats)
- [Querying streams](#querying-streams)
- [HTTP API](#http-api)
### Querying hits stats
VictoriaMetrics provides `/select/logsql/hits?query=<query>&start=<start>&end=<end>&step=<step>` HTTP endpoint, which returns the number
@ -187,9 +203,8 @@ The grouped fields are put inside `"fields"` object:
See also:
- [Querying logs](#querying-logs)
- [Querying streams](#querying-streams)
- [Querying field names](#querying-field-names)
- [Querying field values](#querying-field-values)
- [HTTP API](#http-api)
### Querying streams
@ -216,7 +231,7 @@ Below is an example JSON output returned from this endpoint:
"{host=\"1.2.3.4\",app=\"foo\"}",
"{host=\"1.2.3.4\",app=\"bar\"}",
"{host=\"10.2.3.4\",app=\"foo\"}",
"{host=\"10.2.3.5\",app=\"baz\"}",
"{host=\"10.2.3.5\",app=\"baz\"}"
]
}
```
@ -226,11 +241,88 @@ The endpoint returns arbitrary subset of values if their number exceeds `N`, so
See also:
- [Querying field names](#querying-field-names)
- [Querying field values](#querying-field-values)
- [Querying logs](#querying-logs)
- [Querying hits stats](#querying-hits-stats)
- [HTTP API](#http-api)
### Querying stream label names
VictoriaLogs provides `/select/logsql/stream_label_names?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns
[log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label names from results
of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
If `<end>` is missing, then it equals to the maximum timestamp across logs stored in VictoriaLogs.
For example, the following command returns stream label names across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
for the last 5 minutes:
```sh
curl http://localhost:9428/select/logsql/stream_label_names -d 'query=error' -d 'start=5m'
```
Below is an example JSON output returned from this endpoint:
```json
{
"names": [
"app",
"container",
"datacenter",
"host",
"namespace"
]
}
```
See also:
- [Querying stream label names](#querying-stream-label-names)
- [Querying field values](#querying-field-values)
- [Querying streams](#querying-streams)
- [HTTP API](#http-api)
### Querying stream label values
VictoriaLogs provides `/select/logsql/stream_label_values?query=<query>&start=<start>&<end>&label=<labelName>` HTTP endpoint,
which returns [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label values for the label with the given `<labelName>` name
from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
If `<end>` is missing, then it equals to the maximum timestamp across logs stored in VictoriaLogs.
For example, the following command returns values for the stream label `host` across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
for the last 5 minutes:
```sh
curl http://localhost:9428/select/logsql/stream_label_values -d 'query=error' -d 'start=5m' -d 'label=host'
```
Below is an example JSON output returned from this endpoint:
```json
{
"values": [
"host-0",
"host-1",
"host-2",
"host-3"
]
}
```
The `/select/logsql/stream_label_names` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned values to `N`.
The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of field values.
See also:
- [Querying stream label values](#querying-stream-label-values)
- [Querying field names](#querying-field-names)
- [Querying streams](#querying-streams)
- [HTTP API](#http-api)
### Querying field names
VictoriaLogs provides `/select/logsql/field_names?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns field names
@ -264,9 +356,9 @@ Below is an example JSON output returned from this endpoint:
See also:
- [Querying stream label names](#querying-stream-label-names)
- [Querying field values](#querying-field-values)
- [Querying streams](#querying-streams)
- [Querying hits stats](#querying-hits-stats)
- [HTTP API](#http-api)
### Querying field values
@ -305,9 +397,9 @@ The endpoint returns arbitrary subset of values if their number exceeds `N`, so
See also:
- [Querying stream label values](#querying-stream-label-values)
- [Querying field names](#querying-field-names)
- [Querying streams](#querying-streams)
- [Querying hits stats](#querying-hits-stats)
- [HTTP API](#http-api)

View file

@ -216,6 +216,63 @@ func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Q
return s.runSingleColumnQuery(ctx, tenantIDs, q)
}
// GetStreamLabelNames returns stream label names from q results for the given tenantIDs.
func (s *Storage) GetStreamLabelNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64)
if err != nil {
return nil, err
}
var names []string
m := make(map[string]struct{})
forEachStreamLabel(streams, func(label Field) {
if _, ok := m[label.Name]; !ok {
nameCopy := strings.Clone(label.Name)
names = append(names, nameCopy)
m[nameCopy] = struct{}{}
}
})
sortStrings(names)
return names, nil
}
// GetStreamLabelValues returns stream label values for the given labelName from q results for the given tenantIDs.
//
// If limit > 9, then up to limit unique label values are returned.
func (s *Storage) GetStreamLabelValues(ctx context.Context, tenantIDs []TenantID, q *Query, labelName string, limit uint64) ([]string, error) {
streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64)
if err != nil {
return nil, err
}
var values []string
m := make(map[string]struct{})
forEachStreamLabel(streams, func(label Field) {
if label.Name != labelName {
return
}
if _, ok := m[label.Value]; !ok {
valueCopy := strings.Clone(label.Value)
values = append(values, valueCopy)
m[valueCopy] = struct{}{}
}
})
if uint64(len(values)) > limit {
values = values[:limit]
}
sortStrings(values)
return values, nil
}
// GetStreams returns streams from q results for the given tenantIDs.
//
// If limit > 0, then up to limit unique streams are returned.
func (s *Storage) GetStreams(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]string, error) {
return s.GetFieldValues(ctx, tenantIDs, q, "_stream", limit)
}
func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
var values []string
var valuesLock sync.Mutex
@ -937,3 +994,59 @@ func getFilterTimeRange(f filter) (int64, int64) {
}
return math.MinInt64, math.MaxInt64
}
func forEachStreamLabel(streams []string, f func(label Field)) {
var labels []Field
for _, stream := range streams {
var err error
labels, err = parseStreamLabels(labels[:0], stream)
if err != nil {
continue
}
for i := range labels {
f(labels[i])
}
}
}
func parseStreamLabels(dst []Field, s string) ([]Field, error) {
if len(s) == 0 || s[0] != '{' {
return dst, fmt.Errorf("missing '{' at the beginning of stream name")
}
s = s[1:]
if len(s) == 0 || s[len(s)-1] != '}' {
return dst, fmt.Errorf("missing '}' at the end of stream name")
}
s = s[:len(s)-1]
if len(s) == 0 {
return dst, nil
}
for {
n := strings.Index(s, `="`)
if n < 0 {
return dst, fmt.Errorf("cannot find label value in double quotes at [%s]", s)
}
name := s[:n]
s = s[n+1:]
value, nOffset := tryUnquoteString(s)
if nOffset < 0 {
return dst, fmt.Errorf("cannot find parse label value in double quotes at [%s]", s)
}
s = s[nOffset:]
dst = append(dst, Field{
Name: name,
Value: value,
})
if len(s) == 0 {
return dst, nil
}
if s[0] != ',' {
return dst, fmt.Errorf("missing ',' after %s=%q", name, value)
}
s = s[1:]
}
}

View file

@ -650,3 +650,23 @@ func TestStorageSearch(t *testing.T) {
s.MustClose()
fs.MustRemoveAll(path)
}
func TestParseStreamLabelsSuccess(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
labels, err := parseStreamLabels(nil, s)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
result := marshalFieldsToJSON(nil, labels)
if string(result) != resultExpected {
t.Fatalf("unexpected result\ngot\n%s\nwant\n%s", result, resultExpected)
}
}
f(`{}`, `{}`)
f(`{foo="bar"}`, `{"foo":"bar"}`)
f(`{a="b",c="d"}`, `{"a":"b","c":"d"}`)
f(`{a="a=,b\"c}",b="d"}`, `{"a":"a=,b\"c}","b":"d"}`)
}