mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vlselect: add /select/logsql/stats_query_range endpoint for building time series panels in VictoriaLogs plugin for Grafana
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6943 Updates https://github.com/VictoriaMetrics/victorialogs-datasource/issues/61
This commit is contained in:
parent
01c8e12370
commit
c448189f69
8 changed files with 550 additions and 10 deletions
|
@ -45,6 +45,7 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ
|
|||
}
|
||||
if step <= 0 {
|
||||
httpserver.Errorf(w, r, "'step' must be bigger than zero")
|
||||
return
|
||||
}
|
||||
|
||||
// Obtain offset
|
||||
|
@ -563,6 +564,137 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) {
|
|||
return tailRows, nil
|
||||
}
|
||||
|
||||
// ProcessStatsQueryRangeRequest handles /select/logsql/stats_query_range request.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats
|
||||
func ProcessStatsQueryRangeRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
q, tenantIDs, err := parseCommonArgs(r)
|
||||
if err != nil {
|
||||
httpserver.SendPrometheusError(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Obtain step
|
||||
stepStr := r.FormValue("step")
|
||||
if stepStr == "" {
|
||||
stepStr = "1d"
|
||||
}
|
||||
step, err := promutils.ParseDuration(stepStr)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("cannot parse 'step' arg: %s", err)
|
||||
httpserver.SendPrometheusError(w, r, err)
|
||||
return
|
||||
}
|
||||
if step <= 0 {
|
||||
err := fmt.Errorf("'step' must be bigger than zero")
|
||||
httpserver.SendPrometheusError(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Obtain `by(...)` fields from the last `| stats` pipe in q.
|
||||
// Add `_time:step` to the `by(...)` list.
|
||||
byFields, ok := q.GetStatsByFields(int64(step))
|
||||
if !ok {
|
||||
err := fmt.Errorf("the query must end with '| stats ...'; got [%s]", q)
|
||||
httpserver.SendPrometheusError(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
q.Optimize()
|
||||
|
||||
m := make(map[string]*statsSeries)
|
||||
var mLock sync.Mutex
|
||||
|
||||
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
|
||||
clonedColumnNames := make([]string, len(columns))
|
||||
for i, c := range columns {
|
||||
clonedColumnNames[i] = strings.Clone(c.Name)
|
||||
}
|
||||
for i := range timestamps {
|
||||
timestamp := q.GetTimestamp()
|
||||
labels := make([]logstorage.Field, 0, len(byFields))
|
||||
for j, c := range columns {
|
||||
if c.Name == "_time" {
|
||||
nsec, ok := logstorage.TryParseTimestampRFC3339Nano(c.Values[i])
|
||||
if ok {
|
||||
timestamp = nsec
|
||||
continue
|
||||
}
|
||||
}
|
||||
if slices.Contains(byFields, c.Name) {
|
||||
labels = append(labels, logstorage.Field{
|
||||
Name: clonedColumnNames[j],
|
||||
Value: strings.Clone(c.Values[i]),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var dst []byte
|
||||
for j, c := range columns {
|
||||
if !slices.Contains(byFields, c.Name) {
|
||||
name := clonedColumnNames[j]
|
||||
dst = dst[:0]
|
||||
dst = append(dst, name...)
|
||||
dst = logstorage.MarshalFieldsToJSON(dst, labels)
|
||||
key := string(dst)
|
||||
p := statsPoint{
|
||||
Timestamp: timestamp,
|
||||
Value: strings.Clone(c.Values[i]),
|
||||
}
|
||||
|
||||
mLock.Lock()
|
||||
ss := m[key]
|
||||
if ss == nil {
|
||||
ss = &statsSeries{
|
||||
key: key,
|
||||
Name: name,
|
||||
Labels: labels,
|
||||
}
|
||||
m[key] = ss
|
||||
}
|
||||
ss.Points = append(ss.Points, p)
|
||||
mLock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
|
||||
err = fmt.Errorf("cannot execute query [%s]: %s", q, err)
|
||||
httpserver.SendPrometheusError(w, r, err)
|
||||
return
|
||||
}
|
||||
|
||||
// Sort the collected stats by time
|
||||
rows := make([]*statsSeries, 0, len(m))
|
||||
for _, ss := range m {
|
||||
points := ss.Points
|
||||
sort.Slice(points, func(i, j int) bool {
|
||||
return points[i].Timestamp < points[j].Timestamp
|
||||
})
|
||||
rows = append(rows, ss)
|
||||
}
|
||||
sort.Slice(rows, func(i, j int) bool {
|
||||
return rows[i].key < rows[j].key
|
||||
})
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
WriteStatsQueryRangeResponse(w, rows)
|
||||
}
|
||||
|
||||
type statsSeries struct {
|
||||
key string
|
||||
|
||||
Name string
|
||||
Labels []logstorage.Field
|
||||
Points []statsPoint
|
||||
}
|
||||
|
||||
type statsPoint struct {
|
||||
Timestamp int64
|
||||
Value string
|
||||
}
|
||||
|
||||
// ProcessStatsQueryRequest handles /select/logsql/stats_query request.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats
|
||||
|
@ -573,8 +705,8 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt
|
|||
return
|
||||
}
|
||||
|
||||
// Verify that q ends with `| stats` pipe
|
||||
byFields, ok := q.GetStatsByFields()
|
||||
// Obtain `by(...)` fields from the last `| stats` pipe in q.
|
||||
byFields, ok := q.GetStatsByFields(0)
|
||||
if !ok {
|
||||
err := fmt.Errorf("the query must end with '| stats ...'; got [%s]", q)
|
||||
httpserver.SendPrometheusError(w, r, err)
|
||||
|
@ -611,6 +743,7 @@ func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *htt
|
|||
Timestamp: timestamp,
|
||||
Value: strings.Clone(c.Values[i]),
|
||||
}
|
||||
|
||||
rowsLock.Lock()
|
||||
rows = append(rows, r)
|
||||
rowsLock.Unlock()
|
||||
|
|
52
app/vlselect/logsql/stats_query_range_response.qtpl
Normal file
52
app/vlselect/logsql/stats_query_range_response.qtpl
Normal file
|
@ -0,0 +1,52 @@
|
|||
{% stripspace %}
|
||||
|
||||
// StatsQueryRangeResponse generates response for /select/logsql/stats_query_range
|
||||
{% func StatsQueryRangeResponse(rows []*statsSeries) %}
|
||||
{
|
||||
"status":"success",
|
||||
"data":{
|
||||
"resultType":"matrix",
|
||||
"result":[
|
||||
{% if len(rows) > 0 %}
|
||||
{%= formatStatsSeries(rows[0]) %}
|
||||
{% code rows = rows[1:] %}
|
||||
{% for i := range rows %}
|
||||
,{%= formatStatsSeries(rows[i]) %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
]
|
||||
}
|
||||
}
|
||||
{% endfunc %}
|
||||
|
||||
{% func formatStatsSeries(ss *statsSeries) %}
|
||||
{
|
||||
"metric":{
|
||||
"__name__":{%q= ss.Name %}
|
||||
{% if len(ss.Labels) > 0 %}
|
||||
{% for _, label := range ss.Labels %}
|
||||
,{%q= label.Name %}:{%q= label.Value %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
},
|
||||
"values":[
|
||||
{% code points := ss.Points %}
|
||||
{% if len(points) > 0 %}
|
||||
{%= formatStatsPoint(&points[0]) %}
|
||||
{% code points = points[1:] %}
|
||||
{% for i := range points %}
|
||||
,{%= formatStatsPoint(&points[i]) %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
]
|
||||
}
|
||||
{% endfunc %}
|
||||
|
||||
{% func formatStatsPoint(p *statsPoint) %}
|
||||
[
|
||||
{%f= float64(p.Timestamp)/1e9 %},
|
||||
{%q= p.Value %}
|
||||
]
|
||||
{% endfunc %}
|
||||
|
||||
{% endstripspace %}
|
188
app/vlselect/logsql/stats_query_range_response.qtpl.go
Normal file
188
app/vlselect/logsql/stats_query_range_response.qtpl.go
Normal file
|
@ -0,0 +1,188 @@
|
|||
// Code generated by qtc from "stats_query_range_response.qtpl". DO NOT EDIT.
|
||||
// See https://github.com/valyala/quicktemplate for details.
|
||||
|
||||
// StatsQueryRangeResponse generates response for /select/logsql/stats_query_range
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:4
|
||||
package logsql
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:4
|
||||
import (
|
||||
qtio422016 "io"
|
||||
|
||||
qt422016 "github.com/valyala/quicktemplate"
|
||||
)
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:4
|
||||
var (
|
||||
_ = qtio422016.Copy
|
||||
_ = qt422016.AcquireByteBuffer
|
||||
)
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:4
|
||||
func StreamStatsQueryRangeResponse(qw422016 *qt422016.Writer, rows []*statsSeries) {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:4
|
||||
qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:10
|
||||
if len(rows) > 0 {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:11
|
||||
streamformatStatsSeries(qw422016, rows[0])
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:12
|
||||
rows = rows[1:]
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:13
|
||||
for i := range rows {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:13
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:14
|
||||
streamformatStatsSeries(qw422016, rows[i])
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:15
|
||||
}
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:16
|
||||
}
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:16
|
||||
qw422016.N().S(`]}}`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
func WriteStatsQueryRangeResponse(qq422016 qtio422016.Writer, rows []*statsSeries) {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
StreamStatsQueryRangeResponse(qw422016, rows)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
func StatsQueryRangeResponse(rows []*statsSeries) string {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
WriteStatsQueryRangeResponse(qb422016, rows)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
return qs422016
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:20
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:22
|
||||
func streamformatStatsSeries(qw422016 *qt422016.Writer, ss *statsSeries) {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:22
|
||||
qw422016.N().S(`{"metric":{"__name__":`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:25
|
||||
qw422016.N().Q(ss.Name)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:26
|
||||
if len(ss.Labels) > 0 {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:27
|
||||
for _, label := range ss.Labels {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:27
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:28
|
||||
qw422016.N().Q(label.Name)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:28
|
||||
qw422016.N().S(`:`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:28
|
||||
qw422016.N().Q(label.Value)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:29
|
||||
}
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:30
|
||||
}
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:30
|
||||
qw422016.N().S(`},"values":[`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:33
|
||||
points := ss.Points
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:34
|
||||
if len(points) > 0 {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:35
|
||||
streamformatStatsPoint(qw422016, &points[0])
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:36
|
||||
points = points[1:]
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:37
|
||||
for i := range points {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:37
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:38
|
||||
streamformatStatsPoint(qw422016, &points[i])
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:39
|
||||
}
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:40
|
||||
}
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:40
|
||||
qw422016.N().S(`]}`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
func writeformatStatsSeries(qq422016 qtio422016.Writer, ss *statsSeries) {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
streamformatStatsSeries(qw422016, ss)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
func formatStatsSeries(ss *statsSeries) string {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
writeformatStatsSeries(qb422016, ss)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
return qs422016
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:43
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:45
|
||||
func streamformatStatsPoint(qw422016 *qt422016.Writer, p *statsPoint) {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:45
|
||||
qw422016.N().S(`[`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:47
|
||||
qw422016.N().F(float64(p.Timestamp) / 1e9)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:47
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:48
|
||||
qw422016.N().Q(p.Value)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:48
|
||||
qw422016.N().S(`]`)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
func writeformatStatsPoint(qq422016 qtio422016.Writer, p *statsPoint) {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
streamformatStatsPoint(qw422016, p)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
}
|
||||
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
func formatStatsPoint(p *statsPoint) string {
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
writeformatStatsPoint(qb422016, p)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
return qs422016
|
||||
//line app/vlselect/logsql/stats_query_range_response.qtpl:50
|
||||
}
|
|
@ -197,6 +197,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
|
|||
logsqlStatsQueryRequests.Inc()
|
||||
logsql.ProcessStatsQueryRequest(ctx, w, r)
|
||||
return true
|
||||
case "/select/logsql/stats_query_range":
|
||||
logsqlStatsQueryRangeRequests.Inc()
|
||||
logsql.ProcessStatsQueryRangeRequest(ctx, w, r)
|
||||
return true
|
||||
case "/select/logsql/stream_field_names":
|
||||
logsqlStreamFieldNamesRequests.Inc()
|
||||
logsql.ProcessStreamFieldNamesRequest(ctx, w, r)
|
||||
|
@ -237,6 +241,7 @@ var (
|
|||
logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`)
|
||||
logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`)
|
||||
logsqlStatsQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stats_query"}`)
|
||||
logsqlStatsQueryRangeRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stats_query_range"}`)
|
||||
logsqlStreamFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_names"}`)
|
||||
logsqlStreamFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_values"}`)
|
||||
logsqlStreamIDsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_ids"}`)
|
||||
|
|
|
@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
## tip
|
||||
|
||||
* FEATURE: add [`/select/logsql/stats_query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats), which is going to be used by [vmalert](https://docs.victoriametrics.com/vmalert/) for executing alerting and recording rules against VictoriaLogs. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6942) for details.
|
||||
* FEATURE: add [`/select/logsql/stats_query_range` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-range-stats), which is going to be used by [VictoriaLogs plugin for Grafana](https://docs.victoriametrics.com/victorialogs/victorialogs-datasource/) for building time series panels. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6943) for details.
|
||||
* FEATURE: optimize [multi-exact queries](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) with many phrases to search. For example, `ip:in(path:="/foo/bar" | keep ip)` when there are many unique values for `ip` field among log entries with `/foo/bar` path.
|
||||
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add support for displaying the top 5 log streams in the hits graph. The remaining log streams are grouped into an "other" label. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6545).
|
||||
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the ability to customize the graph display with options for bar, line, stepped line, and points.
|
||||
|
|
|
@ -14,6 +14,7 @@ VictoriaLogs provides the following HTTP endpoints:
|
|||
- [`/select/logsql/tail`](#live-tailing) for live tailing of query results.
|
||||
- [`/select/logsql/hits`](#querying-hits-stats) for querying log hits stats over the given time range.
|
||||
- [`/select/logsql/stats_query`](#querying-log-stats) for querying log stats at the given time.
|
||||
- [`/select/logsql/stats_query_range`](#querying-log-range-stats) for querying log stats over the given time range.
|
||||
- [`/select/logsql/stream_ids`](#querying-stream_ids) for querying `_stream_id` values of [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
|
||||
- [`/select/logsql/streams`](#querying-streams) for querying [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
|
||||
- [`/select/logsql/stream_field_names`](#querying-stream-field-names) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names.
|
||||
|
@ -107,6 +108,7 @@ See also:
|
|||
- [Live tailing](#live-tailing)
|
||||
- [Querying hits stats](#querying-hits-stats)
|
||||
- [Querying log stats](#querying-log-stats)
|
||||
- [Querying log range stats](#querying-log-range-stats)
|
||||
- [Querying streams](#querying-streams)
|
||||
- [Querying stream field names](#querying-stream-field-names)
|
||||
- [Querying stream field values](#querying-stream-field-values)
|
||||
|
@ -276,6 +278,7 @@ See also:
|
|||
|
||||
- [Querying logs](#querying-logs)
|
||||
- [Querying log stats](#querying-log-stats)
|
||||
- [Querying log range stats](#querying-log-range-stats)
|
||||
- [Querying streams](#querying-streams)
|
||||
- [HTTP API](#http-api)
|
||||
|
||||
|
@ -285,12 +288,12 @@ VictoriaLogs provides `/select/logsql/stats_query?query=<query>&time=<t>` HTTP e
|
|||
for the given [`query`](https://docs.victoriametrics.com/victorialogs/logsql/) at the given timestamp `t`
|
||||
in the format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries).
|
||||
|
||||
The `<t>` arg can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
|
||||
If `<t>` is missing, then it equals to the current time.
|
||||
|
||||
The `<query>` must contain [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). The calculated stats is converted into metrics
|
||||
with labels enumerated in `by(...)` clause of the `| stats by(...)` pipe.
|
||||
|
||||
The `<t>` arg can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
|
||||
If `<t>` is missing, then it equals to the current time.
|
||||
|
||||
For example, the following command returns the number of logs per each `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||
across logs over `2024-01-01` day by UTC:
|
||||
|
||||
|
@ -345,6 +348,100 @@ The `/select/logsql/stats_query` API is useful for generating Prometheus-compati
|
|||
|
||||
See also:
|
||||
|
||||
- [Querying log range stats](#querying-log-range-stats)
|
||||
- [Querying logs](#querying-logs)
|
||||
- [Querying hits stats](#querying-hits-stats)
|
||||
- [HTTP API](#http-api)
|
||||
|
||||
### Querying log range stats
|
||||
|
||||
VictoriaLogs provides `/select/logsql/stats_query_range?query=<query>&start=<start>&end=<end>&step=<step>` HTTP endpoint, which returns log stats
|
||||
for the given [`query`](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[start ... end]` time range with the given `step` interval.
|
||||
The stats is returned in the format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries).
|
||||
|
||||
The `<query>` must contain [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). The calculated stats is converted into metrics
|
||||
with labels enumerated in `by(...)` clause of the `| stats by(...)` pipe.
|
||||
|
||||
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
|
||||
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
|
||||
If `<end>` is missing, then it equals to the maximum timestamp across logs stored in VictoriaLogs.
|
||||
|
||||
The `<step>` arg can contain values in [the format specified here](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-time-buckets).
|
||||
If `<step>` is missing, then it equals to `1d` (one day).
|
||||
|
||||
For example, the following command returns the number of logs per each `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||
across logs over `2024-01-01` day by UTC with 6-hour granularity:
|
||||
|
||||
```sh
|
||||
curl http://localhost:9428/select/logsql/stats_query_range -d 'query=* | stats by (level) count(*)' -d 'start=2024-01-01' -d 'end=2024-01-02' -d 'step=6h'
|
||||
```
|
||||
|
||||
Below is an example JSON output returned from this endpoint:
|
||||
|
||||
```json
|
||||
{
|
||||
"status": "success",
|
||||
"data": {
|
||||
"resultType": "matrix",
|
||||
"result": [
|
||||
{
|
||||
"metric": {
|
||||
"__name__": "count(*)",
|
||||
"level": "info"
|
||||
},
|
||||
"values": [
|
||||
[
|
||||
1704067200,
|
||||
"103125"
|
||||
],
|
||||
[
|
||||
1704088800,
|
||||
"102500"
|
||||
],
|
||||
[
|
||||
1704110400,
|
||||
"103125"
|
||||
],
|
||||
[
|
||||
1704132000,
|
||||
"102500"
|
||||
]
|
||||
]
|
||||
},
|
||||
{
|
||||
"metric": {
|
||||
"__name__": "count(*)",
|
||||
"level": "error"
|
||||
},
|
||||
"values": [
|
||||
[
|
||||
1704067200,
|
||||
"31"
|
||||
],
|
||||
[
|
||||
1704088800,
|
||||
"25"
|
||||
],
|
||||
[
|
||||
1704110400,
|
||||
"31"
|
||||
],
|
||||
[
|
||||
1704132000,
|
||||
"125"
|
||||
]
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `/select/logsql/stats_query_range` API is useful for generating Prometheus-compatible graphs in Grafana.
|
||||
|
||||
See also:
|
||||
|
||||
- [Querying log stats](#querying-log-stats)
|
||||
- [Querying logs](#querying-logs)
|
||||
- [Querying hits stats](#querying-hits-stats)
|
||||
- [HTTP API](#http-api)
|
||||
|
|
|
@ -454,10 +454,12 @@ func (q *Query) Optimize() {
|
|||
}
|
||||
}
|
||||
|
||||
// GetStatsByFields returns `| stats by (...)` fields from q if q contains safe `| stats ...` pipe in the end.
|
||||
// GetStatsByFields returns `by (...)` fields from the last `stats` pipe at q.
|
||||
//
|
||||
// If step > 0, then _time:step field is added to the last `stats by(...)` pipe at q.
|
||||
//
|
||||
// False is returned if q doesn't contain safe `| stats ...` pipe.
|
||||
func (q *Query) GetStatsByFields() ([]string, bool) {
|
||||
func (q *Query) GetStatsByFields(step int64) ([]string, bool) {
|
||||
pipes := q.pipes
|
||||
|
||||
idx := getLastPipeStatsIdx(pipes)
|
||||
|
@ -465,8 +467,13 @@ func (q *Query) GetStatsByFields() ([]string, bool) {
|
|||
return nil, false
|
||||
}
|
||||
|
||||
ps := pipes[idx].(*pipeStats)
|
||||
|
||||
// add _time:step to ps.byFields if it doesn't contain it yet.
|
||||
ps.byFields = addByTimeField(ps.byFields, step)
|
||||
|
||||
// extract by(...) field names from stats pipe
|
||||
byFields := pipes[idx].(*pipeStats).byFields
|
||||
byFields := ps.byFields
|
||||
fields := make([]string, len(byFields))
|
||||
for i, f := range byFields {
|
||||
fields[i] = f.name
|
||||
|
@ -525,6 +532,34 @@ func getLastPipeStatsIdx(pipes []pipe) int {
|
|||
return -1
|
||||
}
|
||||
|
||||
func addByTimeField(byFields []*byStatsField, step int64) []*byStatsField {
|
||||
if step <= 0 {
|
||||
return byFields
|
||||
}
|
||||
stepStr := fmt.Sprintf("%d", step)
|
||||
dstFields := make([]*byStatsField, 0, len(byFields)+1)
|
||||
hasByTime := false
|
||||
for _, f := range byFields {
|
||||
if f.name == "_time" {
|
||||
f = &byStatsField{
|
||||
name: "_time",
|
||||
bucketSizeStr: stepStr,
|
||||
bucketSize: float64(step),
|
||||
}
|
||||
hasByTime = true
|
||||
}
|
||||
dstFields = append(dstFields, f)
|
||||
}
|
||||
if !hasByTime {
|
||||
dstFields = append(dstFields, &byStatsField{
|
||||
name: "_time",
|
||||
bucketSizeStr: stepStr,
|
||||
bucketSize: float64(step),
|
||||
})
|
||||
}
|
||||
return dstFields
|
||||
}
|
||||
|
||||
func removeStarFilters(f filter) filter {
|
||||
visitFunc := func(f filter) bool {
|
||||
fp, ok := f.(*filterPrefix)
|
||||
|
|
|
@ -2101,6 +2101,35 @@ func TestQueryDropAllPipes(t *testing.T) {
|
|||
f(`foo | filter bar:baz | stats by (x) min(y)`, `foo bar:baz`)
|
||||
}
|
||||
|
||||
func TestQueryGetStatsByFields_PositiveStep(t *testing.T) {
|
||||
f := func(qStr string, step int64, fieldsExpected []string, qExpected string) {
|
||||
t.Helper()
|
||||
|
||||
q, err := ParseQuery(qStr)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse [%s]: %s", qStr, err)
|
||||
}
|
||||
fields, ok := q.GetStatsByFields(step)
|
||||
if !ok {
|
||||
t.Fatalf("cannot obtain byFields from the query [%s]", qStr)
|
||||
}
|
||||
if !reflect.DeepEqual(fields, fieldsExpected) {
|
||||
t.Fatalf("unexpected byFields;\ngot\n%q\nwant\n%q", fields, fieldsExpected)
|
||||
}
|
||||
|
||||
// Verify the resulting query
|
||||
qResult := q.String()
|
||||
if qResult != qExpected {
|
||||
t.Fatalf("unexpected query\ngot\n%s\nwant\n%s", qResult, qExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f(`* | count()`, nsecsPerHour, []string{"_time"}, `* | stats by (_time:3600000000000) count(*) as "count(*)"`)
|
||||
f(`* | by (level) count() x`, nsecsPerDay, []string{"level", "_time"}, `* | stats by (level, _time:86400000000000) count(*) as x`)
|
||||
f(`* | by (_time:1m) count() x`, nsecsPerDay, []string{"_time"}, `* | stats by (_time:86400000000000) count(*) as x`)
|
||||
f(`* | by (_time:1m offset 30s,level) count() x, count_uniq(z) y`, nsecsPerDay, []string{"_time", "level"}, `* | stats by (_time:86400000000000, level) count(*) as x, count_uniq(z) as y`)
|
||||
}
|
||||
|
||||
func TestQueryGetStatsByFields_Success(t *testing.T) {
|
||||
f := func(qStr string, fieldsExpected []string) {
|
||||
t.Helper()
|
||||
|
@ -2109,7 +2138,7 @@ func TestQueryGetStatsByFields_Success(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("cannot parse [%s]: %s", qStr, err)
|
||||
}
|
||||
fields, ok := q.GetStatsByFields()
|
||||
fields, ok := q.GetStatsByFields(0)
|
||||
if !ok {
|
||||
t.Fatalf("cannot obtain byFields from the query [%s]", qStr)
|
||||
}
|
||||
|
@ -2156,7 +2185,7 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("cannot parse [%s]: %s", qStr, err)
|
||||
}
|
||||
fields, ok := q.GetStatsByFields()
|
||||
fields, ok := q.GetStatsByFields(0)
|
||||
if ok {
|
||||
t.Fatalf("expecting failure to get byFields for the query [%s]", qStr)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue