From 20918a28103884a8b3497695ca104f3dcf2e79e3 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 15 May 2024 04:42:03 +0200 Subject: [PATCH] wip --- app/vlselect/logsql/logsql.go | 52 ++++++++++++++++++++++++---- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/querying/README.md | 7 ++++ lib/logstorage/parser.go | 23 ++++++++++++ 4 files changed, 77 insertions(+), 6 deletions(-) diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index d7385b642..c09887872 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -2,13 +2,17 @@ package logsql import ( "context" + "fmt" + "math" "net/http" + "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils" ) // ProcessQueryRequest handles /select/logsql/query request. @@ -19,20 +23,42 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req httpserver.Errorf(w, r, "%s", err) return } - limit, err := httputils.GetInt(r, "limit") - if err != nil { - httpserver.Errorf(w, r, "%s", err) - return - } + // Parse query qStr := r.FormValue("query") q, err := logstorage.ParseQuery(qStr) if err != nil { httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err) return } - w.Header().Set("Content-Type", "application/stream+json; charset=utf-8") + // Parse optional start and end args + start, okStart, err := getTimeNsec(r, "start") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + end, okEnd, err := getTimeNsec(r, "end") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + if okStart || okEnd { + if !okStart { + start = math.MinInt64 + } + if !okEnd { + end = math.MaxInt64 + } + q.AddTimeFilter(start, end) + } + + // Parse limit query arg + limit, err := httputils.GetInt(r, "limit") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } if limit > 0 { q.AddPipeLimit(uint64(limit)) } @@ -55,6 +81,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req blockResultPool.Put(bb) } + w.Header().Set("Content-Type", "application/stream+json; charset=utf-8") err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock) bw.FlushIgnoreErrors() @@ -67,3 +94,16 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req } var blockResultPool bytesutil.ByteBufferPool + +func getTimeNsec(r *http.Request, argName string) (int64, bool, error) { + s := r.FormValue(argName) + if s == "" { + return 0, false, nil + } + currentTimestamp := float64(time.Now().UnixNano()) / 1e9 + secs, err := promutils.ParseTimeAt(s, currentTimestamp) + if err != nil { + return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err) + } + return int64(secs * 1e9), true, nil +} diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index f4575985a..aef94f305 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +* FEATURE: add support for optional `start` and `end` query args to [HTTP querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api), which can be used for limiting the time range for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/). * FEATURE: add ability to return the first `N` results from [`sort` pipe](#https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). This is useful when `N` biggest or `N` smallest values must be returned from large amounts of logs. * FEATURE: add [`quantile`](https://docs.victoriametrics.com/victorialogs/logsql/#quantile-stats) and [`median`](https://docs.victoriametrics.com/victorialogs/logsql/#median-stats) [stats functions](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index 2755dac0a..bfad23e75 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -50,6 +50,13 @@ By default the `/select/logsql/query` returns all the log entries matching the g ```sh curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10' ``` +- By adding [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) to the query. For example: + ```sh + curl http://localhost:9428/select/logsql/query -d 'query=error | limit 10' + ``` +- By adding [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter). The time range for the query can be specified via optional + `start` and `end` query ars formatted according to [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#timestamp-formats). +- By adding other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) to the query. The `/select/logsql/query` endpoint returns [a stream of JSON lines](https://jsonlines.org/), where each line contains JSON-encoded log entry in the form `{field1="value1",...,fieldN="valueN"}`. diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index b5e00033e..a1f1422a8 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -206,6 +206,29 @@ func (q *Query) String() string { return s } +// AddTimeFilter adds global filter _time:[start ... end] to q. +func (q *Query) AddTimeFilter(start, end int64) { + startStr := marshalTimestampRFC3339Nano(nil, start) + endStr := marshalTimestampRFC3339Nano(nil, end) + ft := &filterTime{ + minTimestamp: start, + maxTimestamp: end, + stringRepr: fmt.Sprintf("[%s, %s]", startStr, endStr), + } + + fa, ok := q.f.(*filterAnd) + if ok { + filters := make([]filter, len(fa.filters)+1) + filters[0] = ft + copy(filters[1:], fa.filters) + fa.filters = filters + } else { + q.f = &filterAnd{ + filters: []filter{ft, q.f}, + } + } +} + // AddPipeLimit adds `| limit n` pipe to q. // // See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe