From 58e6cdba8b83171ff59ac2937af3591d4584161f Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sun, 19 May 2024 01:32:09 +0200
Subject: [PATCH] wip

---
 app/vlselect/logsql/logsql.go        | 82 +++++++++++++++++++++++-
 app/vlselect/main.go                 |  6 ++
 docs/VictoriaLogs/CHANGELOG.md       |  3 +-
 docs/VictoriaLogs/querying/README.md | 93 +++++++++++++++++++++++++++-
 lib/logstorage/parser.go             | 27 ++++++++
 lib/logstorage/pipe_field_names.go   |  3 +
 6 files changed, 209 insertions(+), 5 deletions(-)

diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index de438009a..23b53ab1d 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -6,6 +6,7 @@ import (
 	"math"
 	"net/http"
 	"slices"
+	"sync"
 	"time"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
@@ -16,7 +17,84 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
 )
 
+// ProcessHitsRequest handles /select/logsql/hits request by streaming per-time-bucket hit counts for the query as a JSON {"rows":[...]} document.
+//
+// See https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats
+func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+	q, tenantIDs, err := parseCommonArgs(r)
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+
+	// Obtain step; it defaults to 1d and must be positive
+	stepStr := r.FormValue("step")
+	if stepStr == "" {
+		stepStr = "1d"
+	}
+	step, err := promutils.ParseDuration(stepStr)
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot parse 'step' arg: %s", err)
+		return
+	}
+	if step <= 0 {
+		httpserver.Errorf(w, r, "'step' must be bigger than zero")
+		return
+	}
+
+	// Obtain offset; it defaults to 0s
+	offsetStr := r.FormValue("offset")
+	if offsetStr == "" {
+		offsetStr = "0s"
+	}
+	offset, err := promutils.ParseDuration(offsetStr)
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot parse 'offset' arg: %s", err)
+		return
+	}
+
+	q.AddCountByTimePipe(int64(step), int64(offset))
+	q.Optimize()
+
+	var wLock sync.Mutex
+	isFirstWrite := true
+	writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
+		if len(columns) == 0 || len(columns[0].Values) == 0 {
+			return
+		}
+
+		bb := blockResultPool.Get()
+		for i := range timestamps {
+			bb.B = append(bb.B, ',')
+			WriteJSONRow(bb, columns, i)
+			// Remove newline at the end, so rows can be joined with the leading commas
+			bb.B = bb.B[:len(bb.B)-1]
+		}
+		wLock.Lock()
+		buf := bb.B
+		if isFirstWrite {
+			buf = buf[1:]
+			isFirstWrite = false
+		}
+		_, _ = w.Write(buf)
+		wLock.Unlock()
+		blockResultPool.Put(bb)
+	}
+
+	// Write response
+	w.Header().Set("Content-Type", "application/json")
+	fmt.Fprintf(w, `{"rows":[`)
+	err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
+	fmt.Fprintf(w, `]}`)
+
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
+	}
+}
+
 // ProcessFieldNamesRequest handles /select/logsql/field_names request.
+//
+// See https://docs.victoriametrics.com/victorialogs/querying/#querying-field-names
 func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
 	q, tenantIDs, err := parseCommonArgs(r)
 	if err != nil {
@@ -40,6 +118,8 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
 }
 
 // ProcessFieldValuesRequest handles /select/logsql/field_values request.
+// +// See https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { q, tenantIDs, err := parseCommonArgs(r) if err != nil { @@ -107,7 +186,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req bw := getBufferedWriter(w) writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { - if len(columns) == 0 { + if len(columns) == 0 || len(columns[0].Values) == 0 { return } diff --git a/app/vlselect/main.go b/app/vlselect/main.go index ee2d84534..94b366f9e 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -156,6 +156,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) logsql.ProcessFieldNamesRequest(ctx, w, r) return true + case "/logsql/hits": + logsqlHitsRequests.Inc() + httpserver.EnableCORS(w, r) + logsql.ProcessHitsRequest(ctx, w, r) + return true default: return false } @@ -178,4 +183,5 @@ var ( logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`) logsqlFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_names"}`) + logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`) ) diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 91b26aa73..98dedab35 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -25,7 +25,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: allow passing string values to [`min`](https://docs.victoriametrics.com/victorialogs/logsql/#min-stats) and [`max`](https://docs.victoriametrics.com/victorialogs/logsql/#max-stats) functions. Previously only numeric values could be passed to them. 
* FEATURE: speed up [`sort ... limit N` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for typical cases. * FEATURE: allow using more convenient syntax for [`range` filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-filter) if upper or lower bound isn't needed. For example, it is possible to write `response_size:>=10KiB` instead of `response_size:range[10KiB, inf)`, or `temperature:<42` instead of `temperature:range(-inf, 42)`. -* FEATURE: add `/select/logsql/field_names` HTTP endpoint for returning [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for details. +* FEATURE: add `/select/logsql/hits` HTTP endpoint for returning the number of matching logs per the given time bucket over the selected time range. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats) for details. +* FEATURE: add `/select/logsql/field_names` HTTP endpoint for returning [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-names) for details. * FEATURE: add `/select/logsql/field_values` HTTP endpoint for returning unique values for the given [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) obtained from results of the given query. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-field-values) for details. * BUGFIX: properly take into account `offset` [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) when it already has `limit`. For example, `_time:5m | sort by (foo) offset 20 limit 10`.
diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index c3e3feaa5..9049db6d3 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -88,11 +88,65 @@ curl http://localhost:9428/select/logsql/query -H 'AccountID: 12' -H 'ProjectID: The number of requests to `/select/logsql/query` can be [monitored](https://docs.victoriametrics.com/VictoriaLogs/#monitoring) with `vl_http_requests_total{path="/select/logsql/query"}` metric. +### Querying hits stats + +VictoriaLogs provides `/select/logsql/hits?query=<query>&start=<start>&end=<end>&step=<step>` HTTP endpoint, which returns the number +of matching log entries for the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` +time range grouped by `<step>` buckets. The returned results are sorted by time. + +The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). +If `<start>` is missing, then it equals the minimum timestamp across logs stored in VictoriaLogs. +If `<end>` is missing, then it equals the maximum timestamp across logs stored in VictoriaLogs. + +The `<step>` arg can contain values in [the format specified here](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-time-buckets). +If `<step>` is missing, then it equals `1d` (one day).
+ +For example, the following command returns per-hour number of [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) over logs for the last 3 hours: + +```sh +curl http://localhost:9428/select/logsql/hits -d 'query=error' -d 'start=3h' -d 'step=1h' +``` + +Below is an example JSON output returned from this endpoint: + +```json +{ + "rows": [ + { + "_time": "2024-01-12T00:00:00Z", + "hits": "800000" + }, + { + "_time": "2024-01-12T01:00:00Z", + "hits": "800000" + }, + { + "_time": "2024-01-12T02:00:00Z", + "hits": "820000" + } + ] +} +``` + +Additionally, the `offset=<offset>` arg can be passed to `/select/logsql/hits` in order to group buckets according to the given timezone offset. +The `<offset>` can contain values in [the format specified here](https://docs.victoriametrics.com/victorialogs/logsql/#duration-values). + +See also: + +- [Querying field names](#querying-field-names) +- [Querying field values](#querying-field-values) +- [HTTP API](#http-api) + + ### Querying field names VictoriaLogs provides `/select/logsql/field_names?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns field names -from result of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given [`<start>` ... `<end>`] time range. +from result of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range. + The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). +If `<start>` is missing, then it equals the minimum timestamp across logs stored in VictoriaLogs. +If `<end>` is missing, then it equals the maximum timestamp across logs stored in VictoriaLogs.
 For example, the following command returns field names across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes: @@ -101,18 +154,36 @@ for the last 5 minutes: curl http://localhost:9428/select/logsql/field_names -d 'query=error' -d 'start=5m' ``` +Below is an example JSON output returned from this endpoint: + +```json +{ + "names": [ + "_msg", + "_stream", + "_time", + "host", + "level", + "location" + ] +} +``` + See also: - [Querying field values](#querying-field-values) +- [Querying hits stats](#querying-hits-stats) - [HTTP API](#http-api) - ### Querying field values VictoriaLogs provides `/select/logsql/field_values?query=<query>&field_name=<fieldName>&start=<start>&end=<end>` HTTP endpoint, which returns unique values for the given `<fieldName>` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given [`<start>` ... `<end>`] time range. +from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range. + The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). +If `<start>` is missing, then it equals the minimum timestamp across logs stored in VictoriaLogs. +If `<end>` is missing, then it equals the maximum timestamp across logs stored in VictoriaLogs.
For example, the following command returns unique the values for `host` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last 5 minutes: @@ -121,14 +192,30 @@ across logs with the `error` [word](https://docs.victoriametrics.com/victorialog curl http://localhost:9428/select/logsql/field_values -d 'query=error' -d 'field_name=host' -d 'start=5m' ``` +Below is an example JSON output returned from this endpoint: + +```json +{ + "values": [ + "host_0", + "host_1", + "host_10", + "host_100", + "host_1000" + ] +} +``` + The `/select/logsql/field_names` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned values to `N`. The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of field values. See also: - [Querying field names](#querying-field-names) +- [Querying hits stats](#querying-hits-stats) - [HTTP API](#http-api) + ## Web UI VictoriaLogs provides a simple Web UI for logs [querying](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html) and exploration diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 69b850cb5..dc7420ce6 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -220,6 +220,33 @@ func (q *Query) String() string { return s } +// AddCountByTimePipe adds '| stats by (_time:step offset off) count() hits' to the end of q. 
+func (q *Query) AddCountByTimePipe(step, off int64) { + { + // Add 'stats by (_time:step offset off) count() hits': the pipe is rendered as text and parsed, so a parse error here indicates a bug. + stepStr := string(marshalDuration(nil, step)) + offsetStr := string(marshalDuration(nil, off)) + s := fmt.Sprintf("stats by (_time:%s offset %s) count() hits", stepStr, offsetStr) + lex := newLexer(s) + ps, err := parsePipeStats(lex) + if err != nil { + logger.Panicf("BUG: unexpected error when parsing %q: %s", s, err) + } + q.pipes = append(q.pipes, ps) + } + + { + // Add 'sort by (_time)' after the stats pipe in order to get consistent order of the results. + s := "sort by (_time)" + lex := newLexer(s) + ps, err := parsePipeSort(lex) + if err != nil { + logger.Panicf("BUG: unexpected error when parsing %q: %s", s, err) + } + q.pipes = append(q.pipes, ps) + } +} + // AddTimeFilter adds global filter _time:[start ... end] to q. func (q *Query) AddTimeFilter(start, end int64) { startStr := marshalTimestampRFC3339NanoString(nil, start) diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 86a7410cb..8209c77ca 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -101,6 +101,9 @@ func (pfp *pipeFieldNamesProcessor) flush() error { m[k] = struct{}{} } } + if pfp.pf.isFirstPipe { + m["_time"] = struct{}{} + } // write result wctx := &pipeFieldNamesWriteContext{