mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
wip
This commit is contained in:
parent
a080c9e4e5
commit
20918a2810
4 changed files with 77 additions and 6 deletions
|
@ -2,13 +2,17 @@ package logsql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProcessQueryRequest handles /select/logsql/query request.
|
// ProcessQueryRequest handles /select/logsql/query request.
|
||||||
|
@ -19,20 +23,42 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
limit, err := httputils.GetInt(r, "limit")
|
|
||||||
if err != nil {
|
|
||||||
httpserver.Errorf(w, r, "%s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Parse query
|
||||||
qStr := r.FormValue("query")
|
qStr := r.FormValue("query")
|
||||||
q, err := logstorage.ParseQuery(qStr)
|
q, err := logstorage.ParseQuery(qStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
|
httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
|
|
||||||
|
|
||||||
|
// Parse optional start and end args
|
||||||
|
start, okStart, err := getTimeNsec(r, "start")
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
end, okEnd, err := getTimeNsec(r, "end")
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if okStart || okEnd {
|
||||||
|
if !okStart {
|
||||||
|
start = math.MinInt64
|
||||||
|
}
|
||||||
|
if !okEnd {
|
||||||
|
end = math.MaxInt64
|
||||||
|
}
|
||||||
|
q.AddTimeFilter(start, end)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse limit query arg
|
||||||
|
limit, err := httputils.GetInt(r, "limit")
|
||||||
|
if err != nil {
|
||||||
|
httpserver.Errorf(w, r, "%s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
if limit > 0 {
|
if limit > 0 {
|
||||||
q.AddPipeLimit(uint64(limit))
|
q.AddPipeLimit(uint64(limit))
|
||||||
}
|
}
|
||||||
|
@ -55,6 +81,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||||
blockResultPool.Put(bb)
|
blockResultPool.Put(bb)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
|
||||||
err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
|
err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
|
||||||
|
|
||||||
bw.FlushIgnoreErrors()
|
bw.FlushIgnoreErrors()
|
||||||
|
@ -67,3 +94,16 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
|
||||||
}
|
}
|
||||||
|
|
||||||
var blockResultPool bytesutil.ByteBufferPool
|
var blockResultPool bytesutil.ByteBufferPool
|
||||||
|
|
||||||
|
func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
|
||||||
|
s := r.FormValue(argName)
|
||||||
|
if s == "" {
|
||||||
|
return 0, false, nil
|
||||||
|
}
|
||||||
|
currentTimestamp := float64(time.Now().UnixNano()) / 1e9
|
||||||
|
secs, err := promutils.ParseTimeAt(s, currentTimestamp)
|
||||||
|
if err != nil {
|
||||||
|
return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err)
|
||||||
|
}
|
||||||
|
return int64(secs * 1e9), true, nil
|
||||||
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* FEATURE: add support for optional `start` and `end` query args to [HTTP querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api), which can be used for limiting the time range for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/).
|
||||||
* FEATURE: add ability to return the first `N` results from [`sort` pipe](#https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). This is useful when `N` biggest or `N` smallest values must be returned from large amounts of logs.
|
* FEATURE: add ability to return the first `N` results from [`sort` pipe](#https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). This is useful when `N` biggest or `N` smallest values must be returned from large amounts of logs.
|
||||||
* FEATURE: add [`quantile`](https://docs.victoriametrics.com/victorialogs/logsql/#quantile-stats) and [`median`](https://docs.victoriametrics.com/victorialogs/logsql/#median-stats) [stats functions](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe).
|
* FEATURE: add [`quantile`](https://docs.victoriametrics.com/victorialogs/logsql/#quantile-stats) and [`median`](https://docs.victoriametrics.com/victorialogs/logsql/#median-stats) [stats functions](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe).
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,13 @@ By default the `/select/logsql/query` returns all the log entries matching the g
|
||||||
```sh
|
```sh
|
||||||
curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'
|
curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'
|
||||||
```
|
```
|
||||||
|
- By adding [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) to the query. For example:
|
||||||
|
```sh
|
||||||
|
curl http://localhost:9428/select/logsql/query -d 'query=error | limit 10'
|
||||||
|
```
|
||||||
|
- By adding [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter). The time range for the query can be specified via optional
|
||||||
|
`start` and `end` query ars formatted according to [these docs](https://docs.victoriametrics.com/single-server-victoriametrics/#timestamp-formats).
|
||||||
|
- By adding other [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) to the query.
|
||||||
|
|
||||||
The `/select/logsql/query` endpoint returns [a stream of JSON lines](https://jsonlines.org/),
|
The `/select/logsql/query` endpoint returns [a stream of JSON lines](https://jsonlines.org/),
|
||||||
where each line contains JSON-encoded log entry in the form `{field1="value1",...,fieldN="valueN"}`.
|
where each line contains JSON-encoded log entry in the form `{field1="value1",...,fieldN="valueN"}`.
|
||||||
|
|
|
@ -206,6 +206,29 @@ func (q *Query) String() string {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddTimeFilter adds global filter _time:[start ... end] to q.
|
||||||
|
func (q *Query) AddTimeFilter(start, end int64) {
|
||||||
|
startStr := marshalTimestampRFC3339Nano(nil, start)
|
||||||
|
endStr := marshalTimestampRFC3339Nano(nil, end)
|
||||||
|
ft := &filterTime{
|
||||||
|
minTimestamp: start,
|
||||||
|
maxTimestamp: end,
|
||||||
|
stringRepr: fmt.Sprintf("[%s, %s]", startStr, endStr),
|
||||||
|
}
|
||||||
|
|
||||||
|
fa, ok := q.f.(*filterAnd)
|
||||||
|
if ok {
|
||||||
|
filters := make([]filter, len(fa.filters)+1)
|
||||||
|
filters[0] = ft
|
||||||
|
copy(filters[1:], fa.filters)
|
||||||
|
fa.filters = filters
|
||||||
|
} else {
|
||||||
|
q.f = &filterAnd{
|
||||||
|
filters: []filter{ft, q.f},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// AddPipeLimit adds `| limit n` pipe to q.
|
// AddPipeLimit adds `| limit n` pipe to q.
|
||||||
//
|
//
|
||||||
// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
|
// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
|
||||||
|
|
Loading…
Reference in a new issue