This commit is contained in:
Aliaksandr Valialkin 2024-05-24 03:03:12 +02:00
parent 43bd975bf1
commit 869b2fabc4
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
25 changed files with 816 additions and 633 deletions

View file

@ -1,17 +0,0 @@
{% stripspace %}
// FieldNamesResponse formats /select/logsql/field_names response
{% func FieldNamesResponse(names []string) %}
{
"names":[
{% if len(names) > 0 %}
{%q= names[0] %}
{% for _, v := range names[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -1,69 +0,0 @@
// Code generated by qtc from "field_names_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// FieldNamesResponse formats /select/logsql/field_names response
//line app/vlselect/logsql/field_names_response.qtpl:4
package logsql
//line app/vlselect/logsql/field_names_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/field_names_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/field_names_response.qtpl:4
func StreamFieldNamesResponse(qw422016 *qt422016.Writer, names []string) {
//line app/vlselect/logsql/field_names_response.qtpl:4
qw422016.N().S(`{"names":[`)
//line app/vlselect/logsql/field_names_response.qtpl:7
if len(names) > 0 {
//line app/vlselect/logsql/field_names_response.qtpl:8
qw422016.N().Q(names[0])
//line app/vlselect/logsql/field_names_response.qtpl:9
for _, v := range names[1:] {
//line app/vlselect/logsql/field_names_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/field_names_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/field_names_response.qtpl:11
}
//line app/vlselect/logsql/field_names_response.qtpl:12
}
//line app/vlselect/logsql/field_names_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/field_names_response.qtpl:15
}
//line app/vlselect/logsql/field_names_response.qtpl:15
func WriteFieldNamesResponse(qq422016 qtio422016.Writer, names []string) {
//line app/vlselect/logsql/field_names_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/field_names_response.qtpl:15
StreamFieldNamesResponse(qw422016, names)
//line app/vlselect/logsql/field_names_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/field_names_response.qtpl:15
}
//line app/vlselect/logsql/field_names_response.qtpl:15
func FieldNamesResponse(names []string) string {
//line app/vlselect/logsql/field_names_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/field_names_response.qtpl:15
WriteFieldNamesResponse(qb422016, names)
//line app/vlselect/logsql/field_names_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/field_names_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/field_names_response.qtpl:15
return qs422016
//line app/vlselect/logsql/field_names_response.qtpl:15
}

View file

@ -1,17 +0,0 @@
{% stripspace %}
// FieldValuesResponse formats /select/logsql/field_values response
{% func FieldValuesResponse(values []string) %}
{
"values":[
{% if len(values) > 0 %}
{%q= values[0] %}
{% for _, v := range values[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -1,69 +0,0 @@
// Code generated by qtc from "field_values_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// FieldValuesResponse formats /select/logsql/field_values response
//line app/vlselect/logsql/field_values_response.qtpl:4
package logsql
//line app/vlselect/logsql/field_values_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/field_values_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/field_values_response.qtpl:4
func StreamFieldValuesResponse(qw422016 *qt422016.Writer, values []string) {
//line app/vlselect/logsql/field_values_response.qtpl:4
qw422016.N().S(`{"values":[`)
//line app/vlselect/logsql/field_values_response.qtpl:7
if len(values) > 0 {
//line app/vlselect/logsql/field_values_response.qtpl:8
qw422016.N().Q(values[0])
//line app/vlselect/logsql/field_values_response.qtpl:9
for _, v := range values[1:] {
//line app/vlselect/logsql/field_values_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/field_values_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/field_values_response.qtpl:11
}
//line app/vlselect/logsql/field_values_response.qtpl:12
}
//line app/vlselect/logsql/field_values_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/field_values_response.qtpl:15
}
//line app/vlselect/logsql/field_values_response.qtpl:15
func WriteFieldValuesResponse(qq422016 qtio422016.Writer, values []string) {
//line app/vlselect/logsql/field_values_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/field_values_response.qtpl:15
StreamFieldValuesResponse(qw422016, values)
//line app/vlselect/logsql/field_values_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/field_values_response.qtpl:15
}
//line app/vlselect/logsql/field_values_response.qtpl:15
func FieldValuesResponse(values []string) string {
//line app/vlselect/logsql/field_values_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/field_values_response.qtpl:15
WriteFieldValuesResponse(qb422016, values)
//line app/vlselect/logsql/field_values_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/field_values_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/field_values_response.qtpl:15
return qs422016
//line app/vlselect/logsql/field_values_response.qtpl:15
}

View file

@ -146,7 +146,7 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt
// Write results
w.Header().Set("Content-Type", "application/json")
WriteFieldNamesResponse(w, fieldNames)
WriteValuesWithHitsJSON(w, fieldNames)
}
// ProcessFieldValuesRequest handles /select/logsql/field_values request.
@ -186,7 +186,7 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht
// Write results
w.Header().Set("Content-Type", "application/json")
WriteFieldValuesResponse(w, values)
WriteValuesWithHitsJSON(w, values)
}
// ProcessStreamLabelNamesRequest processes /select/logsql/stream_label_names request.
@ -208,7 +208,7 @@ func ProcessStreamLabelNamesRequest(ctx context.Context, w http.ResponseWriter,
// Write results
w.Header().Set("Content-Type", "application/json")
WriteStreamLabelNamesResponse(w, names)
WriteValuesWithHitsJSON(w, names)
}
// ProcessStreamLabelValuesRequest processes /select/logsql/stream_label_values request.
@ -247,7 +247,7 @@ func ProcessStreamLabelValuesRequest(ctx context.Context, w http.ResponseWriter,
// Write results
w.Header().Set("Content-Type", "application/json")
WriteStreamLabelValuesResponse(w, values)
WriteValuesWithHitsJSON(w, values)
}
// ProcessStreamsRequest processes /select/logsql/streams request.
@ -279,7 +279,7 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R
// Write results
w.Header().Set("Content-Type", "application/json")
WriteStreamsResponse(w, streams)
WriteValuesWithHitsJSON(w, streams)
}
// ProcessQueryRequest handles /select/logsql/query request.

View file

@ -0,0 +1,32 @@
{% import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
) %}
{% stripspace %}
// ValuesWithHitsJSON generates JSON from the given values.
{% func ValuesWithHitsJSON(values []logstorage.ValueWithHits) %}
{
"values":{%= valuesWithHitsJSONArray(values) %}
}
{% endfunc %}
{% func valuesWithHitsJSONArray(values []logstorage.ValueWithHits) %}
[
{% if len(values) > 0 %}
{%= valueWithHitsJSON(values[0]) %}
{% for _, v := range values[1:] %}
,{%= valueWithHitsJSON(v) %}
{% endfor %}
{% endif %}
]
{% endfunc %}
{% func valueWithHitsJSON(v logstorage.ValueWithHits) %}
{
"value":{%q= v.Value %},
"hits":{%dul= v.Hits %}
}
{% endfunc %}
{% endstripspace %}

View file

@ -0,0 +1,152 @@
// Code generated by qtc from "logsql.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vlselect/logsql/logsql.qtpl:1
package logsql
//line app/vlselect/logsql/logsql.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
// ValuesWithHitsJSON generates JSON from the given values.
//line app/vlselect/logsql/logsql.qtpl:8
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/logsql.qtpl:8
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/logsql.qtpl:8
func StreamValuesWithHitsJSON(qw422016 *qt422016.Writer, values []logstorage.ValueWithHits) {
//line app/vlselect/logsql/logsql.qtpl:8
qw422016.N().S(`{"values":`)
//line app/vlselect/logsql/logsql.qtpl:10
streamvaluesWithHitsJSONArray(qw422016, values)
//line app/vlselect/logsql/logsql.qtpl:10
qw422016.N().S(`}`)
//line app/vlselect/logsql/logsql.qtpl:12
}
//line app/vlselect/logsql/logsql.qtpl:12
func WriteValuesWithHitsJSON(qq422016 qtio422016.Writer, values []logstorage.ValueWithHits) {
//line app/vlselect/logsql/logsql.qtpl:12
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/logsql.qtpl:12
StreamValuesWithHitsJSON(qw422016, values)
//line app/vlselect/logsql/logsql.qtpl:12
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/logsql.qtpl:12
}
//line app/vlselect/logsql/logsql.qtpl:12
func ValuesWithHitsJSON(values []logstorage.ValueWithHits) string {
//line app/vlselect/logsql/logsql.qtpl:12
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/logsql.qtpl:12
WriteValuesWithHitsJSON(qb422016, values)
//line app/vlselect/logsql/logsql.qtpl:12
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/logsql.qtpl:12
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/logsql.qtpl:12
return qs422016
//line app/vlselect/logsql/logsql.qtpl:12
}
//line app/vlselect/logsql/logsql.qtpl:14
func streamvaluesWithHitsJSONArray(qw422016 *qt422016.Writer, values []logstorage.ValueWithHits) {
//line app/vlselect/logsql/logsql.qtpl:14
qw422016.N().S(`[`)
//line app/vlselect/logsql/logsql.qtpl:16
if len(values) > 0 {
//line app/vlselect/logsql/logsql.qtpl:17
streamvalueWithHitsJSON(qw422016, values[0])
//line app/vlselect/logsql/logsql.qtpl:18
for _, v := range values[1:] {
//line app/vlselect/logsql/logsql.qtpl:18
qw422016.N().S(`,`)
//line app/vlselect/logsql/logsql.qtpl:19
streamvalueWithHitsJSON(qw422016, v)
//line app/vlselect/logsql/logsql.qtpl:20
}
//line app/vlselect/logsql/logsql.qtpl:21
}
//line app/vlselect/logsql/logsql.qtpl:21
qw422016.N().S(`]`)
//line app/vlselect/logsql/logsql.qtpl:23
}
//line app/vlselect/logsql/logsql.qtpl:23
func writevaluesWithHitsJSONArray(qq422016 qtio422016.Writer, values []logstorage.ValueWithHits) {
//line app/vlselect/logsql/logsql.qtpl:23
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/logsql.qtpl:23
streamvaluesWithHitsJSONArray(qw422016, values)
//line app/vlselect/logsql/logsql.qtpl:23
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/logsql.qtpl:23
}
//line app/vlselect/logsql/logsql.qtpl:23
func valuesWithHitsJSONArray(values []logstorage.ValueWithHits) string {
//line app/vlselect/logsql/logsql.qtpl:23
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/logsql.qtpl:23
writevaluesWithHitsJSONArray(qb422016, values)
//line app/vlselect/logsql/logsql.qtpl:23
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/logsql.qtpl:23
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/logsql.qtpl:23
return qs422016
//line app/vlselect/logsql/logsql.qtpl:23
}
//line app/vlselect/logsql/logsql.qtpl:25
func streamvalueWithHitsJSON(qw422016 *qt422016.Writer, v logstorage.ValueWithHits) {
//line app/vlselect/logsql/logsql.qtpl:25
qw422016.N().S(`{"value":`)
//line app/vlselect/logsql/logsql.qtpl:27
qw422016.N().Q(v.Value)
//line app/vlselect/logsql/logsql.qtpl:27
qw422016.N().S(`,"hits":`)
//line app/vlselect/logsql/logsql.qtpl:28
qw422016.N().DUL(v.Hits)
//line app/vlselect/logsql/logsql.qtpl:28
qw422016.N().S(`}`)
//line app/vlselect/logsql/logsql.qtpl:30
}
//line app/vlselect/logsql/logsql.qtpl:30
func writevalueWithHitsJSON(qq422016 qtio422016.Writer, v logstorage.ValueWithHits) {
//line app/vlselect/logsql/logsql.qtpl:30
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/logsql.qtpl:30
streamvalueWithHitsJSON(qw422016, v)
//line app/vlselect/logsql/logsql.qtpl:30
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/logsql.qtpl:30
}
//line app/vlselect/logsql/logsql.qtpl:30
func valueWithHitsJSON(v logstorage.ValueWithHits) string {
//line app/vlselect/logsql/logsql.qtpl:30
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/logsql.qtpl:30
writevalueWithHitsJSON(qb422016, v)
//line app/vlselect/logsql/logsql.qtpl:30
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/logsql.qtpl:30
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/logsql.qtpl:30
return qs422016
//line app/vlselect/logsql/logsql.qtpl:30
}

View file

@ -1,17 +0,0 @@
{% stripspace %}
// StreamLabelNamesResponse formats /select/logsql/stream_label_names response
{% func StreamLabelNamesResponse(names []string) %}
{
"names":[
{% if len(names) > 0 %}
{%q= names[0] %}
{% for _, v := range names[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -1,69 +0,0 @@
// Code generated by qtc from "stream_label_names_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// StreamLabelNamesResponse formats /select/logsql/stream_label_names response
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
package logsql
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
func StreamStreamLabelNamesResponse(qw422016 *qt422016.Writer, names []string) {
//line app/vlselect/logsql/stream_label_names_response.qtpl:4
qw422016.N().S(`{"names":[`)
//line app/vlselect/logsql/stream_label_names_response.qtpl:7
if len(names) > 0 {
//line app/vlselect/logsql/stream_label_names_response.qtpl:8
qw422016.N().Q(names[0])
//line app/vlselect/logsql/stream_label_names_response.qtpl:9
for _, v := range names[1:] {
//line app/vlselect/logsql/stream_label_names_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/stream_label_names_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/stream_label_names_response.qtpl:11
}
//line app/vlselect/logsql/stream_label_names_response.qtpl:12
}
//line app/vlselect/logsql/stream_label_names_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
}
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
func WriteStreamLabelNamesResponse(qq422016 qtio422016.Writer, names []string) {
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
StreamStreamLabelNamesResponse(qw422016, names)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
}
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
func StreamLabelNamesResponse(names []string) string {
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
WriteStreamLabelNamesResponse(qb422016, names)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
return qs422016
//line app/vlselect/logsql/stream_label_names_response.qtpl:15
}

View file

@ -1,17 +0,0 @@
{% stripspace %}
// StreamLabelValuesResponse formats /select/logsql/stream_label_values response
{% func StreamLabelValuesResponse(values []string) %}
{
"values":[
{% if len(values) > 0 %}
{%q= values[0] %}
{% for _, v := range values[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -1,69 +0,0 @@
// Code generated by qtc from "stream_label_values_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// StreamLabelValuesResponse formats /select/logsql/stream_label_values response
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
package logsql
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
func StreamStreamLabelValuesResponse(qw422016 *qt422016.Writer, values []string) {
//line app/vlselect/logsql/stream_label_values_response.qtpl:4
qw422016.N().S(`{"values":[`)
//line app/vlselect/logsql/stream_label_values_response.qtpl:7
if len(values) > 0 {
//line app/vlselect/logsql/stream_label_values_response.qtpl:8
qw422016.N().Q(values[0])
//line app/vlselect/logsql/stream_label_values_response.qtpl:9
for _, v := range values[1:] {
//line app/vlselect/logsql/stream_label_values_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/stream_label_values_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/stream_label_values_response.qtpl:11
}
//line app/vlselect/logsql/stream_label_values_response.qtpl:12
}
//line app/vlselect/logsql/stream_label_values_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
}
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
func WriteStreamLabelValuesResponse(qq422016 qtio422016.Writer, values []string) {
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
StreamStreamLabelValuesResponse(qw422016, values)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
}
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
func StreamLabelValuesResponse(values []string) string {
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
WriteStreamLabelValuesResponse(qb422016, values)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
return qs422016
//line app/vlselect/logsql/stream_label_values_response.qtpl:15
}

View file

@ -1,17 +0,0 @@
{% stripspace %}
// StreamsResponse formats /select/logsql/streams response
{% func StreamsResponse(streams []string) %}
{
"streams":[
{% if len(streams) > 0 %}
{%q= streams[0] %}
{% for _, v := range streams[1:] %}
,{%q= v %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View file

@ -1,69 +0,0 @@
// Code generated by qtc from "streams_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// StreamsResponse formats /select/logsql/streams response
//line app/vlselect/logsql/streams_response.qtpl:4
package logsql
//line app/vlselect/logsql/streams_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/streams_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/streams_response.qtpl:4
func StreamStreamsResponse(qw422016 *qt422016.Writer, streams []string) {
//line app/vlselect/logsql/streams_response.qtpl:4
qw422016.N().S(`{"streams":[`)
//line app/vlselect/logsql/streams_response.qtpl:7
if len(streams) > 0 {
//line app/vlselect/logsql/streams_response.qtpl:8
qw422016.N().Q(streams[0])
//line app/vlselect/logsql/streams_response.qtpl:9
for _, v := range streams[1:] {
//line app/vlselect/logsql/streams_response.qtpl:9
qw422016.N().S(`,`)
//line app/vlselect/logsql/streams_response.qtpl:10
qw422016.N().Q(v)
//line app/vlselect/logsql/streams_response.qtpl:11
}
//line app/vlselect/logsql/streams_response.qtpl:12
}
//line app/vlselect/logsql/streams_response.qtpl:12
qw422016.N().S(`]}`)
//line app/vlselect/logsql/streams_response.qtpl:15
}
//line app/vlselect/logsql/streams_response.qtpl:15
func WriteStreamsResponse(qq422016 qtio422016.Writer, streams []string) {
//line app/vlselect/logsql/streams_response.qtpl:15
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/streams_response.qtpl:15
StreamStreamsResponse(qw422016, streams)
//line app/vlselect/logsql/streams_response.qtpl:15
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/streams_response.qtpl:15
}
//line app/vlselect/logsql/streams_response.qtpl:15
func StreamsResponse(streams []string) string {
//line app/vlselect/logsql/streams_response.qtpl:15
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/streams_response.qtpl:15
WriteStreamsResponse(qb422016, streams)
//line app/vlselect/logsql/streams_response.qtpl:15
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/streams_response.qtpl:15
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/streams_response.qtpl:15
return qs422016
//line app/vlselect/logsql/streams_response.qtpl:15
}

View file

@ -112,33 +112,33 @@ func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorag
}
// GetFieldNames executes q and returns field names seen in results.
func GetFieldNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]string, error) {
func GetFieldNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]logstorage.ValueWithHits, error) {
return strg.GetFieldNames(ctx, tenantIDs, q)
}
// GetFieldValues executes q and returns unique values for the fieldName seen in results.
//
// If limit > 0, then up to limit unique values are returned.
func GetFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, fieldName string, limit uint64) ([]string, error) {
func GetFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, fieldName string, limit uint64) ([]logstorage.ValueWithHits, error) {
return strg.GetFieldValues(ctx, tenantIDs, q, fieldName, limit)
}
// GetStreamLabelNames executes q and returns stream labels names seen in results.
func GetStreamLabelNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]string, error) {
func GetStreamLabelNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]logstorage.ValueWithHits, error) {
return strg.GetStreamLabelNames(ctx, tenantIDs, q)
}
// GetStreamLabelValues executes q and returns stream label values for the given labelName seen in results.
//
// If limit > 0, then up to limit unique stream label values are returned.
func GetStreamLabelValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, labelName string, limit uint64) ([]string, error) {
func GetStreamLabelValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, labelName string, limit uint64) ([]logstorage.ValueWithHits, error) {
return strg.GetStreamLabelValues(ctx, tenantIDs, q, labelName, limit)
}
// GetStreams executes q and returns streams seen in query results.
//
// If limit > 0, then up to limit unique streams are returned.
func GetStreams(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit uint64) ([]string, error) {
func GetStreams(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit uint64) ([]logstorage.ValueWithHits, error) {
return strg.GetStreams(ctx, tenantIDs, q, limit)
}

View file

@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
## tip
* FAETURE: return the number of matching log entries per returned value in [HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#http-api) results. This simplifies detecting [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) / [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) values with the biggest number of logs for the given [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/).
* FEATURE: improve performance for [regexp filter](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter) in the following cases:
- If the regexp contains just a phrase without special regular expression chars. For example, `~"foo"`.
- If the regexp starts with `.*` or ends with `.*`. For example, `~".*foo.*"`.
@ -27,6 +28,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
* FEATURE: allow disabling automatic unquoting of the matched placeholders in [`extract` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#format-for-extract-pipe-pattern).
* BUGFIX: properly parse `!` in front of [exact filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-filter), [exact-prefix filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-prefix-filter) and [regexp filter](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter). For example, `!~"some regexp"` is properly parsed as `not ="some regexp"`. Previously it was incorrectly parsed as `'~="some regexp"'` [phrase filter](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter).
* BUGFIX: properly sort results by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) when [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) is applied. For example, `_time:5m | sort by (_time) desc | limit 10` properly works now.
## [v0.9.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.9.1-victorialogs)

View file

@ -1246,12 +1246,12 @@ _time:5m | extract if (ip:"") "ip=<ip> "
### field_names pipe
Sometimes it may be needed to get all the field names for the selected results. This may be done with `| field_names ...` [pipe](#pipes).
For example, the following query returns all the names of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
from the logs over the last 5 minutes:
`| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
with an estimated number of logs per each field name.
For example, the following query returns all the field names with the number of matching logs over the last 5 minutes:
```logsql
_time:5m | field_names as names
_time:5m | field_names
```
Field names are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in order to sort them if needed.
@ -1622,7 +1622,7 @@ _time:5m | stats
### uniq pipe
`| uniq ...` pipe allows returning only unique results over the selected logs. For example, the following LogsQL query
`| uniq ...` pipe returns unique results over the selected logs. For example, the following LogsQL query
returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
over logs for the last 5 minutes:
@ -1639,6 +1639,12 @@ _time:5m | uniq by (host, path)
The unique entries are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in order to sort them if needed.
Add `hits` after `uniq by (...)` in order to return the number of matching logs per each field value:
```logsql
_time:5m | uniq by (host) hits
```
Unique entries are stored in memory during query execution. Big number of unique selected entries may require a lot of memory.
Sometimes it is enough to return up to `N` unique entries. This can be done by adding `limit N` after `by (...)` clause.
This allows limiting memory usage. For example, the following query returns up to 100 unique `(host, path)` pairs for the logs over the last 5 minutes:
@ -1647,6 +1653,8 @@ This allows limiting memory usage. For example, the following query returns up t
_time:5m | uniq by (host, path) limit 100
```
If the `limit` is reached, then arbitrary subset of unique values can be returned. The `hits` calculation doesn't work when the `limit` is reached.
The `by` keyword can be skipped in `uniq ...` pipe. For example, the following query is equivalent to the previous one:
```logsql

View file

@ -211,6 +211,7 @@ See also:
VictoriaLogs provides `/select/logsql/streams?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The response also contains the number of log results per every `stream`.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
@ -227,11 +228,19 @@ Below is an example JSON output returned from this endpoint:
```json
{
"streams": [
"{host=\"1.2.3.4\",app=\"foo\"}",
"{host=\"1.2.3.4\",app=\"bar\"}",
"{host=\"10.2.3.4\",app=\"foo\"}",
"{host=\"10.2.3.5\",app=\"baz\"}"
"values": [
{
"value": "{host=\"host-123\",app=\"foo\"}",
"hits": 34980
},
{
"value": "{host=\"host-124\",app=\"bar\"}",
"hits": 32892
},
{
"value": "{host=\"host-125\",app=\"baz\"}",
"hits": 32877
}
]
}
```
@ -250,6 +259,7 @@ See also:
VictoriaLogs provides `/select/logsql/stream_label_names?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns
[log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label names from results
of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The response also contains the number of log results per every label name.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
@ -266,12 +276,19 @@ Below is an example JSON output returned from this endpoint:
```json
{
"names": [
"app",
"container",
"datacenter",
"host",
"namespace"
"values": [
{
"value": "app",
"hits": 1033300623
},
{
"value": "container",
"hits": 1033300623
},
{
"value": "datacenter",
"hits": 1033300623
}
]
}
```
@ -288,6 +305,7 @@ See also:
VictoriaLogs provides `/select/logsql/stream_label_values?query=<query>&start=<start>&<end>&label=<labelName>` HTTP endpoint,
which returns [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label values for the label with the given `<labelName>` name
from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The response also contains the number of log results per every label value.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
@ -305,10 +323,14 @@ Below is an example JSON output returned from this endpoint:
```json
{
"values": [
"host-0",
"host-1",
"host-2",
"host-3"
{
"value": "host-1",
"hits": 69426656
},
{
"value": "host-2",
"hits": 66507749
}
]
}
```
@ -327,6 +349,7 @@ See also:
VictoriaLogs provides `/select/logsql/field_names?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns field names
from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The response also contains the number of log results per every field name.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
@ -343,13 +366,19 @@ Below is an example JSON output returned from this endpoint:
```json
{
"names": [
"_msg",
"_stream",
"_time",
"host",
"level",
"location"
"values": [
{
"value": "_msg",
"hits": 1033300623
},
{
"value": "_stream",
"hits": 1033300623
},
{
"value": "_time",
"hits": 1033300623
}
]
}
```
@ -366,6 +395,7 @@ See also:
VictoriaLogs provides `/select/logsql/field_values?query=<query>&field=<fieldName>&start=<start>&end=<end>` HTTP endpoint, which returns
unique values for the given `<fieldName>` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
from results of the given `<query>` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The response also contains the number of log results per every field value.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs.
@ -383,17 +413,25 @@ Below is an example JSON output returned from this endpoint:
```json
{
"values": [
"host_0",
"host_1",
"host_10",
"host_100",
"host_1000"
{
"value": "host-1",
"hits": 69426656
},
{
"value": "host-2",
"hits": 66507749
},
{
"value": "host-3",
"hits": 65454351
}
]
}
```
The `/select/logsql/field_names` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned values to `N`.
The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of field values.
When the `limit` is reached, `hits` are zeroed, since they cannot be calculated reliably.
See also:

View file

@ -1804,6 +1804,8 @@ func appendResultColumnWithName(dst []resultColumn, name string) []resultColumn
}
// addValue adds the given values v to rc.
//
// rc is valid until v is modified.
func (rc *resultColumn) addValue(v string) {
rc.values = append(rc.values, v)
}

View file

@ -1276,7 +1276,6 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | fields bar,,`)
// invalid field_names
f(`foo | field_names`)
f(`foo | field_names |`)
f(`foo | field_names (`)
f(`foo | field_names )`)

View file

@ -10,7 +10,8 @@ import (
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#field-names-pipe
type pipeFieldNames struct {
// resultName is the name of the column to write results to.
// resultName is an optional name of the column to write results to.
// By default results are written into 'name' column.
resultName string
// isFirstPipe is set to true if '| field_names' pipe is the first in the query.
@ -20,7 +21,11 @@ type pipeFieldNames struct {
}
func (pf *pipeFieldNames) String() string {
return "field_names as " + quoteTokenIfNeeded(pf.resultName)
s := "field_names"
if pf.resultName != "name" {
s += " as " + quoteTokenIfNeeded(pf.resultName)
}
return s
}
func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fieldsSet) {
@ -34,13 +39,6 @@ func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fields
func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeFieldNamesProcessorShard, workersCount)
for i := range shards {
shards[i] = pipeFieldNamesProcessorShard{
pipeFieldNamesProcessorShardNopad: pipeFieldNamesProcessorShardNopad{
m: make(map[string]struct{}),
},
}
}
pfp := &pipeFieldNamesProcessor{
pf: pf,
@ -68,8 +66,15 @@ type pipeFieldNamesProcessorShard struct {
}
type pipeFieldNamesProcessorShardNopad struct {
// m holds unique field names.
m map[string]struct{}
// m holds hits per each field name
m map[string]*uint64
}
func (shard *pipeFieldNamesProcessorShard) getM() map[string]*uint64 {
if shard.m == nil {
shard.m = make(map[string]*uint64)
}
return shard.m
}
func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) {
@ -78,12 +83,21 @@ func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) {
}
shard := &pfp.shards[workerID]
m := shard.getM()
cs := br.getColumns()
for _, c := range cs {
if _, ok := shard.m[c.name]; !ok {
pHits, ok := m[c.name]
if !ok {
nameCopy := strings.Clone(c.name)
shard.m[nameCopy] = struct{}{}
hits := uint64(0)
pHits = &hits
m[nameCopy] = pHits
}
// Assume that the column is set for all the rows in the block.
// This is much faster than reading all the column values and counting non-empty rows.
*pHits += uint64(len(br.timestamps))
}
}
@ -94,15 +108,25 @@ func (pfp *pipeFieldNamesProcessor) flush() error {
// merge state across shards
shards := pfp.shards
m := shards[0].m
m := shards[0].getM()
shards = shards[1:]
for i := range shards {
for k := range shards[i].m {
m[k] = struct{}{}
for name, pHitsSrc := range shards[i].getM() {
pHits, ok := m[name]
if !ok {
m[name] = pHitsSrc
} else {
*pHits += *pHitsSrc
}
}
}
if pfp.pf.isFirstPipe {
m["_time"] = struct{}{}
pHits := m["_stream"]
if pHits == nil {
hits := uint64(0)
pHits = &hits
}
m["_time"] = pHits
}
// write result
@ -110,8 +134,11 @@ func (pfp *pipeFieldNamesProcessor) flush() error {
pfp: pfp,
}
wctx.rcs[0].name = pfp.pf.resultName
for k := range m {
wctx.writeRow(k)
wctx.rcs[1].name = "hits"
for name, pHits := range m {
hits := string(marshalUint64String(nil, *pHits))
wctx.writeRow(name, hits)
}
wctx.flush()
@ -120,7 +147,7 @@ func (pfp *pipeFieldNamesProcessor) flush() error {
type pipeFieldNamesWriteContext struct {
pfp *pipeFieldNamesProcessor
rcs [1]resultColumn
rcs [2]resultColumn
br blockResult
// rowsCount is the number of rows in the current block
@ -130,9 +157,10 @@ type pipeFieldNamesWriteContext struct {
valuesLen int
}
func (wctx *pipeFieldNamesWriteContext) writeRow(v string) {
wctx.rcs[0].addValue(v)
wctx.valuesLen += len(v)
func (wctx *pipeFieldNamesWriteContext) writeRow(name, hits string) {
wctx.rcs[0].addValue(name)
wctx.rcs[1].addValue(hits)
wctx.valuesLen += len(name) + len(hits)
wctx.rowsCount++
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
@ -145,11 +173,12 @@ func (wctx *pipeFieldNamesWriteContext) flush() {
wctx.valuesLen = 0
// Flush rcs to ppBase
br.setResultColumns(wctx.rcs[:1], wctx.rowsCount)
br.setResultColumns(wctx.rcs[:], wctx.rowsCount)
wctx.rowsCount = 0
wctx.pfp.ppBase.writeBlock(0, br)
br.reset()
wctx.rcs[0].resetValues()
wctx.rcs[1].resetValues()
}
func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) {
@ -158,13 +187,21 @@ func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) {
}
lex.nextToken()
resultName := "name"
if lex.isKeyword("as") {
lex.nextToken()
}
resultName, err := parseFieldName(lex)
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
}
resultName = name
} else if !lex.isKeyword("", "|") {
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
}
resultName = name
}
pf := &pipeFieldNames{
resultName: resultName,

View file

@ -10,6 +10,7 @@ func TestParsePipeFieldNamesSuccess(t *testing.T) {
expectParsePipeSuccess(t, pipeStr)
}
f(`field_names`)
f(`field_names as x`)
}
@ -19,7 +20,6 @@ func TestParsePipeFieldNamesFailure(t *testing.T) {
expectParsePipeFailure(t, pipeStr)
}
f(`field_names`)
f(`field_names(foo)`)
f(`field_names a b`)
f(`field_names as`)
@ -32,32 +32,47 @@ func TestPipeFieldNames(t *testing.T) {
}
// single row, result column doesn't clash with original columns
f("field_names as x", [][]Field{
f("field_names", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
},
}, [][]Field{
{
{"x", "_msg"},
{"name", "_msg"},
{"hits", "1"},
},
{
{"x", "a"},
{"name", "a"},
{"hits", "1"},
},
})
// single row, result column do clashes with original columns
f("field_names as _msg", [][]Field{
f("field_names as x", [][]Field{
{
{"_msg", `{"foo":"bar"}`},
{"a", `test`},
{"b", "aaa"},
},
{
{"a", `bar`},
},
{
{"a", `bar`},
{"c", `bar`},
},
}, [][]Field{
{
{"_msg", "_msg"},
{"x", "a"},
{"hits", "3"},
},
{
{"_msg", "a"},
{"x", "b"},
{"hits", "1"},
},
{
{"x", "c"},
{"hits", "1"},
},
})
}

View file

@ -477,14 +477,12 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
wctx.rcs = rcs
}
var tmpBuf []byte
byColumns := r.byColumns
byColumnsIsTime := r.byColumnsIsTime
for i := range byFields {
v := byColumns[i]
if byColumnsIsTime[i] {
tmpBuf = marshalTimestampRFC3339NanoString(tmpBuf[:0], r.timestamp)
v = bytesutil.ToUnsafeString(tmpBuf)
v = string(marshalTimestampRFC3339NanoString(nil, r.timestamp))
}
rcs[i].addValue(v)
wctx.valuesLen += len(v)

View file

@ -20,6 +20,9 @@ type pipeUniq struct {
// fields contains field names for returning unique values
byFields []string
// if hitsFieldName isn't empty, then the number of hits per each unique value is stored in this field.
hitsFieldName string
limit uint64
}
@ -28,6 +31,9 @@ func (pu *pipeUniq) String() string {
if len(pu.byFields) > 0 {
s += " by (" + fieldNamesString(pu.byFields) + ")"
}
if pu.hitsFieldName != "" {
s += " hits"
}
if pu.limit > 0 {
s += fmt.Sprintf(" limit %d", pu.limit)
}
@ -53,7 +59,6 @@ func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, c
shards[i] = pipeUniqProcessorShard{
pipeUniqProcessorShardNopad: pipeUniqProcessorShardNopad{
pu: pu,
m: make(map[string]struct{}),
stateSizeBudget: stateSizeBudgetChunk,
},
}
@ -98,8 +103,8 @@ type pipeUniqProcessorShardNopad struct {
// pu points to the parent pipeUniq.
pu *pipeUniq
// m holds unique rows.
m map[string]struct{}
// m holds per-row hits.
m map[string]*uint64
// keyBuf is a temporary buffer for building keys for m.
keyBuf []byte
@ -120,6 +125,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
return false
}
needHits := shard.pu.hitsFieldName != ""
byFields := shard.pu.byFields
if len(byFields) == 0 {
// Take into account all the columns in br.
@ -132,7 +138,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name))
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
}
shard.updateState(bytesutil.ToUnsafeString(keyBuf))
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
}
shard.keyBuf = keyBuf
return true
@ -142,20 +148,34 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
c := br.getColumnByName(byFields[0])
if c.isConst {
v := c.valuesEncoded[0]
shard.updateState(v)
shard.updateState(v, uint64(len(br.timestamps)))
return true
}
if c.valueType == valueTypeDict {
if needHits {
a := encoding.GetUint64s(len(c.dictValues))
hits := a.A
valuesEncoded := c.getValuesEncoded(br)
for _, v := range valuesEncoded {
idx := unmarshalUint8(v)
hits[idx]++
}
for i, v := range c.dictValues {
shard.updateState(v, hits[i])
}
encoding.PutUint64s(a)
} else {
for _, v := range c.dictValues {
shard.updateState(v)
shard.updateState(v, 0)
}
}
return true
}
values := c.getValues(br)
for i, v := range values {
if i == 0 || values[i-1] != values[i] {
shard.updateState(v)
if needHits || i == 0 || values[i-1] != values[i] {
shard.updateState(v, 1)
}
}
return true
@ -174,7 +194,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
for i := range br.timestamps {
seenValue := true
for _, values := range columnValues {
if i == 0 || values[i-1] != values[i] {
if needHits || i == 0 || values[i-1] != values[i] {
seenValue = false
break
}
@ -187,19 +207,31 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool {
for _, values := range columnValues {
keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i]))
}
shard.updateState(bytesutil.ToUnsafeString(keyBuf))
shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1)
}
shard.keyBuf = keyBuf
return true
}
func (shard *pipeUniqProcessorShard) updateState(v string) {
if _, ok := shard.m[v]; !ok {
func (shard *pipeUniqProcessorShard) updateState(v string, hits uint64) {
m := shard.getM()
pHits, ok := m[v]
if !ok {
vCopy := strings.Clone(v)
shard.m[vCopy] = struct{}{}
shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy))
hits := uint64(0)
pHits = &hits
m[vCopy] = pHits
shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)+unsafe.Sizeof(hits)+unsafe.Sizeof(pHits))
}
*pHits += hits
}
func (shard *pipeUniqProcessorShard) getM() map[string]*uint64 {
if shard.m == nil {
shard.m = make(map[string]*uint64)
}
return shard.m
}
func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) {
@ -235,17 +267,26 @@ func (pup *pipeUniqProcessor) flush() error {
// merge state across shards
shards := pup.shards
m := shards[0].m
m := shards[0].getM()
shards = shards[1:]
for i := range shards {
if needStop(pup.stopCh) {
return nil
}
for k := range shards[i].m {
m[k] = struct{}{}
for k, pHitsSrc := range shards[i].getM() {
pHits, ok := m[k]
if !ok {
m[k] = pHitsSrc
} else {
*pHits += *pHitsSrc
}
}
}
// There is little sense in returning partial hits when the limit on the number of unique entries is reached.
// It is better from UX experience is to return zero hits instead.
resetHits := pup.pu.limit > 0 && uint64(len(m)) >= pup.pu.limit
// write result
wctx := &pipeUniqWriteContext{
@ -254,8 +295,23 @@ func (pup *pipeUniqProcessor) flush() error {
byFields := pup.pu.byFields
var rowFields []Field
addHitsFieldIfNeeded := func(dst []Field, hits uint64) []Field {
if pup.pu.hitsFieldName == "" {
return dst
}
if resetHits {
hits = 0
}
hitsStr := string(marshalUint64String(nil, hits))
dst = append(dst, Field{
Name: pup.pu.hitsFieldName,
Value: hitsStr,
})
return dst
}
if len(byFields) == 0 {
for k := range m {
for k, pHits := range m {
if needStop(pup.stopCh) {
return nil
}
@ -280,11 +336,12 @@ func (pup *pipeUniqProcessor) flush() error {
Value: bytesutil.ToUnsafeString(value),
})
}
rowFields = addHitsFieldIfNeeded(rowFields, *pHits)
wctx.writeRow(rowFields)
}
} else if len(byFields) == 1 {
fieldName := byFields[0]
for k := range m {
for k, pHits := range m {
if needStop(pup.stopCh) {
return nil
}
@ -293,10 +350,11 @@ func (pup *pipeUniqProcessor) flush() error {
Name: fieldName,
Value: k,
})
rowFields = addHitsFieldIfNeeded(rowFields, *pHits)
wctx.writeRow(rowFields)
}
} else {
for k := range m {
for k, pHits := range m {
if needStop(pup.stopCh) {
return nil
}
@ -317,6 +375,7 @@ func (pup *pipeUniqProcessor) flush() error {
})
fieldIdx++
}
rowFields = addHitsFieldIfNeeded(rowFields, *pHits)
wctx.writeRow(rowFields)
}
}
@ -418,6 +477,16 @@ func parsePipeUniq(lex *lexer) (*pipeUniq, error) {
pu.byFields = bfs
}
if lex.isKeyword("hits") {
lex.nextToken()
hitsFieldName := "hits"
for slices.Contains(pu.byFields, hitsFieldName) {
hitsFieldName += "s"
}
pu.hitsFieldName = hitsFieldName
}
if lex.isKeyword("limit") {
lex.nextToken()
n, ok := tryParseUint64(lex.token)

View file

@ -11,11 +11,15 @@ func TestParsePipeUniqSuccess(t *testing.T) {
}
f(`uniq`)
f(`uniq hits`)
f(`uniq limit 10`)
f(`uniq hits limit 10`)
f(`uniq by (x)`)
f(`uniq by (x) limit 10`)
f(`uniq by (x, y)`)
f(`uniq by (x, y) hits`)
f(`uniq by (x, y) limit 10`)
f(`uniq by (x, y) hits limit 10`)
}
func TestParsePipeUniqFailure(t *testing.T) {
@ -26,6 +30,7 @@ func TestParsePipeUniqFailure(t *testing.T) {
f(`uniq foo`)
f(`uniq by`)
f(`uniq by hits`)
f(`uniq by(x) limit`)
f(`uniq by(x) limit foo`)
}
@ -62,6 +67,62 @@ func TestPipeUniq(t *testing.T) {
},
})
f("uniq hits", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "2"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
{"hits", "1"},
},
})
f("uniq hits limit 2", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "0"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
{"hits", "0"},
},
})
f("uniq by (a)", [][]Field{
{
{"a", `2`},
@ -82,6 +143,27 @@ func TestPipeUniq(t *testing.T) {
},
})
f("uniq by (a) hits", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"hits", "3"},
},
})
f("uniq by (b)", [][]Field{
{
{"a", `2`},
@ -105,6 +187,31 @@ func TestPipeUniq(t *testing.T) {
},
})
f("uniq by (b) hits", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"b", "3"},
{"hits", "2"},
},
{
{"b", "54"},
{"hits", "1"},
},
})
f("uniq by (c)", [][]Field{
{
{"a", `2`},
@ -128,6 +235,31 @@ func TestPipeUniq(t *testing.T) {
},
})
f("uniq by (c) hits", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"c", ""},
{"hits", "2"},
},
{
{"c", "d"},
{"hits", "1"},
},
})
f("uniq by (d)", [][]Field{
{
{"a", `2`},
@ -148,6 +280,27 @@ func TestPipeUniq(t *testing.T) {
},
})
f("uniq by (d) hits", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"d", ""},
{"hits", "3"},
},
})
f("uniq by (a, b)", [][]Field{
{
{"a", `2`},
@ -172,6 +325,33 @@ func TestPipeUniq(t *testing.T) {
{"b", "54"},
},
})
f("uniq by (a, b) hits", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", "2"},
{"b", "3"},
{"hits", "2"},
},
{
{"a", "2"},
{"b", "54"},
{"hits", "1"},
},
})
}
func TestPipeUniqUpdateNeededFields(t *testing.T) {

View file

@ -145,9 +145,9 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
}
// GetFieldNames returns field names from q results for the given tenantIDs.
func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
pipes := append([]pipe{}, q.pipes...)
pipeStr := "field_names as names | sort by (names)"
pipeStr := "field_names"
lex := newLexer(pipeStr)
pf, err := parsePipeFieldNames(lex)
@ -156,36 +156,24 @@ func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Qu
}
pf.isFirstPipe = len(pipes) == 0
if !lex.isKeyword("|") {
logger.Panicf("BUG: unexpected token after 'field_names' pipe at [%s]: %q", pipeStr, lex.token)
}
lex.nextToken()
ps, err := parsePipeSort(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'sort' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s)
}
pipes = append(pipes, pf, ps)
pipes = append(pipes, pf)
q = &Query{
f: q.f,
pipes: pipes,
}
return s.runSingleColumnQuery(ctx, tenantIDs, q)
return s.runValuesWithHitsQuery(ctx, tenantIDs, q)
}
// GetFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs.
//
// If limit > 0, then up to limit unique values are returned.
func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) {
func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string) ([]string, error) {
pipes := append([]pipe{}, q.pipes...)
quotedFieldName := quoteTokenIfNeeded(fieldName)
pipeStr := fmt.Sprintf("uniq by (%s) limit %d | sort by (%s)", quotedFieldName, limit, quotedFieldName)
pipeStr := fmt.Sprintf("uniq by (%s)", quotedFieldName)
lex := newLexer(pipeStr)
pu, err := parsePipeUniq(lex)
@ -193,87 +181,17 @@ func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Q
logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isKeyword("|") {
logger.Panicf("BUG: unexpected token after 'uniq' pipe at [%s]: %q", pipeStr, lex.token)
}
lex.nextToken()
ps, err := parsePipeSort(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'sort' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s)
}
pipes = append(pipes, pu, ps)
pipes = append(pipes, pu)
q = &Query{
f: q.f,
pipes: pipes,
}
return s.runSingleColumnQuery(ctx, tenantIDs, q)
}
// GetStreamLabelNames returns stream label names from q results for the given tenantIDs.
func (s *Storage) GetStreamLabelNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64)
if err != nil {
return nil, err
}
var names []string
m := make(map[string]struct{})
forEachStreamLabel(streams, func(label Field) {
if _, ok := m[label.Name]; !ok {
nameCopy := strings.Clone(label.Name)
names = append(names, nameCopy)
m[nameCopy] = struct{}{}
}
})
sortStrings(names)
return names, nil
}
// GetStreamLabelValues returns stream label values for the given labelName from q results for the given tenantIDs.
//
// If limit > 9, then up to limit unique label values are returned.
func (s *Storage) GetStreamLabelValues(ctx context.Context, tenantIDs []TenantID, q *Query, labelName string, limit uint64) ([]string, error) {
streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64)
if err != nil {
return nil, err
}
var values []string
m := make(map[string]struct{})
forEachStreamLabel(streams, func(label Field) {
if label.Name != labelName {
return
}
if _, ok := m[label.Value]; !ok {
valueCopy := strings.Clone(label.Value)
values = append(values, valueCopy)
m[valueCopy] = struct{}{}
}
})
if uint64(len(values)) > limit {
values = values[:limit]
}
sortStrings(values)
return values, nil
}
// GetStreams returns streams from q results for the given tenantIDs.
//
// If limit > 0, then up to limit unique streams are returned.
func (s *Storage) GetStreams(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]string, error) {
return s.GetFieldValues(ctx, tenantIDs, q, "_stream", limit)
}
func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
var values []string
var valuesLock sync.Mutex
writeBlockResult := func(_ uint, br *blockResult) {
@ -283,13 +201,14 @@ func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID
cs := br.getColumns()
if len(cs) != 1 {
logger.Panicf("BUG: expecting only a single column; got %d columns", len(cs))
logger.Panicf("BUG: expecting one column; got %d columns", len(cs))
}
columnValues := cs[0].getValues(br)
columnValuesCopy := make([]string, len(columnValues))
for i, v := range columnValues {
columnValuesCopy[i] = strings.Clone(v)
for i := range columnValues {
columnValuesCopy[i] = strings.Clone(columnValues[i])
}
valuesLock.Lock()
@ -297,21 +216,182 @@ func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID
valuesLock.Unlock()
}
err := s.runQuery(ctx, tenantIDs, q, writeBlockResult)
if err != nil {
if err := s.runQuery(ctx, tenantIDs, q, writeBlockResult); err != nil {
return nil, err
}
return values, nil
}
// GetFieldValues returns unique values with the number of hits for the given fieldName returned by q for the given tenantIDs.
//
// If limit > 0, then up to limit unique values are returned.
func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]ValueWithHits, error) {
pipes := append([]pipe{}, q.pipes...)
quotedFieldName := quoteTokenIfNeeded(fieldName)
pipeStr := fmt.Sprintf("uniq by (%s) hits limit %d", quotedFieldName, limit)
lex := newLexer(pipeStr)
pu, err := parsePipeUniq(lex)
if err != nil {
logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err)
}
if !lex.isEnd() {
logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s)
}
pipes = append(pipes, pu)
q = &Query{
f: q.f,
pipes: pipes,
}
return s.runValuesWithHitsQuery(ctx, tenantIDs, q)
}
// ValueWithHits contains value and hits.
type ValueWithHits struct {
Value string
Hits uint64
}
func toValuesWithHits(m map[string]*uint64) []ValueWithHits {
results := make([]ValueWithHits, 0, len(m))
for k, pHits := range m {
results = append(results, ValueWithHits{
Value: k,
Hits: *pHits,
})
}
sortValuesWithHits(results)
return results
}
func sortValuesWithHits(results []ValueWithHits) {
slices.SortFunc(results, func(a, b ValueWithHits) int {
if a.Hits == b.Hits {
if a.Value == b.Value {
return 0
}
if lessString(a.Value, b.Value) {
return -1
}
return 1
}
// Sort in descending order of hits
if a.Hits < b.Hits {
return 1
}
return -1
})
}
// GetStreamLabelNames returns stream label names from q results for the given tenantIDs.
func (s *Storage) GetStreamLabelNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64)
if err != nil {
return nil, err
}
m := make(map[string]*uint64)
forEachStreamLabel(streams, func(label Field, hits uint64) {
pHits, ok := m[label.Name]
if !ok {
nameCopy := strings.Clone(label.Name)
hitsLocal := uint64(0)
pHits = &hitsLocal
m[nameCopy] = pHits
}
*pHits += hits
})
names := toValuesWithHits(m)
return names, nil
}
// GetStreamLabelValues returns stream label values for the given labelName from q results for the given tenantIDs.
//
// If limit > 9, then up to limit unique label values are returned.
func (s *Storage) GetStreamLabelValues(ctx context.Context, tenantIDs []TenantID, q *Query, labelName string, limit uint64) ([]ValueWithHits, error) {
streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64)
if err != nil {
return nil, err
}
m := make(map[string]*uint64)
forEachStreamLabel(streams, func(label Field, hits uint64) {
if label.Name != labelName {
return
}
pHits, ok := m[label.Value]
if !ok {
valueCopy := strings.Clone(label.Value)
hitsLocal := uint64(0)
pHits = &hitsLocal
m[valueCopy] = pHits
}
*pHits += hits
})
values := toValuesWithHits(m)
if limit > 0 && uint64(len(values)) > limit {
values = values[:limit]
}
return values, nil
}
// GetStreams returns streams from q results for the given tenantIDs.
//
// If limit > 0, then up to limit unique streams are returned.
func (s *Storage) GetStreams(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]ValueWithHits, error) {
return s.GetFieldValues(ctx, tenantIDs, q, "_stream", limit)
}
func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
var results []ValueWithHits
var resultsLock sync.Mutex
writeBlockResult := func(_ uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
cs := br.getColumns()
if len(cs) != 2 {
logger.Panicf("BUG: expecting two columns; got %d columns", len(cs))
}
columnValues := cs[0].getValues(br)
columnHits := cs[1].getValues(br)
valuesWithHits := make([]ValueWithHits, len(columnValues))
for i := range columnValues {
x := &valuesWithHits[i]
hits, _ := tryParseUint64(columnHits[i])
x.Value = strings.Clone(columnValues[i])
x.Hits = hits
}
resultsLock.Lock()
results = append(results, valuesWithHits...)
resultsLock.Unlock()
}
err := s.runQuery(ctx, tenantIDs, q, writeBlockResult)
if err != nil {
return nil, err
}
sortValuesWithHits(results)
return results, nil
}
func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) {
if !hasFilterInWithQueryForFilter(q.f) && !hasFilterInWithQueryForPipes(q.pipes) {
return q, nil
}
getFieldValues := func(q *Query, fieldName string) ([]string, error) {
return s.GetFieldValues(ctx, tenantIDs, q, fieldName, 0)
return s.getFieldValuesNoHits(ctx, tenantIDs, q, fieldName)
}
cache := make(map[string][]string)
fNew, err := initFilterInValuesForFilter(cache, q.f, getFieldValues)
@ -1007,16 +1087,17 @@ func getFilterTimeRange(f filter) (int64, int64) {
return math.MinInt64, math.MaxInt64
}
func forEachStreamLabel(streams []string, f func(label Field)) {
func forEachStreamLabel(streams []ValueWithHits, f func(label Field, hits uint64)) {
var labels []Field
for _, stream := range streams {
for i := range streams {
var err error
labels, err = parseStreamLabels(labels[:0], stream)
labels, err = parseStreamLabels(labels[:0], streams[i].Value)
if err != nil {
continue
}
for i := range labels {
f(labels[i])
hits := streams[i].Hits
for j := range labels {
f(labels[j], hits)
}
}
}