diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index bb82402cd..c4bdf51f8 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"math"
 	"net/http"
-	"slices"
 	"sort"
 	"strings"
 	"sync"
@@ -145,8 +144,6 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
 		return
 	}
 
-	slices.Sort(fieldNames)
-
 	// Write results
 	w.Header().Set("Content-Type", "application/json")
 	WriteFieldNamesResponse(w, fieldNames)
@@ -163,9 +160,9 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
 	}
 
 	// Parse fieldName query arg
-	fieldName := r.FormValue("field_name")
+	fieldName := r.FormValue("field")
 	if fieldName == "" {
-		httpserver.Errorf(w, r, "missing 'field_name' query arg")
+		httpserver.Errorf(w, r, "missing 'field' query arg")
 		return
 	}
 
@@ -187,16 +184,42 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
 		return
 	}
 
-	if limit == 0 || len(values) < limit {
-		// Sort values only if their number is below the limit.
-		// Otherwise there is little sense in sorting, since the query may return
-		// different subset of values on every execution.
-		slices.Sort(values)
+	// Write results
+	w.Header().Set("Content-Type", "application/json")
+	WriteFieldValuesResponse(w, values)
+}
+
+// ProcessStreamsRequest processes /select/logsql/streams request.
+//
+// See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams
+func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+	q, tenantIDs, err := parseCommonArgs(r)
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+
+	// Parse limit query arg
+	limit, err := httputils.GetInt(r, "limit")
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+	if limit < 0 {
+		limit = 0
+	}
+
+	// Obtain streams for the given query
+	q.Optimize()
+	streams, err := vlstorage.GetFieldValues(ctx, tenantIDs, q, "_stream", uint64(limit))
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot obtain streams: %s", err)
+		return
 	}
 
 	// Write results
 	w.Header().Set("Content-Type", "application/json")
-	WriteFieldValuesResponse(w, values)
+	WriteStreamsResponse(w, streams)
 }
 
 // ProcessQueryRequest handles /select/logsql/query request.
diff --git a/app/vlselect/logsql/streams_response.qtpl b/app/vlselect/logsql/streams_response.qtpl
new file mode 100644
index 000000000..3242aa798
--- /dev/null
+++ b/app/vlselect/logsql/streams_response.qtpl
@@ -0,0 +1,17 @@
+{% stripspace %}
+
+// StreamsResponse formats /select/logsql/streams response
+{% func StreamsResponse(streams []string) %}
+{
+	"streams":[
+		{% if len(streams) > 0 %}
+			{%q= streams[0] %}
+			{% for _, v := range streams[1:] %}
+				,{%q= v %}
+			{% endfor %}
+		{% endif %}
+	]
+}
+{% endfunc %}
+
+{% endstripspace %}
diff --git a/app/vlselect/logsql/streams_response.qtpl.go b/app/vlselect/logsql/streams_response.qtpl.go
new file mode 100644
index 000000000..e1a1b8feb
--- /dev/null
+++ b/app/vlselect/logsql/streams_response.qtpl.go
@@ -0,0 +1,69 @@
+// Code generated by qtc from "streams_response.qtpl". DO NOT EDIT.
+// See https://github.com/valyala/quicktemplate for details.
+ +// StreamsResponse formats /select/logsql/streams response + +//line app/vlselect/logsql/streams_response.qtpl:4 +package logsql + +//line app/vlselect/logsql/streams_response.qtpl:4 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/streams_response.qtpl:4 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/streams_response.qtpl:4 +func StreamStreamsResponse(qw422016 *qt422016.Writer, streams []string) { +//line app/vlselect/logsql/streams_response.qtpl:4 + qw422016.N().S(`{"streams":[`) +//line app/vlselect/logsql/streams_response.qtpl:7 + if len(streams) > 0 { +//line app/vlselect/logsql/streams_response.qtpl:8 + qw422016.N().Q(streams[0]) +//line app/vlselect/logsql/streams_response.qtpl:9 + for _, v := range streams[1:] { +//line app/vlselect/logsql/streams_response.qtpl:9 + qw422016.N().S(`,`) +//line app/vlselect/logsql/streams_response.qtpl:10 + qw422016.N().Q(v) +//line app/vlselect/logsql/streams_response.qtpl:11 + } +//line app/vlselect/logsql/streams_response.qtpl:12 + } +//line app/vlselect/logsql/streams_response.qtpl:12 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/streams_response.qtpl:15 +} + +//line app/vlselect/logsql/streams_response.qtpl:15 +func WriteStreamsResponse(qq422016 qtio422016.Writer, streams []string) { +//line app/vlselect/logsql/streams_response.qtpl:15 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/streams_response.qtpl:15 + StreamStreamsResponse(qw422016, streams) +//line app/vlselect/logsql/streams_response.qtpl:15 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/streams_response.qtpl:15 +} + +//line app/vlselect/logsql/streams_response.qtpl:15 +func StreamsResponse(streams []string) string { +//line app/vlselect/logsql/streams_response.qtpl:15 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/streams_response.qtpl:15 + WriteStreamsResponse(qb422016, streams) 
+//line app/vlselect/logsql/streams_response.qtpl:15 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/streams_response.qtpl:15 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/streams_response.qtpl:15 + return qs422016 +//line app/vlselect/logsql/streams_response.qtpl:15 +} diff --git a/app/vlselect/main.go b/app/vlselect/main.go index 94b366f9e..a42236f30 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -75,10 +75,9 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { // Skip requests, which do not start with /select/, since these aren't our requests. return false } - path = strings.TrimPrefix(path, "/select") path = strings.ReplaceAll(path, "//", "/") - if path == "/vmui" { + if path == "/select/vmui" { // VMUI access via incomplete url without `/` in the end. Redirect to complete url. // Use relative redirect, since the hostname and path prefix may be incorrect if VictoriaMetrics // is hidden behind vmauth or similar proxy. @@ -87,8 +86,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.Redirect(w, newURL) return true } - if strings.HasPrefix(path, "/vmui/") { - if strings.HasPrefix(path, "/vmui/static/") { + if strings.HasPrefix(path, "/select/vmui/") { + if strings.HasPrefix(path, "/select/vmui/static/") { // Allow clients caching static contents for long period of time, since it shouldn't change over time. // Path to static contents (such as js and css) must be changed whenever its contents is changed. 
// See https://developer.chrome.com/docs/lighthouse/performance/uses-long-cache-ttl/ @@ -140,27 +139,28 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } } + httpserver.EnableCORS(w, r) switch path { - case "/logsql/query": - logsqlQueryRequests.Inc() - httpserver.EnableCORS(w, r) - logsql.ProcessQueryRequest(ctx, w, r) - return true - case "/logsql/field_values": - logsqlFieldValuesRequests.Inc() - httpserver.EnableCORS(w, r) - logsql.ProcessFieldValuesRequest(ctx, w, r) - return true - case "/logsql/field_names": + case "/select/logsql/field_names": logsqlFieldNamesRequests.Inc() - httpserver.EnableCORS(w, r) logsql.ProcessFieldNamesRequest(ctx, w, r) return true - case "/logsql/hits": + case "/select/logsql/field_values": + logsqlFieldValuesRequests.Inc() + logsql.ProcessFieldValuesRequest(ctx, w, r) + return true + case "/select/logsql/hits": logsqlHitsRequests.Inc() - httpserver.EnableCORS(w, r) logsql.ProcessHitsRequest(ctx, w, r) return true + case "/select/logsql/query": + logsqlQueryRequests.Inc() + logsql.ProcessQueryRequest(ctx, w, r) + return true + case "/select/logsql/streams": + logsqlStreamsRequests.Inc() + logsql.ProcessStreamsRequest(ctx, w, r) + return true default: return false } @@ -180,8 +180,9 @@ func getMaxQueryDuration(r *http.Request) time.Duration { } var ( - logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) - logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`) logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`) + logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`) logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`) + logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) + logsqlStreamsRequests = 
metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`) ) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 1e65f17c6..d82db492c 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -23,6 +23,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: add ability to unpack JSON fields with [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_json). * FEATURE: add ability to unpack [logfmt](https://brandur.org/logfmt) fields with [`unpack_logfmt` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe) only if the given condition is met. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#conditional-unpack_logfmt). * FEATURE: add [`fields_min`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_min-stats) and [`fields_max`](https://docs.victoriametrics.com/victorialogs/logsql/#fields_max-stats) functions for [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe), which allow returning all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for the log entry with the minimum / maximum value at the given field. +* FEATURE: add `/select/logsql/streams` HTTP endpoint for returning [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-streams) for details. 
 ## [v0.8.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.8.0-victorialogs)
diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md
index 42f45b487..1c815aca3 100644
--- a/docs/VictoriaLogs/querying/README.md
+++ b/docs/VictoriaLogs/querying/README.md
@@ -187,10 +187,49 @@ The grouped fields are put inside `"fields"` object:
 
 See also:
 
+- [Querying streams](#querying-streams)
 - [Querying field names](#querying-field-names)
 - [Querying field values](#querying-field-values)
 - [HTTP API](#http-api)
 
+### Querying streams
+
+VictoriaLogs provides `/select/logsql/streams?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
+from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
+
+The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
+If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
+If `<end>` is missing, then it equals to the maximum timestamp across logs stored in VictoriaLogs.
+
+For example, the following command returns streams across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
+for the last 5 minutes:
+
+```sh
+curl http://localhost:9428/select/logsql/streams -d 'query=error' -d 'start=5m'
+```
+
+Below is an example JSON output returned from this endpoint:
+
+```json
+{
+  "streams": [
+    "{host=\"1.2.3.4\",app=\"foo\"}",
+    "{host=\"1.2.3.4\",app=\"bar\"}",
+    "{host=\"10.2.3.4\",app=\"foo\"}",
+    "{host=\"10.2.3.5\",app=\"baz\"}"
+  ]
+}
+```
+
+The `/select/logsql/streams` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned streams to `N`.
+The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of streams.
+
+See also:
+
+- [Querying field names](#querying-field-names)
+- [Querying field values](#querying-field-values)
+- [Querying hits stats](#querying-hits-stats)
+- [HTTP API](#http-api)
 
 ### Querying field names
 
@@ -226,12 +265,13 @@ Below is an example JSON output returned from this endpoint:
 
 See also:
 
 - [Querying field values](#querying-field-values)
+- [Querying streams](#querying-streams)
 - [Querying hits stats](#querying-hits-stats)
 - [HTTP API](#http-api)
 
 ### Querying field values
 
-VictoriaLogs provides `/select/logsql/field_values?query=<query>&field_name=<fieldName>&start=<start>&end=<end>` HTTP endpoint, which returns
+VictoriaLogs provides `/select/logsql/field_values?query=<query>&field=<fieldName>&start=<start>&end=<end>` HTTP endpoint, which returns
 unique values for the given `<fieldName>` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
@@ -243,7 +283,7 @@ For example, the following command returns unique values for `host` [field](http across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes: ```sh -curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field_name=host' -d 'start=5m' +curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field=host' -d 'start=5m' ``` Below is an example JSON output returned from this endpoint: @@ -266,6 +306,7 @@ The endpoint returns arbitrary subset of values if their number exceeds `N`, so See also: - [Querying field names](#querying-field-names) +- [Querying streams](#querying-streams) - [Querying hits stats](#querying-hits-stats) - [HTTP API](#http-api) diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index c64adba9e..8a71ba1de 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -232,10 +232,15 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) { } s := fmt.Sprintf("stats by (%s) count() hits", byFieldsStr) lex := newLexer(s) + ps, err := parsePipeStats(lex) if err != nil { - logger.Panicf("BUG: unexpected error when parsing %q: %s", s, err) + logger.Panicf("BUG: unexpected error when parsing [%s]: %s", s, err) } + if !lex.isEnd() { + logger.Panicf("BUG: unexpected tail left after parsing [%s]: %q", s, lex.s) + } + q.pipes = append(q.pipes, ps) } @@ -491,11 +496,14 @@ func parseQuery(lex *lexer) (*Query, error) { f: f, } - pipes, err := parsePipes(lex) - if err != nil { - return nil, fmt.Errorf("%w; context: [%s]", err, lex.context()) + if lex.isKeyword("|") { + lex.nextToken() + pipes, err := parsePipes(lex) + if err != nil { + return nil, fmt.Errorf("%w; context: [%s]", err, lex.context()) + } + q.pipes = pipes } - q.pipes = pipes return q, nil } diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index aee6f897f..3e9eb9a58 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ 
-63,21 +63,20 @@ func (dpp defaultPipeProcessor) flush() error { func parsePipes(lex *lexer) ([]pipe, error) { var pipes []pipe - for !lex.isKeyword(")", "") { - if !lex.isKeyword("|") { - if len(pipes) == 0 { - return nil, fmt.Errorf("expecting '|' after the query filters; got %q", lex.token) - } - return nil, fmt.Errorf("expecting '|' after [%s] pipe; got %q", pipes[len(pipes)-1], lex.token) - } - lex.nextToken() + for { p, err := parsePipe(lex) if err != nil { return nil, err } pipes = append(pipes, p) + + switch { + case lex.isKeyword("|"): + lex.nextToken() + case lex.isKeyword(")", ""): + return pipes, nil + } } - return pipes, nil } func parsePipe(lex *lexer) (pipe, error) { diff --git a/lib/logstorage/stats_fields_min.go b/lib/logstorage/stats_fields_min.go index 287b134b8..79ce9cada 100644 --- a/lib/logstorage/stats_fields_min.go +++ b/lib/logstorage/stats_fields_min.go @@ -41,7 +41,7 @@ func (sm *statsFieldsMin) newStatsProcessor() (statsProcessor, int) { type statsFieldsMinProcessor struct { sm *statsFieldsMin - min string + min string fields []Field } diff --git a/lib/logstorage/stats_max.go b/lib/logstorage/stats_max.go index f56fa6763..290641c05 100644 --- a/lib/logstorage/stats_max.go +++ b/lib/logstorage/stats_max.go @@ -33,7 +33,7 @@ func (sm *statsMax) newStatsProcessor() (statsProcessor, int) { type statsMaxProcessor struct { sm *statsMax - max string + max string } func (smp *statsMaxProcessor) updateStatsForAllRows(br *blockResult) int { @@ -153,6 +153,7 @@ func (smp *statsMaxProcessor) updateStateBytes(b []byte) { func (smp *statsMaxProcessor) updateStateString(v string) { if v == "" { // Skip empty strings + return } if smp.max != "" && !lessString(smp.max, v) { return diff --git a/lib/logstorage/stats_min.go b/lib/logstorage/stats_min.go index 56d4ae711..4711a43ce 100644 --- a/lib/logstorage/stats_min.go +++ b/lib/logstorage/stats_min.go @@ -33,7 +33,7 @@ func (sm *statsMin) newStatsProcessor() (statsProcessor, int) { type statsMinProcessor 
struct { sm *statsMin - min string + min string } func (smp *statsMinProcessor) updateStatsForAllRows(br *blockResult) int { diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 5aba815fd..d5c51c5f2 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -146,17 +146,30 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, // GetFieldNames returns field names from q results for the given tenantIDs. func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { - // add `field_names ...` to the end of q.pipes pipes := append([]pipe{}, q.pipes...) - - pipeStr := "field_names as names" + pipeStr := "field_names as names | sort by (names)" lex := newLexer(pipeStr) + pf, err := parsePipeFieldNames(lex) if err != nil { - logger.Panicf("BUG: unexpected error when parsing 'field_names' pipe: %s", err) + logger.Panicf("BUG: unexpected error when parsing 'field_names' pipe at [%s]: %s", pipeStr, err) } pf.isFirstPipe = len(pipes) == 0 - pipes = append(pipes, pf) + + if !lex.isKeyword("|") { + logger.Panicf("BUG: unexpected token after 'field_names' pipe at [%s]: %q", pipeStr, lex.token) + } + lex.nextToken() + + ps, err := parsePipeSort(lex) + if err != nil { + logger.Panicf("BUG: unexpected error when parsing 'sort' pipe at [%s]: %s", pipeStr, err) + } + if !lex.isEnd() { + logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s) + } + + pipes = append(pipes, pf, ps) q = &Query{ f: q.f, @@ -168,41 +181,41 @@ func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Qu // GetFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs. // -// If limit > 0, then up to limit unique values are returned. The values are returned in arbitrary order because of performance reasons. -// The caller may sort the returned values if needed. 
+// If limit > 0, then up to limit unique values are returned. func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) { - // add 'uniq fieldName' to the end of q.pipes - if !endsWithPipeUniqSingleField(q.pipes, fieldName) { - pipes := append([]pipe{}, q.pipes...) + pipes := append([]pipe{}, q.pipes...) + quotedFieldName := quoteTokenIfNeeded(fieldName) + pipeStr := fmt.Sprintf("uniq by (%s) limit %d | sort by (%s)", quotedFieldName, limit, quotedFieldName) + lex := newLexer(pipeStr) - pipeStr := fmt.Sprintf("uniq by (%s) limit %d", quoteTokenIfNeeded(fieldName), limit) - lex := newLexer(pipeStr) - pu, err := parsePipeUniq(lex) - if err != nil { - logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe: %s", err) - } - pipes = append(pipes, pu) + pu, err := parsePipeUniq(lex) + if err != nil { + logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err) + } - q = &Query{ - f: q.f, - pipes: pipes, - } + if !lex.isKeyword("|") { + logger.Panicf("BUG: unexpected token after 'uniq' pipe at [%s]: %q", pipeStr, lex.token) + } + lex.nextToken() + + ps, err := parsePipeSort(lex) + if err != nil { + logger.Panicf("BUG: unexpected error when parsing 'sort' pipe at [%s]: %s", pipeStr, err) + } + if !lex.isEnd() { + logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s) + } + + pipes = append(pipes, pu, ps) + + q = &Query{ + f: q.f, + pipes: pipes, } return s.runSingleColumnQuery(ctx, tenantIDs, q) } -func endsWithPipeUniqSingleField(pipes []pipe, fieldName string) bool { - if len(pipes) == 0 { - return false - } - pu, ok := pipes[len(pipes)-1].(*pipeUniq) - if !ok { - return false - } - return len(pu.byFields) == 1 && pu.byFields[0] == fieldName -} - func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { var values []string var valuesLock sync.Mutex