This commit is contained in:
Aliaksandr Valialkin 2024-05-21 21:18:05 +02:00
parent 5d3ecc0537
commit ed46683fee
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
12 changed files with 254 additions and 82 deletions

View file

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"math" "math"
"net/http" "net/http"
"slices"
"sort" "sort"
"strings" "strings"
"sync" "sync"
@ -145,8 +144,6 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
return return
} }
slices.Sort(fieldNames)
// Write results // Write results
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
WriteFieldNamesResponse(w, fieldNames) WriteFieldNamesResponse(w, fieldNames)
@ -163,9 +160,9 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
} }
// Parse fieldName query arg // Parse fieldName query arg
fieldName := r.FormValue("field_name") fieldName := r.FormValue("field")
if fieldName == "" { if fieldName == "" {
httpserver.Errorf(w, r, "missing 'field_name' query arg") httpserver.Errorf(w, r, "missing 'field' query arg")
return return
} }
@ -187,16 +184,41 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
return return
} }
if limit == 0 || len(values) < limit { // Write results
// Sort values only if their number is below the limit. w.Header().Set("Content-Type", "application/json")
// Otherwise there is little sense in sorting, since the query may return WriteFieldValuesResponse(w, values)
// different subset of values on every execution. }
slices.Sort(values)
// ProcessStreamsRequest processes /select/logsql/streams request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams
func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if limit < 0 {
limit = 0
}
// Obtain streams for the given query
q.Optimize()
streams, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, "_stream", uint64(limit))
if err != nil {
httpserver.Errorf(w, r, "cannot obtain streams: %s", err)
} }
// Write results // Write results
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
WriteFieldValuesResponse(w, values) WriteStreamsResponse(w, streams)
} }
// ProcessQueryRequest handles /select/logsql/query request. // ProcessQueryRequest handles /select/logsql/query request.

View file

@ -0,0 +1,17 @@
{% stripspace %}
// StreamsResponse formats /select/logsql/streams response
//
// The response is a JSON object with a single "streams" array containing
// JSON-quoted stream labels. The first element is emitted separately from
// the rest so that commas appear only between array items, keeping the
// output valid JSON for empty, single-element and multi-element inputs.
//
// NOTE(review): this template is the source for the qtc-generated
// streams_response.qtpl.go; regenerate with qtc after any change here.
{% func StreamsResponse(streams []string) %}
{
"streams":[
{% if len(streams) > 0 %}
{%q= streams[0] %}
{% for _, v := range streams[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,69 @@
// Code generated by qtc from "streams_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// StreamsResponse formats /select/logsql/streams response
//line app/vlselect/logsql/streams_response.qtpl:4
package logsql
//line app/vlselect/logsql/streams_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
// Keep the imported packages referenced even when a template does not use
// every helper; this is standard qtc boilerplate.
//line app/vlselect/logsql/streams_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
// StreamStreamsResponse writes the {"streams":[...]} JSON object for the
// given streams into qw422016. Each stream label is JSON-quoted via N().Q;
// the first element is written outside the loop so commas separate items only.
//line app/vlselect/logsql/streams_response.qtpl:4
func StreamStreamsResponse(qw422016 *qt422016.Writer, streams []string) {
//line app/vlselect/logsql/streams_response.qtpl:4
qw422016.N().S(`{"streams":[`)
//line app/vlselect/logsql/streams_response.qtpl:7
if len(streams) > 0 {
//line app/vlselect/logsql/streams_response.qtpl:8
qw422016.N().Q(streams[0])
//line app/vlselect/logsql/streams_response.qtpl:9
for _, v := range streams[1:] {
//line app/vlselect/logsql/streams_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/streams_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/streams_response.qtpl:11
}
//line app/vlselect/logsql/streams_response.qtpl:12
}
//line app/vlselect/logsql/streams_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/streams_response.qtpl:15
}
// WriteStreamsResponse renders the streams response into an arbitrary
// io.Writer by wrapping it in a pooled quicktemplate writer.
//line app/vlselect/logsql/streams_response.qtpl:15
func WriteStreamsResponse(qq422016 qtio422016.Writer, streams []string) {
//line app/vlselect/logsql/streams_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/streams_response.qtpl:15
StreamStreamsResponse(qw422016, streams)
//line app/vlselect/logsql/streams_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/streams_response.qtpl:15
}
// StreamsResponse renders the streams response into a string using a pooled
// byte buffer; string(qb422016.B) copies the bytes before the buffer is released.
//line app/vlselect/logsql/streams_response.qtpl:15
func StreamsResponse(streams []string) string {
//line app/vlselect/logsql/streams_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/streams_response.qtpl:15
WriteStreamsResponse(qb422016, streams)
//line app/vlselect/logsql/streams_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/streams_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/streams_response.qtpl:15
return qs422016
//line app/vlselect/logsql/streams_response.qtpl:15
}

View file

@ -75,10 +75,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
// Skip requests, which do not start with /select/, since these aren't our requests. // Skip requests, which do not start with /select/, since these aren't our requests.
return false return false
} }
path = strings.TrimPrefix(path, "/select")
path = strings.ReplaceAll(path, "//", "/") path = strings.ReplaceAll(path, "//", "/")
if path == "/vmui" { if path == "/select/vmui" {
// VMUI access via incomplete url without `/` in the end. Redirect to complete url. // VMUI access via incomplete url without `/` in the end. Redirect to complete url.
// Use relative redirect, since the hostname and path prefix may be incorrect if VictoriaMetrics // Use relative redirect, since the hostname and path prefix may be incorrect if VictoriaMetrics
// is hidden behind vmauth or similar proxy. // is hidden behind vmauth or similar proxy.
@ -87,8 +86,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
httpserver.Redirect(w, newURL) httpserver.Redirect(w, newURL)
return true return true
} }
if strings.HasPrefix(path, "/vmui/") { if strings.HasPrefix(path, "/select/vmui/") {
if strings.HasPrefix(path, "/vmui/static/") { if strings.HasPrefix(path, "/select/vmui/static/") {
// Allow clients caching static contents for long period of time, since it shouldn't change over time. // Allow clients caching static contents for long period of time, since it shouldn't change over time.
// Path to static contents (such as js and css) must be changed whenever its contents is changed. // Path to static contents (such as js and css) must be changed whenever its contents is changed.
// See https://developer.chrome.com/docs/lighthouse/performance/uses-long-cache-ttl/ // See https://developer.chrome.com/docs/lighthouse/performance/uses-long-cache-ttl/
@ -140,27 +139,28 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
} }
} }
httpserver.EnableCORS(w, r)
switch path { switch path {
case "/logsql/query": case "/select/logsql/field_names":
logsqlQueryRequests.Inc()
httpserver.EnableCORS(w, r)
logsql.ProcessQueryRequest(ctx, w, r)
return true
case "/logsql/field_values":
logsqlFieldValuesRequests.Inc()
httpserver.EnableCORS(w, r)
logsql.ProcessFieldValuesRequest(ctx, w, r)
return true
case "/logsql/field_names":
logsqlFieldNamesRequests.Inc() logsqlFieldNamesRequests.Inc()
httpserver.EnableCORS(w, r)
logsql.ProcessFieldNamesRequest(ctx, w, r) logsql.ProcessFieldNamesRequest(ctx, w, r)
return true return true
case "/logsql/hits": case "/select/logsql/field_values":
logsqlFieldValuesRequests.Inc()
logsql.ProcessFieldValuesRequest(ctx, w, r)
return true
case "/select/logsql/hits":
logsqlHitsRequests.Inc() logsqlHitsRequests.Inc()
httpserver.EnableCORS(w, r)
logsql.ProcessHitsRequest(ctx, w, r) logsql.ProcessHitsRequest(ctx, w, r)
return true return true
case "/select/logsql/query":
logsqlQueryRequests.Inc()
logsql.ProcessQueryRequest(ctx, w, r)
return true
case "/select/logsql/streams":
logsqlStreamsRequests.Inc()
logsql.ProcessStreamsRequest(ctx, w, r)
return true
default: default:
return false return false
} }
@ -180,8 +180,9 @@ func getMaxQueryDuration(r *http.Request) time.Duration {
} }
var ( var (
logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`)
logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`)
logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`) logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`)
logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`)
logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`) logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`)
logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`)
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
) )

View file

@ -23,6 +23,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
* FEATURE: add ability to unpack JSON fields with [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_json). * FEATURE: add ability to unpack JSON fields with [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_json).
* FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt). * FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt).
* FEATURE: add [`fields_min`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_min-stats) and [`fields_max`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field. * FEATURE: add [`fields_min`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_min-stats) and [`fields_max`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field.
* FEATURE: add `/select/logsql/streams` HTTP endpoint for returning [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) for details.
## [v0.8.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.8.0-victorialogs) ## [v0.8.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.8.0-victorialogs)

View file

@ -187,10 +187,49 @@ The grouped fields are put inside `"fields"` object:
See also: See also:
- [Querying streams](#querying-streams)
- [Querying field names](#querying-field-names) - [Querying field names](#querying-field-names)
- [Querying field values](#querying-field-values) - [Querying field values](#querying-field-values)
- [HTTP API](#http-api) - [HTTP API](#http-api)
### Querying streams
VictoriaLogs provides `/select/logsql/streams?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals the minimum timestamp across logs stored in VictoriaLogs.
If `<end>` is missing, then it equals the maximum timestamp across logs stored in VictoriaLogs.
For example, the following command returns streams across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
for the last 5 minutes:
```sh
curl http://localhost:9428/select/logsql/streams -d 'query=error' -d 'start=5m'
```
Below is an example JSON output returned from this endpoint:
```json
{
"streams": [
"{host=\"1.2.3.4\",app=\"foo\"}",
"{host=\"1.2.3.4\",app=\"bar\"}",
"{host=\"10.2.3.4\",app=\"foo\"}",
"{host=\"10.2.3.5\",app=\"baz\"}"
]
}
```
The `/select/logsql/streams` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned streams to `N`.
The endpoint returns an arbitrary subset of streams if their number exceeds `N`, so `limit=N` cannot be used for pagination over a big number of streams.
See also:
- [Querying field names](#querying-field-names)
- [Querying field values](#querying-field-values)
- [Querying hits stats](#querying-hits-stats)
- [HTTP API](#http-api)
### Querying field names ### Querying field names
@ -226,12 +265,13 @@ Below is an example JSON output returned from this endpoint:
See also: See also:
- [Querying field values](#querying-field-values) - [Querying field values](#querying-field-values)
- [Querying streams](#querying-streams)
- [Querying hits stats](#querying-hits-stats) - [Querying hits stats](#querying-hits-stats)
- [HTTP API](#http-api) - [HTTP API](#http-api)
### Querying field values ### Querying field values
VictoriaLogs provides `/select/logsql/field_values?query=<query>&field_name=<fieldName>&start=<start>&end=<end>` HTTP endpoint, which returns VictoriaLogs provides `/select/logsql/field_values?query=<query>&field=<fieldName>&start=<start>&end=<end>` HTTP endpoint, which returns
unique values for the given `<fieldName>` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) unique values for the given `<fieldName>` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range. from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
@ -243,7 +283,7 @@ For example, the following command returns unique values for `host` [field](http
across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes: across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes:
```sh ```sh
curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field_name=host' -d 'start=5m' curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field=host' -d 'start=5m'
``` ```
Below is an example JSON output returned from this endpoint: Below is an example JSON output returned from this endpoint:
@ -266,6 +306,7 @@ The endpoint returns arbitrary subset of values if their number exceeds `N`, so
See also: See also:
- [Querying field names](#querying-field-names) - [Querying field names](#querying-field-names)
- [Querying streams](#querying-streams)
- [Querying hits stats](#querying-hits-stats) - [Querying hits stats](#querying-hits-stats)
- [HTTP API](#http-api) - [HTTP API](#http-api)

View file

@ -232,10 +232,15 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
} }
s := fmt.Sprintf("stats by (%s) count() hits", byFieldsStr) s := fmt.Sprintf("stats by (%s) count() hits", byFieldsStr)
lex := newLexer(s) lex := newLexer(s)
ps, err := parsePipeStats(lex) ps, err := parsePipeStats(lex)
if err != nil { if err != nil {
logger.Panicf("BUG: unexpected error when parsing %q: %s", s, err) logger.Panicf("BUG: unexpected error when parsing [%s]: %s", s, err)
} }
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing [%s]: %q", s, lex.s)
}
q.pipes = append(q.pipes, ps) q.pipes = append(q.pipes, ps)
} }
@ -491,11 +496,14 @@ func parseQuery(lex *lexer) (*Query, error) {
f: f, f: f,
} }
pipes, err := parsePipes(lex) if lex.isKeyword("|") {
if err != nil { lex.nextToken()
return nil, fmt.Errorf("%w; context: [%s]", err, lex.context()) pipes, err := parsePipes(lex)
if err != nil {
return nil, fmt.Errorf("%w; context: [%s]", err, lex.context())
}
q.pipes = pipes
} }
q.pipes = pipes
return q, nil return q, nil
} }

View file

@ -63,21 +63,20 @@ func (dpp defaultPipeProcessor) flush() error {
func parsePipes(lex *lexer) ([]pipe, error) { func parsePipes(lex *lexer) ([]pipe, error) {
var pipes []pipe var pipes []pipe
for !lex.isKeyword(")", "") { for {
if !lex.isKeyword("|") {
if len(pipes) == 0 {
return nil, fmt.Errorf("expecting '|' after the query filters; got %q", lex.token)
}
return nil, fmt.Errorf("expecting '|' after [%s] pipe; got %q", pipes[len(pipes)-1], lex.token)
}
lex.nextToken()
p, err := parsePipe(lex) p, err := parsePipe(lex)
if err != nil { if err != nil {
return nil, err return nil, err
} }
pipes = append(pipes, p) pipes = append(pipes, p)
switch {
case lex.isKeyword("|"):
lex.nextToken()
case lex.isKeyword(")", ""):
return pipes, nil
}
} }
return pipes, nil
} }
func parsePipe(lex *lexer) (pipe, error) { func parsePipe(lex *lexer) (pipe, error) {

View file

@ -41,7 +41,7 @@ func (sm *statsFieldsMin) newStatsProcessor() (statsProcessor, int) {
type statsFieldsMinProcessor struct { type statsFieldsMinProcessor struct {
sm *statsFieldsMin sm *statsFieldsMin
min string min string
fields []Field fields []Field
} }

View file

@ -33,7 +33,7 @@ func (sm *statsMax) newStatsProcessor() (statsProcessor, int) {
type statsMaxProcessor struct { type statsMaxProcessor struct {
sm *statsMax sm *statsMax
max string max string
} }
func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int {
@ -153,6 +153,7 @@ func (smp *statsMaxProcessor) updateStateBytes(b []byte) {
func (smp *statsMaxProcessor) updateStateString(v string) { func (smp *statsMaxProcessor) updateStateString(v string) {
if v == "" { if v == "" {
// Skip empty strings // Skip empty strings
return
} }
if smp.max != "" && !lessString(smp.max, v) { if smp.max != "" && !lessString(smp.max, v) {
return return

View file

@ -33,7 +33,7 @@ func (sm *statsMin) newStatsProcessor() (statsProcessor, int) {
type statsMinProcessor struct { type statsMinProcessor struct {
sm *statsMin sm *statsMin
min string min string
} }
func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int {

View file

@ -146,17 +146,30 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
// GetFieldNames returns field names from q results for the given tenantIDs. // GetFieldNames returns field names from q results for the given tenantIDs.
func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
// add `field_names ...` to the end of q.pipes
pipes := append([]pipe{}, q.pipes...) pipes := append([]pipe{}, q.pipes...)
pipeStr := "field_names as names | sort by (names)"
pipeStr := "field_names as names"
lex := newLexer(pipeStr) lex := newLexer(pipeStr)
pf, err := parsePipeFieldNames(lex) pf, err := parsePipeFieldNames(lex)
if err != nil { if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'field_names' pipe: %s", err) logger.Panicf("BUG: unexpected error when parsing 'field_names' pipe at [%s]: %s", pipeStr, err)
} }
pf.isFirstPipe = len(pipes) == 0 pf.isFirstPipe = len(pipes) == 0
pipes = append(pipes, pf)
if !lex.isKeyword("|") {
logger.Panicf("BUG: unexpected token after 'field_names' pipe at [%s]: %q", pipeStr, lex.token)
}
lex.nextToken()
ps, err := parsePipeSort(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'sort' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s)
}
pipes = append(pipes, pf, ps)
q = &Query{ q = &Query{
f: q.f, f: q.f,
@ -168,41 +181,41 @@ func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Qu
// GetFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs. // GetFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs.
// //
// If limit > 0, then up to limit unique values are returned. The values are returned in arbitrary order because of performance reasons. // If limit > 0, then up to limit unique values are returned.
// The caller may sort the returned values if needed.
func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) { func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) {
// add 'uniq fieldName' to the end of q.pipes pipes := append([]pipe{}, q.pipes...)
if !endsWithPipeUniqSingleField(q.pipes, fieldName) { quotedFieldName := quoteTokenIfNeeded(fieldName)
pipes := append([]pipe{}, q.pipes...) pipeStr := fmt.Sprintf("uniq by (%s) limit %d | sort by (%s)", quotedFieldName, limit, quotedFieldName)
lex := newLexer(pipeStr)
pipeStr := fmt.Sprintf("uniq by (%s) limit %d", quoteTokenIfNeeded(fieldName), limit) pu, err := parsePipeUniq(lex)
lex := newLexer(pipeStr) if err != nil {
pu, err := parsePipeUniq(lex) logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err)
if err != nil { }
logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe: %s", err)
}
pipes = append(pipes, pu)
q = &Query{ if !lex.isKeyword("|") {
f: q.f, logger.Panicf("BUG: unexpected token after 'uniq' pipe at [%s]: %q", pipeStr, lex.token)
pipes: pipes, }
} lex.nextToken()
ps, err := parsePipeSort(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'sort' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s)
}
pipes = append(pipes, pu, ps)
q = &Query{
f: q.f,
pipes: pipes,
} }
return s.runSingleColumnQuery(ctx, tenantIDs, q) return s.runSingleColumnQuery(ctx, tenantIDs, q)
} }
func endsWithPipeUniqSingleField(pipes []pipe, fieldName string) bool {
if len(pipes) == 0 {
return false
}
pu, ok := pipes[len(pipes)-1].(*pipeUniq)
if !ok {
return false
}
return len(pu.byFields) == 1 && pu.byFields[0] == fieldName
}
func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
var values []string var values []string
var valuesLock sync.Mutex var valuesLock sync.Mutex