This commit is contained in:
Aliaksandr Valialkin 2024-05-20 01:45:29 +02:00
parent dcc68e4e08
commit 411ba6f0e8
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 394 additions and 32 deletions

View file

@ -0,0 +1,69 @@
{% import (
"slices"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
) %}
{% stripspace %}
// LabelsForHits formats labels for /select/logsql/hits response
{% func LabelsForHits(columns []logstorage.BlockColumn, rowIdx int) %}
{
{% if len(columns) > 0 %}
{%q= columns[0].Name %}:{%q= columns[0].Values[rowIdx] %}
{% for _, c := range columns[1:] %}
,{%q= c.Name %}:{%q= c.Values[rowIdx] %}
{% endfor %}
{% endif %}
}
{% endfunc %}
{% func HitsSeries(m map[string]*hitsSeries) %}
{
{% code
sortedKeys := make([]string, 0, len(m))
for k := range m {
sortedKeys = append(sortedKeys, k)
}
slices.Sort(sortedKeys)
%}
"hits":[
{% if len(sortedKeys) > 0 %}
{%= hitsSeriesLine(m, sortedKeys[0]) %}
{% for _, k := range sortedKeys[1:] %}
,{%= hitsSeriesLine(m, k) %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% func hitsSeriesLine(m map[string]*hitsSeries, k string) %}
{
{% code
hs := m[k]
hs.sort()
timestamps := hs.timestamps
values := hs.values
%}
"fields":{%s= k %},
"timestamps":[
{% if len(timestamps) > 0 %}
{%q= timestamps[0] %}
{% for _, ts := range timestamps[1:] %}
,{%q= ts %}
{% endfor %}
{% endif %}
],
"values":[
{% if len(values) > 0 %}
{%s= values[0] %}
{% for _, v := range values[1:] %}
,{%s= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,219 @@
// Code generated by qtc from "hits_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vlselect/logsql/hits_response.qtpl:1
package logsql
//line app/vlselect/logsql/hits_response.qtpl:1
import (
"slices"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
// LabelsForHits formats labels for /select/logsql/hits response
//line app/vlselect/logsql/hits_response.qtpl:10
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/hits_response.qtpl:10
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/hits_response.qtpl:10
func StreamLabelsForHits(qw422016 *qt422016.Writer, columns []logstorage.BlockColumn, rowIdx int) {
//line app/vlselect/logsql/hits_response.qtpl:10
qw422016.N().S(`{`)
//line app/vlselect/logsql/hits_response.qtpl:12
if len(columns) > 0 {
//line app/vlselect/logsql/hits_response.qtpl:13
qw422016.N().Q(columns[0].Name)
//line app/vlselect/logsql/hits_response.qtpl:13
qw422016.N().S(`:`)
//line app/vlselect/logsql/hits_response.qtpl:13
qw422016.N().Q(columns[0].Values[rowIdx])
//line app/vlselect/logsql/hits_response.qtpl:14
for _, c := range columns[1:] {
//line app/vlselect/logsql/hits_response.qtpl:14
qw422016.N().S(`,`)
//line app/vlselect/logsql/hits_response.qtpl:15
qw422016.N().Q(c.Name)
//line app/vlselect/logsql/hits_response.qtpl:15
qw422016.N().S(`:`)
//line app/vlselect/logsql/hits_response.qtpl:15
qw422016.N().Q(c.Values[rowIdx])
//line app/vlselect/logsql/hits_response.qtpl:16
}
//line app/vlselect/logsql/hits_response.qtpl:17
}
//line app/vlselect/logsql/hits_response.qtpl:17
qw422016.N().S(`}`)
//line app/vlselect/logsql/hits_response.qtpl:19
}
//line app/vlselect/logsql/hits_response.qtpl:19
func WriteLabelsForHits(qq422016 qtio422016.Writer, columns []logstorage.BlockColumn, rowIdx int) {
//line app/vlselect/logsql/hits_response.qtpl:19
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/hits_response.qtpl:19
StreamLabelsForHits(qw422016, columns, rowIdx)
//line app/vlselect/logsql/hits_response.qtpl:19
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/hits_response.qtpl:19
}
//line app/vlselect/logsql/hits_response.qtpl:19
func LabelsForHits(columns []logstorage.BlockColumn, rowIdx int) string {
//line app/vlselect/logsql/hits_response.qtpl:19
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/hits_response.qtpl:19
WriteLabelsForHits(qb422016, columns, rowIdx)
//line app/vlselect/logsql/hits_response.qtpl:19
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/hits_response.qtpl:19
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/hits_response.qtpl:19
return qs422016
//line app/vlselect/logsql/hits_response.qtpl:19
}
//line app/vlselect/logsql/hits_response.qtpl:21
func StreamHitsSeries(qw422016 *qt422016.Writer, m map[string]*hitsSeries) {
//line app/vlselect/logsql/hits_response.qtpl:21
qw422016.N().S(`{`)
//line app/vlselect/logsql/hits_response.qtpl:24
sortedKeys := make([]string, 0, len(m))
for k := range m {
sortedKeys = append(sortedKeys, k)
}
slices.Sort(sortedKeys)
//line app/vlselect/logsql/hits_response.qtpl:29
qw422016.N().S(`"hits":[`)
//line app/vlselect/logsql/hits_response.qtpl:31
if len(sortedKeys) > 0 {
//line app/vlselect/logsql/hits_response.qtpl:32
streamhitsSeriesLine(qw422016, m, sortedKeys[0])
//line app/vlselect/logsql/hits_response.qtpl:33
for _, k := range sortedKeys[1:] {
//line app/vlselect/logsql/hits_response.qtpl:33
qw422016.N().S(`,`)
//line app/vlselect/logsql/hits_response.qtpl:34
streamhitsSeriesLine(qw422016, m, k)
//line app/vlselect/logsql/hits_response.qtpl:35
}
//line app/vlselect/logsql/hits_response.qtpl:36
}
//line app/vlselect/logsql/hits_response.qtpl:36
qw422016.N().S(`]}`)
//line app/vlselect/logsql/hits_response.qtpl:39
}
//line app/vlselect/logsql/hits_response.qtpl:39
func WriteHitsSeries(qq422016 qtio422016.Writer, m map[string]*hitsSeries) {
//line app/vlselect/logsql/hits_response.qtpl:39
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/hits_response.qtpl:39
StreamHitsSeries(qw422016, m)
//line app/vlselect/logsql/hits_response.qtpl:39
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/hits_response.qtpl:39
}
//line app/vlselect/logsql/hits_response.qtpl:39
func HitsSeries(m map[string]*hitsSeries) string {
//line app/vlselect/logsql/hits_response.qtpl:39
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/hits_response.qtpl:39
WriteHitsSeries(qb422016, m)
//line app/vlselect/logsql/hits_response.qtpl:39
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/hits_response.qtpl:39
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/hits_response.qtpl:39
return qs422016
//line app/vlselect/logsql/hits_response.qtpl:39
}
//line app/vlselect/logsql/hits_response.qtpl:41
func streamhitsSeriesLine(qw422016 *qt422016.Writer, m map[string]*hitsSeries, k string) {
//line app/vlselect/logsql/hits_response.qtpl:41
qw422016.N().S(`{`)
//line app/vlselect/logsql/hits_response.qtpl:44
hs := m[k]
hs.sort()
timestamps := hs.timestamps
values := hs.values
//line app/vlselect/logsql/hits_response.qtpl:48
qw422016.N().S(`"fields":`)
//line app/vlselect/logsql/hits_response.qtpl:49
qw422016.N().S(k)
//line app/vlselect/logsql/hits_response.qtpl:49
qw422016.N().S(`,"timestamps":[`)
//line app/vlselect/logsql/hits_response.qtpl:51
if len(timestamps) > 0 {
//line app/vlselect/logsql/hits_response.qtpl:52
qw422016.N().Q(timestamps[0])
//line app/vlselect/logsql/hits_response.qtpl:53
for _, ts := range timestamps[1:] {
//line app/vlselect/logsql/hits_response.qtpl:53
qw422016.N().S(`,`)
//line app/vlselect/logsql/hits_response.qtpl:54
qw422016.N().Q(ts)
//line app/vlselect/logsql/hits_response.qtpl:55
}
//line app/vlselect/logsql/hits_response.qtpl:56
}
//line app/vlselect/logsql/hits_response.qtpl:56
qw422016.N().S(`],"values":[`)
//line app/vlselect/logsql/hits_response.qtpl:59
if len(values) > 0 {
//line app/vlselect/logsql/hits_response.qtpl:60
qw422016.N().S(values[0])
//line app/vlselect/logsql/hits_response.qtpl:61
for _, v := range values[1:] {
//line app/vlselect/logsql/hits_response.qtpl:61
qw422016.N().S(`,`)
//line app/vlselect/logsql/hits_response.qtpl:62
qw422016.N().S(v)
//line app/vlselect/logsql/hits_response.qtpl:63
}
//line app/vlselect/logsql/hits_response.qtpl:64
}
//line app/vlselect/logsql/hits_response.qtpl:64
qw422016.N().S(`]}`)
//line app/vlselect/logsql/hits_response.qtpl:67
}
//line app/vlselect/logsql/hits_response.qtpl:67
func writehitsSeriesLine(qq422016 qtio422016.Writer, m map[string]*hitsSeries, k string) {
//line app/vlselect/logsql/hits_response.qtpl:67
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/hits_response.qtpl:67
streamhitsSeriesLine(qw422016, m, k)
//line app/vlselect/logsql/hits_response.qtpl:67
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/hits_response.qtpl:67
}
//line app/vlselect/logsql/hits_response.qtpl:67
func hitsSeriesLine(m map[string]*hitsSeries, k string) string {
//line app/vlselect/logsql/hits_response.qtpl:67
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/hits_response.qtpl:67
writehitsSeriesLine(qb422016, m, k)
//line app/vlselect/logsql/hits_response.qtpl:67
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/hits_response.qtpl:67
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/hits_response.qtpl:67
return qs422016
//line app/vlselect/logsql/hits_response.qtpl:67
}

View file

@ -6,6 +6,8 @@ import (
"math" "math"
"net/http" "net/http"
"slices" "slices"
"sort"
"strings"
"sync" "sync"
"time" "time"
@ -55,43 +57,74 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ
// Obtain field entries // Obtain field entries
fields := r.Form["field"] fields := r.Form["field"]
// Prepare the query
q.AddCountByTimePipe(int64(step), int64(offset), fields) q.AddCountByTimePipe(int64(step), int64(offset), fields)
q.Optimize() q.Optimize()
var wLock sync.Mutex var mLock sync.Mutex
isFirstWrite := true m := make(map[string]*hitsSeries)
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
if len(columns) == 0 || len(columns[0].Values) == 0 { if len(columns) == 0 || len(columns[0].Values) == 0 {
return return
} }
timestampValues := columns[0].Values
hitsValues := columns[len(columns)-1].Values
columns = columns[1 : len(columns)-1]
bb := blockResultPool.Get() bb := blockResultPool.Get()
for i := range timestamps { for i := range timestamps {
bb.B = append(bb.B, ',') timestampStr := strings.Clone(timestampValues[i])
WriteJSONRow(bb, columns, i) hitsStr := strings.Clone(hitsValues[i])
// Remove newline at the end
bb.B = bb.B[:len(bb.B)-1] bb.Reset()
WriteLabelsForHits(bb, columns, i)
mLock.Lock()
hs, ok := m[string(bb.B)]
if !ok {
k := string(bb.B)
hs = &hitsSeries{}
m[k] = hs
} }
wLock.Lock() hs.timestamps = append(hs.timestamps, timestampStr)
buf := bb.B hs.values = append(hs.values, hitsStr)
if isFirstWrite { mLock.Unlock()
buf = buf[1:]
isFirstWrite = false
} }
_, _ = w.Write(buf)
wLock.Unlock()
blockResultPool.Put(bb) blockResultPool.Put(bb)
} }
// Execute the query
if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
return
}
// Write response // Write response
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
fmt.Fprintf(w, `{"rows":[`) WriteHitsSeries(w, m)
err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
fmt.Fprintf(w, `]}`)
if err != nil {
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
} }
type hitsSeries struct {
timestamps []string
values []string
}
func (hs *hitsSeries) sort() {
sort.Sort(hs)
}
func (hs *hitsSeries) Len() int {
return len(hs.timestamps)
}
func (hs *hitsSeries) Swap(i, j int) {
hs.timestamps[i], hs.timestamps[j] = hs.timestamps[j], hs.timestamps[i]
hs.values[i], hs.values[j] = hs.values[j], hs.values[i]
}
func (hs *hitsSeries) Less(i, j int) bool {
return hs.timestamps[i] < hs.timestamps[j]
} }
// ProcessFieldNamesRequest handles /select/logsql/field_names request. // ProcessFieldNamesRequest handles /select/logsql/field_names request.

View file

@ -112,19 +112,21 @@ Below is an example JSON output returned from this endpoint:
```json ```json
{ {
"rows": [ "hits": [
{ {
"_time": "2024-01-12T00:00:00Z", "fields": {},
"hits": "800000" "timestamps": [
}, "2024-01-01T00:00:00Z",
{ "2024-01-01T01:00:00Z",
"_time": "2024-01-12T01:00:00Z", "2024-01-01T02:00:00Z"
"hits": "800000" ],
}, "values": [
{ 410339,
"_time": "2024-01-12T02:00:00Z", 450311,
"hits": "820000" 899506
]
} }
]
} }
``` ```
@ -141,7 +143,46 @@ Additionally, any number of `field=<field_name>` args can be passed to `/select/
For example, the following query groups hits by `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) additionally to the provided `step`: For example, the following query groups hits by `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) additionally to the provided `step`:
```logsql ```logsql
curl http://localhost:9428/select/logsql/hits -d 'query=*' -d 'start=1w' -d 'step=1d' -d 'field=level' curl http://localhost:9428/select/logsql/hits -d 'query=*' -d 'start=3h' -d 'step=1h' -d 'field=level'
```
The grouped fields are put inside `"fields"` object:
```json
{
"hits": [
{
"fields": {
"level": "error"
},
"timestamps": [
"2024-01-01T00:00:00Z",
"2024-01-01T01:00:00Z",
"2024-01-01T02:00:00Z"
],
"values": [
25,
20,
15
]
},
{
"fields": {
"level": "info"
},
"timestamps": [
"2024-01-01T00:00:00Z",
"2024-01-01T01:00:00Z",
"2024-01-01T02:00:00Z"
],
"values": [
25625,
35043,
25230
]
}
]
}
``` ```
See also: See also: