diff --git a/app/vlselect/logsql/hits_response.qtpl b/app/vlselect/logsql/hits_response.qtpl new file mode 100644 index 000000000..f9976f7ab --- /dev/null +++ b/app/vlselect/logsql/hits_response.qtpl @@ -0,0 +1,69 @@ +{% import ( + "slices" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" +) %} + +{% stripspace %} + +// LabelsForHits formats labels for /select/logsql/hits response +{% func LabelsForHits(columns []logstorage.BlockColumn, rowIdx int) %} +{ + {% if len(columns) > 0 %} + {%q= columns[0].Name %}:{%q= columns[0].Values[rowIdx] %} + {% for _, c := range columns[1:] %} + ,{%q= c.Name %}:{%q= c.Values[rowIdx] %} + {% endfor %} + {% endif %} +} +{% endfunc %} + +{% func HitsSeries(m map[string]*hitsSeries) %} +{ + {% code + sortedKeys := make([]string, 0, len(m)) + for k := range m { + sortedKeys = append(sortedKeys, k) + } + slices.Sort(sortedKeys) + %} + "hits":[ + {% if len(sortedKeys) > 0 %} + {%= hitsSeriesLine(m, sortedKeys[0]) %} + {% for _, k := range sortedKeys[1:] %} + ,{%= hitsSeriesLine(m, k) %} + {% endfor %} + {% endif %} + ] +} +{% endfunc %} + +{% func hitsSeriesLine(m map[string]*hitsSeries, k string) %} +{ + {% code + hs := m[k] + hs.sort() + timestamps := hs.timestamps + values := hs.values + %} + "fields":{%s= k %}, + "timestamps":[ + {% if len(timestamps) > 0 %} + {%q= timestamps[0] %} + {% for _, ts := range timestamps[1:] %} + ,{%q= ts %} + {% endfor %} + {% endif %} + ], + "values":[ + {% if len(values) > 0 %} + {%s= values[0] %} + {% for _, v := range values[1:] %} + ,{%s= v %} + {% endfor %} + {% endif %} + ] +} +{% endfunc %} + +{% endstripspace %} diff --git a/app/vlselect/logsql/hits_response.qtpl.go b/app/vlselect/logsql/hits_response.qtpl.go new file mode 100644 index 000000000..a0d463952 --- /dev/null +++ b/app/vlselect/logsql/hits_response.qtpl.go @@ -0,0 +1,219 @@ +// Code generated by qtc from "hits_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +//line app/vlselect/logsql/hits_response.qtpl:1 +package logsql + +//line app/vlselect/logsql/hits_response.qtpl:1 +import ( + "slices" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" +) + +// LabelsForHits formats labels for /select/logsql/hits response + +//line app/vlselect/logsql/hits_response.qtpl:10 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/hits_response.qtpl:10 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/hits_response.qtpl:10 +func StreamLabelsForHits(qw422016 *qt422016.Writer, columns []logstorage.BlockColumn, rowIdx int) { +//line app/vlselect/logsql/hits_response.qtpl:10 + qw422016.N().S(`{`) +//line app/vlselect/logsql/hits_response.qtpl:12 + if len(columns) > 0 { +//line app/vlselect/logsql/hits_response.qtpl:13 + qw422016.N().Q(columns[0].Name) +//line app/vlselect/logsql/hits_response.qtpl:13 + qw422016.N().S(`:`) +//line app/vlselect/logsql/hits_response.qtpl:13 + qw422016.N().Q(columns[0].Values[rowIdx]) +//line app/vlselect/logsql/hits_response.qtpl:14 + for _, c := range columns[1:] { +//line app/vlselect/logsql/hits_response.qtpl:14 + qw422016.N().S(`,`) +//line app/vlselect/logsql/hits_response.qtpl:15 + qw422016.N().Q(c.Name) +//line app/vlselect/logsql/hits_response.qtpl:15 + qw422016.N().S(`:`) +//line app/vlselect/logsql/hits_response.qtpl:15 + qw422016.N().Q(c.Values[rowIdx]) +//line app/vlselect/logsql/hits_response.qtpl:16 + } +//line app/vlselect/logsql/hits_response.qtpl:17 + } +//line app/vlselect/logsql/hits_response.qtpl:17 + qw422016.N().S(`}`) +//line app/vlselect/logsql/hits_response.qtpl:19 +} + +//line app/vlselect/logsql/hits_response.qtpl:19 +func WriteLabelsForHits(qq422016 qtio422016.Writer, columns []logstorage.BlockColumn, rowIdx int) { +//line app/vlselect/logsql/hits_response.qtpl:19 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/hits_response.qtpl:19 + StreamLabelsForHits(qw422016, columns, rowIdx) +//line app/vlselect/logsql/hits_response.qtpl:19 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/hits_response.qtpl:19 +} + +//line app/vlselect/logsql/hits_response.qtpl:19 +func LabelsForHits(columns []logstorage.BlockColumn, rowIdx int) string { +//line app/vlselect/logsql/hits_response.qtpl:19 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/hits_response.qtpl:19 + WriteLabelsForHits(qb422016, columns, rowIdx) +//line app/vlselect/logsql/hits_response.qtpl:19 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/hits_response.qtpl:19 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/hits_response.qtpl:19 + return qs422016 +//line app/vlselect/logsql/hits_response.qtpl:19 +} + +//line app/vlselect/logsql/hits_response.qtpl:21 +func StreamHitsSeries(qw422016 *qt422016.Writer, m map[string]*hitsSeries) { +//line app/vlselect/logsql/hits_response.qtpl:21 + qw422016.N().S(`{`) +//line app/vlselect/logsql/hits_response.qtpl:24 + sortedKeys := make([]string, 0, len(m)) + for k := range m { + sortedKeys = append(sortedKeys, k) + } + slices.Sort(sortedKeys) + +//line app/vlselect/logsql/hits_response.qtpl:29 + qw422016.N().S(`"hits":[`) +//line app/vlselect/logsql/hits_response.qtpl:31 + if len(sortedKeys) > 0 { +//line app/vlselect/logsql/hits_response.qtpl:32 + streamhitsSeriesLine(qw422016, m, sortedKeys[0]) +//line app/vlselect/logsql/hits_response.qtpl:33 + for _, k := range sortedKeys[1:] { +//line app/vlselect/logsql/hits_response.qtpl:33 + qw422016.N().S(`,`) +//line app/vlselect/logsql/hits_response.qtpl:34 + streamhitsSeriesLine(qw422016, m, k) +//line app/vlselect/logsql/hits_response.qtpl:35 + } +//line app/vlselect/logsql/hits_response.qtpl:36 + } +//line app/vlselect/logsql/hits_response.qtpl:36 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/hits_response.qtpl:39 +} + +//line app/vlselect/logsql/hits_response.qtpl:39 +func WriteHitsSeries(qq422016 qtio422016.Writer, m map[string]*hitsSeries) { +//line app/vlselect/logsql/hits_response.qtpl:39 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/hits_response.qtpl:39 + StreamHitsSeries(qw422016, m) +//line app/vlselect/logsql/hits_response.qtpl:39 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/hits_response.qtpl:39 +} + +//line app/vlselect/logsql/hits_response.qtpl:39 +func HitsSeries(m map[string]*hitsSeries) string { +//line app/vlselect/logsql/hits_response.qtpl:39 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/hits_response.qtpl:39 + WriteHitsSeries(qb422016, m) +//line app/vlselect/logsql/hits_response.qtpl:39 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/hits_response.qtpl:39 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/hits_response.qtpl:39 + return qs422016 +//line app/vlselect/logsql/hits_response.qtpl:39 +} + +//line app/vlselect/logsql/hits_response.qtpl:41 +func streamhitsSeriesLine(qw422016 *qt422016.Writer, m map[string]*hitsSeries, k string) { +//line app/vlselect/logsql/hits_response.qtpl:41 + qw422016.N().S(`{`) +//line app/vlselect/logsql/hits_response.qtpl:44 + hs := m[k] + hs.sort() + timestamps := hs.timestamps + values := hs.values + +//line app/vlselect/logsql/hits_response.qtpl:48 + qw422016.N().S(`"fields":`) +//line app/vlselect/logsql/hits_response.qtpl:49 + qw422016.N().S(k) +//line app/vlselect/logsql/hits_response.qtpl:49 + qw422016.N().S(`,"timestamps":[`) +//line app/vlselect/logsql/hits_response.qtpl:51 + if len(timestamps) > 0 { +//line app/vlselect/logsql/hits_response.qtpl:52 + qw422016.N().Q(timestamps[0]) +//line app/vlselect/logsql/hits_response.qtpl:53 + for _, ts := range timestamps[1:] { +//line app/vlselect/logsql/hits_response.qtpl:53 + qw422016.N().S(`,`) +//line app/vlselect/logsql/hits_response.qtpl:54 + qw422016.N().Q(ts) +//line app/vlselect/logsql/hits_response.qtpl:55 + } +//line app/vlselect/logsql/hits_response.qtpl:56 + } +//line app/vlselect/logsql/hits_response.qtpl:56 + qw422016.N().S(`],"values":[`) +//line app/vlselect/logsql/hits_response.qtpl:59 + if len(values) > 0 { +//line app/vlselect/logsql/hits_response.qtpl:60 + qw422016.N().S(values[0]) +//line app/vlselect/logsql/hits_response.qtpl:61 + for _, v := range values[1:] { +//line app/vlselect/logsql/hits_response.qtpl:61 + qw422016.N().S(`,`) +//line app/vlselect/logsql/hits_response.qtpl:62 + qw422016.N().S(v) +//line app/vlselect/logsql/hits_response.qtpl:63 + } +//line app/vlselect/logsql/hits_response.qtpl:64 + } +//line app/vlselect/logsql/hits_response.qtpl:64 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/hits_response.qtpl:67 +} + +//line app/vlselect/logsql/hits_response.qtpl:67 +func writehitsSeriesLine(qq422016 qtio422016.Writer, m map[string]*hitsSeries, k string) { +//line app/vlselect/logsql/hits_response.qtpl:67 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/hits_response.qtpl:67 + streamhitsSeriesLine(qw422016, m, k) +//line app/vlselect/logsql/hits_response.qtpl:67 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/hits_response.qtpl:67 +} + +//line app/vlselect/logsql/hits_response.qtpl:67 +func hitsSeriesLine(m map[string]*hitsSeries, k string) string { +//line app/vlselect/logsql/hits_response.qtpl:67 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/hits_response.qtpl:67 + writehitsSeriesLine(qb422016, m, k) +//line app/vlselect/logsql/hits_response.qtpl:67 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/hits_response.qtpl:67 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/hits_response.qtpl:67 + return qs422016 +//line app/vlselect/logsql/hits_response.qtpl:67 +} diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index 73401d0e2..bb82402cd 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -6,6 +6,8 @@ import ( "math" "net/http" "slices" + "sort" + "strings" "sync" "time" @@ -55,43 +57,74 @@ func ProcessHitsRequest(ctx context.Context, w http.ResponseWriter, r *http.Requ // Obtain field entries fields := r.Form["field"] + // Prepare the query q.AddCountByTimePipe(int64(step), int64(offset), fields) q.Optimize() - var wLock sync.Mutex - isFirstWrite := true + var mLock sync.Mutex + m := make(map[string]*hitsSeries) writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { if len(columns) == 0 || len(columns[0].Values) == 0 { return } + timestampValues := columns[0].Values + hitsValues := columns[len(columns)-1].Values + columns = columns[1 : len(columns)-1] + bb := blockResultPool.Get() for i := range timestamps { - bb.B = append(bb.B, ',') - WriteJSONRow(bb, columns, i) - // Remove newline at the end - bb.B = bb.B[:len(bb.B)-1] + timestampStr := strings.Clone(timestampValues[i]) + hitsStr := strings.Clone(hitsValues[i]) + + bb.Reset() + WriteLabelsForHits(bb, columns, i) + + mLock.Lock() + hs, ok := m[string(bb.B)] + if !ok { + k := string(bb.B) + hs = &hitsSeries{} + m[k] = hs + } + hs.timestamps = append(hs.timestamps, timestampStr) + hs.values = append(hs.values, hitsStr) + mLock.Unlock() } - wLock.Lock() - buf := bb.B - if isFirstWrite { - buf = buf[1:] - isFirstWrite = false - } - _, _ = w.Write(buf) - wLock.Unlock() blockResultPool.Put(bb) } + // Execute the query + if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil { + httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err) + return + } + // Write response w.Header().Set("Content-Type", "application/json") - fmt.Fprintf(w, `{"rows":[`) - err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock) - fmt.Fprintf(w, `]}`) + WriteHitsSeries(w, m) +} - if err != nil { - httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err) - } +type hitsSeries struct { + timestamps []string + values []string +} + +func (hs *hitsSeries) sort() { + sort.Sort(hs) +} + +func (hs *hitsSeries) Len() int { + return len(hs.timestamps) +} + +func (hs *hitsSeries) Swap(i, j int) { + hs.timestamps[i], hs.timestamps[j] = hs.timestamps[j], hs.timestamps[i] + hs.values[i], hs.values[j] = hs.values[j], hs.values[i] +} + +func (hs *hitsSeries) Less(i, j int) bool { + return hs.timestamps[i] < hs.timestamps[j] } // ProcessFieldNamesRequest handles /select/logsql/field_names request. diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index fec657b6f..f2d332b15 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -112,19 +112,21 @@ Below is an example JSON output returned from this endpoint: ```json { - "rows": [ + "hits": [ { - "_time": "2024-01-12T00:00:00Z", - "hits": "800000" - }, - { - "_time": "2024-01-12T01:00:00Z", - "hits": "800000" - }, - { - "_time": "2024-01-12T02:00:00Z", - "hits": "820000" + "fields": {}, + "timestamps": [ + "2024-01-01T00:00:00Z", + "2024-01-01T01:00:00Z", + "2024-01-01T02:00:00Z" + ], + "values": [ + 410339, + 450311, + 899506 + ] } + ] } ``` @@ -141,7 +143,46 @@ Additionally, any number of `field=` args can be passed to `/select/ For example, the following query groups hits by `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) additionally to the provided `step`: ```logsql -curl http://localhost:9428/select/logsql/hits -d 'query=*' -d 'start=1w' -d 'step=1d' -d 'field=level' +curl http://localhost:9428/select/logsql/hits -d 'query=*' -d 'start=3h' -d 'step=1h' -d 'field=level' +``` + +The grouped fields are put inside `"fields"` object: + +```json +{ + "hits": [ + { + "fields": { + "level": "error" + }, + "timestamps": [ + "2024-01-01T00:00:00Z", + "2024-01-01T01:00:00Z", + "2024-01-01T02:00:00Z" + ], + "values": [ + 25, + 20, + 15 + ] + }, + { + "fields": { + "level": "info" + }, + "timestamps": [ + "2024-01-01T00:00:00Z", + "2024-01-01T01:00:00Z", + "2024-01-01T02:00:00Z" + ], + "values": [ + 25625, + 35043, + 25230 + ] + } + ] +} ``` See also: