From 869b2fabc4d52f676f944d30509e36e46929c9ce Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 24 May 2024 03:03:12 +0200 Subject: [PATCH] wip --- app/vlselect/logsql/field_names_response.qtpl | 17 -- .../logsql/field_names_response.qtpl.go | 69 ----- .../logsql/field_values_response.qtpl | 17 -- .../logsql/field_values_response.qtpl.go | 69 ----- app/vlselect/logsql/logsql.go | 10 +- app/vlselect/logsql/logsql.qtpl | 32 ++ app/vlselect/logsql/logsql.qtpl.go | 152 ++++++++++ .../logsql/stream_label_names_response.qtpl | 17 -- .../stream_label_names_response.qtpl.go | 69 ----- .../logsql/stream_label_values_response.qtpl | 17 -- .../stream_label_values_response.qtpl.go | 69 ----- app/vlselect/logsql/streams_response.qtpl | 17 -- app/vlselect/logsql/streams_response.qtpl.go | 69 ----- app/vlstorage/main.go | 10 +- docs/VictoriaLogs/CHANGELOG.md | 2 + docs/VictoriaLogs/LogsQL.md | 18 +- docs/VictoriaLogs/querying/README.md | 92 ++++-- lib/logstorage/block_result.go | 2 + lib/logstorage/parser_test.go | 1 - lib/logstorage/pipe_field_names.go | 93 ++++-- lib/logstorage/pipe_field_names_test.go | 31 +- lib/logstorage/pipe_topk.go | 4 +- lib/logstorage/pipe_uniq.go | 111 +++++-- lib/logstorage/pipe_uniq_test.go | 180 +++++++++++ lib/logstorage/storage_search.go | 281 +++++++++++------- 25 files changed, 816 insertions(+), 633 deletions(-) delete mode 100644 app/vlselect/logsql/field_names_response.qtpl delete mode 100644 app/vlselect/logsql/field_names_response.qtpl.go delete mode 100644 app/vlselect/logsql/field_values_response.qtpl delete mode 100644 app/vlselect/logsql/field_values_response.qtpl.go create mode 100644 app/vlselect/logsql/logsql.qtpl create mode 100644 app/vlselect/logsql/logsql.qtpl.go delete mode 100644 app/vlselect/logsql/stream_label_names_response.qtpl delete mode 100644 app/vlselect/logsql/stream_label_names_response.qtpl.go delete mode 100644 app/vlselect/logsql/stream_label_values_response.qtpl delete mode 100644 app/vlselect/logsql/stream_label_values_response.qtpl.go delete mode 100644 app/vlselect/logsql/streams_response.qtpl delete mode 100644 app/vlselect/logsql/streams_response.qtpl.go diff --git a/app/vlselect/logsql/field_names_response.qtpl b/app/vlselect/logsql/field_names_response.qtpl deleted file mode 100644 index cbb276e0a..000000000 --- a/app/vlselect/logsql/field_names_response.qtpl +++ /dev/null @@ -1,17 +0,0 @@ -{% stripspace %} - -// FieldNamesResponse formats /select/logsql/field_names response -{% func FieldNamesResponse(names []string) %} -{ - "names":[ - {% if len(names) > 0 %} - {%q= names[0] %} - {% for _, v := range names[1:] %} - ,{%q= v %} - {% endfor %} - {% endif %} - ] -} -{% endfunc %} - -{% endstripspace %} diff --git a/app/vlselect/logsql/field_names_response.qtpl.go b/app/vlselect/logsql/field_names_response.qtpl.go deleted file mode 100644 index 97d1bf011..000000000 --- a/app/vlselect/logsql/field_names_response.qtpl.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by qtc from "field_names_response.qtpl". DO NOT EDIT. -// See https://github.com/valyala/quicktemplate for details. - -// FieldNamesResponse formats /select/logsql/field_names response - -//line app/vlselect/logsql/field_names_response.qtpl:4 -package logsql - -//line app/vlselect/logsql/field_names_response.qtpl:4 -import ( - qtio422016 "io" - - qt422016 "github.com/valyala/quicktemplate" -) - -//line app/vlselect/logsql/field_names_response.qtpl:4 -var ( - _ = qtio422016.Copy - _ = qt422016.AcquireByteBuffer -) - -//line app/vlselect/logsql/field_names_response.qtpl:4 -func StreamFieldNamesResponse(qw422016 *qt422016.Writer, names []string) { -//line app/vlselect/logsql/field_names_response.qtpl:4 - qw422016.N().S(`{"names":[`) -//line app/vlselect/logsql/field_names_response.qtpl:7 - if len(names) > 0 { -//line app/vlselect/logsql/field_names_response.qtpl:8 - qw422016.N().Q(names[0]) -//line app/vlselect/logsql/field_names_response.qtpl:9 - for _, v := range names[1:] { -//line app/vlselect/logsql/field_names_response.qtpl:9 - qw422016.N().S(`,`) -//line app/vlselect/logsql/field_names_response.qtpl:10 - qw422016.N().Q(v) -//line app/vlselect/logsql/field_names_response.qtpl:11 - } -//line app/vlselect/logsql/field_names_response.qtpl:12 - } -//line app/vlselect/logsql/field_names_response.qtpl:12 - qw422016.N().S(`]}`) -//line app/vlselect/logsql/field_names_response.qtpl:15 -} - -//line app/vlselect/logsql/field_names_response.qtpl:15 -func WriteFieldNamesResponse(qq422016 qtio422016.Writer, names []string) { -//line app/vlselect/logsql/field_names_response.qtpl:15 - qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vlselect/logsql/field_names_response.qtpl:15 - StreamFieldNamesResponse(qw422016, names) -//line app/vlselect/logsql/field_names_response.qtpl:15 - qt422016.ReleaseWriter(qw422016) -//line app/vlselect/logsql/field_names_response.qtpl:15 -} - -//line app/vlselect/logsql/field_names_response.qtpl:15 -func FieldNamesResponse(names []string) string { -//line app/vlselect/logsql/field_names_response.qtpl:15 - qb422016 := qt422016.AcquireByteBuffer() -//line app/vlselect/logsql/field_names_response.qtpl:15 - WriteFieldNamesResponse(qb422016, names) -//line app/vlselect/logsql/field_names_response.qtpl:15 - qs422016 := string(qb422016.B) -//line app/vlselect/logsql/field_names_response.qtpl:15 - qt422016.ReleaseByteBuffer(qb422016) -//line app/vlselect/logsql/field_names_response.qtpl:15 - return qs422016 -//line app/vlselect/logsql/field_names_response.qtpl:15 -} diff --git a/app/vlselect/logsql/field_values_response.qtpl b/app/vlselect/logsql/field_values_response.qtpl deleted file mode 100644 index bfbf93cb9..000000000 --- a/app/vlselect/logsql/field_values_response.qtpl +++ /dev/null @@ -1,17 +0,0 @@ -{% stripspace %} - -// FieldValuesResponse formats /select/logsql/field_values response -{% func FieldValuesResponse(values []string) %} -{ - "values":[ - {% if len(values) > 0 %} - {%q= values[0] %} - {% for _, v := range values[1:] %} - ,{%q= v %} - {% endfor %} - {% endif %} - ] -} -{% endfunc %} - -{% endstripspace %} diff --git a/app/vlselect/logsql/field_values_response.qtpl.go b/app/vlselect/logsql/field_values_response.qtpl.go deleted file mode 100644 index 14e76f80c..000000000 --- a/app/vlselect/logsql/field_values_response.qtpl.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by qtc from "field_values_response.qtpl". DO NOT EDIT. -// See https://github.com/valyala/quicktemplate for details. - -// FieldValuesResponse formats /select/logsql/field_values response - -//line app/vlselect/logsql/field_values_response.qtpl:4 -package logsql - -//line app/vlselect/logsql/field_values_response.qtpl:4 -import ( - qtio422016 "io" - - qt422016 "github.com/valyala/quicktemplate" -) - -//line app/vlselect/logsql/field_values_response.qtpl:4 -var ( - _ = qtio422016.Copy - _ = qt422016.AcquireByteBuffer -) - -//line app/vlselect/logsql/field_values_response.qtpl:4 -func StreamFieldValuesResponse(qw422016 *qt422016.Writer, values []string) { -//line app/vlselect/logsql/field_values_response.qtpl:4 - qw422016.N().S(`{"values":[`) -//line app/vlselect/logsql/field_values_response.qtpl:7 - if len(values) > 0 { -//line app/vlselect/logsql/field_values_response.qtpl:8 - qw422016.N().Q(values[0]) -//line app/vlselect/logsql/field_values_response.qtpl:9 - for _, v := range values[1:] { -//line app/vlselect/logsql/field_values_response.qtpl:9 - qw422016.N().S(`,`) -//line app/vlselect/logsql/field_values_response.qtpl:10 - qw422016.N().Q(v) -//line app/vlselect/logsql/field_values_response.qtpl:11 - } -//line app/vlselect/logsql/field_values_response.qtpl:12 - } -//line app/vlselect/logsql/field_values_response.qtpl:12 - qw422016.N().S(`]}`) -//line app/vlselect/logsql/field_values_response.qtpl:15 -} - -//line app/vlselect/logsql/field_values_response.qtpl:15 -func WriteFieldValuesResponse(qq422016 qtio422016.Writer, values []string) { -//line app/vlselect/logsql/field_values_response.qtpl:15 - qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vlselect/logsql/field_values_response.qtpl:15 - StreamFieldValuesResponse(qw422016, values) -//line app/vlselect/logsql/field_values_response.qtpl:15 - qt422016.ReleaseWriter(qw422016) -//line app/vlselect/logsql/field_values_response.qtpl:15 -} - -//line app/vlselect/logsql/field_values_response.qtpl:15 -func FieldValuesResponse(values []string) string { -//line app/vlselect/logsql/field_values_response.qtpl:15 - qb422016 := qt422016.AcquireByteBuffer() -//line app/vlselect/logsql/field_values_response.qtpl:15 - WriteFieldValuesResponse(qb422016, values) -//line app/vlselect/logsql/field_values_response.qtpl:15 - qs422016 := string(qb422016.B) -//line app/vlselect/logsql/field_values_response.qtpl:15 - qt422016.ReleaseByteBuffer(qb422016) -//line app/vlselect/logsql/field_values_response.qtpl:15 - return qs422016 -//line app/vlselect/logsql/field_values_response.qtpl:15 -} diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index a0d669756..2f50f825e 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -146,7 +146,7 @@ func ProcessFieldNamesRequest(ctx context.Context, w http.ResponseWriter, r *htt // Write results w.Header().Set("Content-Type", "application/json") - WriteFieldNamesResponse(w, fieldNames) + WriteValuesWithHitsJSON(w, fieldNames) } // ProcessFieldValuesRequest handles /select/logsql/field_values request. @@ -186,7 +186,7 @@ func ProcessFieldValuesRequest(ctx context.Context, w http.ResponseWriter, r *ht // Write results w.Header().Set("Content-Type", "application/json") - WriteFieldValuesResponse(w, values) + WriteValuesWithHitsJSON(w, values) } // ProcessStreamLabelNamesRequest processes /select/logsql/stream_label_names request. @@ -208,7 +208,7 @@ func ProcessStreamLabelNamesRequest(ctx context.Context, w http.ResponseWriter, // Write results w.Header().Set("Content-Type", "application/json") - WriteStreamLabelNamesResponse(w, names) + WriteValuesWithHitsJSON(w, names) } // ProcessStreamLabelValuesRequest processes /select/logsql/stream_label_values request. @@ -247,7 +247,7 @@ func ProcessStreamLabelValuesRequest(ctx context.Context, w http.ResponseWriter, // Write results w.Header().Set("Content-Type", "application/json") - WriteStreamLabelValuesResponse(w, values) + WriteValuesWithHitsJSON(w, values) } // ProcessStreamsRequest processes /select/logsql/streams request. @@ -279,7 +279,7 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R // Write results w.Header().Set("Content-Type", "application/json") - WriteStreamsResponse(w, streams) + WriteValuesWithHitsJSON(w, streams) } // ProcessQueryRequest handles /select/logsql/query request. diff --git a/app/vlselect/logsql/logsql.qtpl b/app/vlselect/logsql/logsql.qtpl new file mode 100644 index 000000000..b462e8cbc --- /dev/null +++ b/app/vlselect/logsql/logsql.qtpl @@ -0,0 +1,32 @@ +{% import ( + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" +) %} + +{% stripspace %} + +// ValuesWithHitsJSON generates JSON from the given values. +{% func ValuesWithHitsJSON(values []logstorage.ValueWithHits) %} +{ + "values":{%= valuesWithHitsJSONArray(values) %} +} +{% endfunc %} + +{% func valuesWithHitsJSONArray(values []logstorage.ValueWithHits) %} +[ + {% if len(values) > 0 %} + {%= valueWithHitsJSON(values[0]) %} + {% for _, v := range values[1:] %} + ,{%= valueWithHitsJSON(v) %} + {% endfor %} + {% endif %} +] +{% endfunc %} + +{% func valueWithHitsJSON(v logstorage.ValueWithHits) %} +{ + "value":{%q= v.Value %}, + "hits":{%dul= v.Hits %} +} +{% endfunc %} + +{% endstripspace %} diff --git a/app/vlselect/logsql/logsql.qtpl.go b/app/vlselect/logsql/logsql.qtpl.go new file mode 100644 index 000000000..47ff291b9 --- /dev/null +++ b/app/vlselect/logsql/logsql.qtpl.go @@ -0,0 +1,152 @@ +// Code generated by qtc from "logsql.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +//line app/vlselect/logsql/logsql.qtpl:1 +package logsql + +//line app/vlselect/logsql/logsql.qtpl:1 +import ( + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" +) + +// ValuesWithHitsJSON generates JSON from the given values. + +//line app/vlselect/logsql/logsql.qtpl:8 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/logsql.qtpl:8 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/logsql.qtpl:8 +func StreamValuesWithHitsJSON(qw422016 *qt422016.Writer, values []logstorage.ValueWithHits) { +//line app/vlselect/logsql/logsql.qtpl:8 + qw422016.N().S(`{"values":`) +//line app/vlselect/logsql/logsql.qtpl:10 + streamvaluesWithHitsJSONArray(qw422016, values) +//line app/vlselect/logsql/logsql.qtpl:10 + qw422016.N().S(`}`) +//line app/vlselect/logsql/logsql.qtpl:12 +} + +//line app/vlselect/logsql/logsql.qtpl:12 +func WriteValuesWithHitsJSON(qq422016 qtio422016.Writer, values []logstorage.ValueWithHits) { +//line app/vlselect/logsql/logsql.qtpl:12 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/logsql.qtpl:12 + StreamValuesWithHitsJSON(qw422016, values) +//line app/vlselect/logsql/logsql.qtpl:12 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/logsql.qtpl:12 +} + +//line app/vlselect/logsql/logsql.qtpl:12 +func ValuesWithHitsJSON(values []logstorage.ValueWithHits) string { +//line app/vlselect/logsql/logsql.qtpl:12 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/logsql.qtpl:12 + WriteValuesWithHitsJSON(qb422016, values) +//line app/vlselect/logsql/logsql.qtpl:12 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/logsql.qtpl:12 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/logsql.qtpl:12 + return qs422016 +//line app/vlselect/logsql/logsql.qtpl:12 +} + +//line app/vlselect/logsql/logsql.qtpl:14 +func streamvaluesWithHitsJSONArray(qw422016 *qt422016.Writer, values []logstorage.ValueWithHits) { +//line app/vlselect/logsql/logsql.qtpl:14 + qw422016.N().S(`[`) +//line app/vlselect/logsql/logsql.qtpl:16 + if len(values) > 0 { +//line app/vlselect/logsql/logsql.qtpl:17 + streamvalueWithHitsJSON(qw422016, values[0]) +//line app/vlselect/logsql/logsql.qtpl:18 + for _, v := range values[1:] { +//line app/vlselect/logsql/logsql.qtpl:18 + qw422016.N().S(`,`) +//line app/vlselect/logsql/logsql.qtpl:19 + streamvalueWithHitsJSON(qw422016, v) +//line app/vlselect/logsql/logsql.qtpl:20 + } +//line app/vlselect/logsql/logsql.qtpl:21 + } +//line app/vlselect/logsql/logsql.qtpl:21 + qw422016.N().S(`]`) +//line app/vlselect/logsql/logsql.qtpl:23 +} + +//line app/vlselect/logsql/logsql.qtpl:23 +func writevaluesWithHitsJSONArray(qq422016 qtio422016.Writer, values []logstorage.ValueWithHits) { +//line app/vlselect/logsql/logsql.qtpl:23 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/logsql.qtpl:23 + streamvaluesWithHitsJSONArray(qw422016, values) +//line app/vlselect/logsql/logsql.qtpl:23 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/logsql.qtpl:23 +} + +//line app/vlselect/logsql/logsql.qtpl:23 +func valuesWithHitsJSONArray(values []logstorage.ValueWithHits) string { +//line app/vlselect/logsql/logsql.qtpl:23 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/logsql.qtpl:23 + writevaluesWithHitsJSONArray(qb422016, values) +//line app/vlselect/logsql/logsql.qtpl:23 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/logsql.qtpl:23 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/logsql.qtpl:23 + return qs422016 +//line app/vlselect/logsql/logsql.qtpl:23 +} + +//line app/vlselect/logsql/logsql.qtpl:25 +func streamvalueWithHitsJSON(qw422016 *qt422016.Writer, v logstorage.ValueWithHits) { +//line app/vlselect/logsql/logsql.qtpl:25 + qw422016.N().S(`{"value":`) +//line app/vlselect/logsql/logsql.qtpl:27 + qw422016.N().Q(v.Value) +//line app/vlselect/logsql/logsql.qtpl:27 + qw422016.N().S(`,"hits":`) +//line app/vlselect/logsql/logsql.qtpl:28 + qw422016.N().DUL(v.Hits) +//line app/vlselect/logsql/logsql.qtpl:28 + qw422016.N().S(`}`) +//line app/vlselect/logsql/logsql.qtpl:30 +} + +//line app/vlselect/logsql/logsql.qtpl:30 +func writevalueWithHitsJSON(qq422016 qtio422016.Writer, v logstorage.ValueWithHits) { +//line app/vlselect/logsql/logsql.qtpl:30 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/logsql.qtpl:30 + streamvalueWithHitsJSON(qw422016, v) +//line app/vlselect/logsql/logsql.qtpl:30 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/logsql.qtpl:30 +} + +//line app/vlselect/logsql/logsql.qtpl:30 +func valueWithHitsJSON(v logstorage.ValueWithHits) string { +//line app/vlselect/logsql/logsql.qtpl:30 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/logsql.qtpl:30 + writevalueWithHitsJSON(qb422016, v) +//line app/vlselect/logsql/logsql.qtpl:30 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/logsql.qtpl:30 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/logsql.qtpl:30 + return qs422016 +//line app/vlselect/logsql/logsql.qtpl:30 +} diff --git a/app/vlselect/logsql/stream_label_names_response.qtpl b/app/vlselect/logsql/stream_label_names_response.qtpl deleted file mode 100644 index 2e476a79a..000000000 --- a/app/vlselect/logsql/stream_label_names_response.qtpl +++ /dev/null @@ -1,17 +0,0 @@ -{% stripspace %} - -// StreamLabelNamesResponse formats /select/logsql/stream_label_names response -{% func StreamLabelNamesResponse(names []string) %} -{ - "names":[ - {% if len(names) > 0 %} - {%q= names[0] %} - {% for _, v := range names[1:] %} - ,{%q= v %} - {% endfor %} - {% endif %} - ] -} -{% endfunc %} - -{% endstripspace %} diff --git a/app/vlselect/logsql/stream_label_names_response.qtpl.go b/app/vlselect/logsql/stream_label_names_response.qtpl.go deleted file mode 100644 index fa7555656..000000000 --- a/app/vlselect/logsql/stream_label_names_response.qtpl.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by qtc from "stream_label_names_response.qtpl". DO NOT EDIT. -// See https://github.com/valyala/quicktemplate for details. - -// StreamLabelNamesResponse formats /select/logsql/stream_label_names response - -//line app/vlselect/logsql/stream_label_names_response.qtpl:4 -package logsql - -//line app/vlselect/logsql/stream_label_names_response.qtpl:4 -import ( - qtio422016 "io" - - qt422016 "github.com/valyala/quicktemplate" -) - -//line app/vlselect/logsql/stream_label_names_response.qtpl:4 -var ( - _ = qtio422016.Copy - _ = qt422016.AcquireByteBuffer -) - -//line app/vlselect/logsql/stream_label_names_response.qtpl:4 -func StreamStreamLabelNamesResponse(qw422016 *qt422016.Writer, names []string) { -//line app/vlselect/logsql/stream_label_names_response.qtpl:4 - qw422016.N().S(`{"names":[`) -//line app/vlselect/logsql/stream_label_names_response.qtpl:7 - if len(names) > 0 { -//line app/vlselect/logsql/stream_label_names_response.qtpl:8 - qw422016.N().Q(names[0]) -//line app/vlselect/logsql/stream_label_names_response.qtpl:9 - for _, v := range names[1:] { -//line app/vlselect/logsql/stream_label_names_response.qtpl:9 - qw422016.N().S(`,`) -//line app/vlselect/logsql/stream_label_names_response.qtpl:10 - qw422016.N().Q(v) -//line app/vlselect/logsql/stream_label_names_response.qtpl:11 - } -//line app/vlselect/logsql/stream_label_names_response.qtpl:12 - } -//line app/vlselect/logsql/stream_label_names_response.qtpl:12 - qw422016.N().S(`]}`) -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 -} - -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 -func WriteStreamLabelNamesResponse(qq422016 qtio422016.Writer, names []string) { -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 - qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 - StreamStreamLabelNamesResponse(qw422016, names) -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 - qt422016.ReleaseWriter(qw422016) -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 -} - -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 -func StreamLabelNamesResponse(names []string) string { -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 - qb422016 := qt422016.AcquireByteBuffer() -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 - WriteStreamLabelNamesResponse(qb422016, names) -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 - qs422016 := string(qb422016.B) -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 - qt422016.ReleaseByteBuffer(qb422016) -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 - return qs422016 -//line app/vlselect/logsql/stream_label_names_response.qtpl:15 -} diff --git a/app/vlselect/logsql/stream_label_values_response.qtpl b/app/vlselect/logsql/stream_label_values_response.qtpl deleted file mode 100644 index 49c1695df..000000000 --- a/app/vlselect/logsql/stream_label_values_response.qtpl +++ /dev/null @@ -1,17 +0,0 @@ -{% stripspace %} - -// StreamLabelValuesResponse formats /select/logsql/stream_label_values response -{% func StreamLabelValuesResponse(values []string) %} -{ - "values":[ - {% if len(values) > 0 %} - {%q= values[0] %} - {% for _, v := range values[1:] %} - ,{%q= v %} - {% endfor %} - {% endif %} - ] -} -{% endfunc %} - -{% endstripspace %} diff --git a/app/vlselect/logsql/stream_label_values_response.qtpl.go b/app/vlselect/logsql/stream_label_values_response.qtpl.go deleted file mode 100644 index 7e385c60f..000000000 --- a/app/vlselect/logsql/stream_label_values_response.qtpl.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by qtc from "stream_label_values_response.qtpl". DO NOT EDIT. -// See https://github.com/valyala/quicktemplate for details. - -// StreamLabelValuesResponse formats /select/logsql/stream_label_values response - -//line app/vlselect/logsql/stream_label_values_response.qtpl:4 -package logsql - -//line app/vlselect/logsql/stream_label_values_response.qtpl:4 -import ( - qtio422016 "io" - - qt422016 "github.com/valyala/quicktemplate" -) - -//line app/vlselect/logsql/stream_label_values_response.qtpl:4 -var ( - _ = qtio422016.Copy - _ = qt422016.AcquireByteBuffer -) - -//line app/vlselect/logsql/stream_label_values_response.qtpl:4 -func StreamStreamLabelValuesResponse(qw422016 *qt422016.Writer, values []string) { -//line app/vlselect/logsql/stream_label_values_response.qtpl:4 - qw422016.N().S(`{"values":[`) -//line app/vlselect/logsql/stream_label_values_response.qtpl:7 - if len(values) > 0 { -//line app/vlselect/logsql/stream_label_values_response.qtpl:8 - qw422016.N().Q(values[0]) -//line app/vlselect/logsql/stream_label_values_response.qtpl:9 - for _, v := range values[1:] { -//line app/vlselect/logsql/stream_label_values_response.qtpl:9 - qw422016.N().S(`,`) -//line app/vlselect/logsql/stream_label_values_response.qtpl:10 - qw422016.N().Q(v) -//line app/vlselect/logsql/stream_label_values_response.qtpl:11 - } -//line app/vlselect/logsql/stream_label_values_response.qtpl:12 - } -//line app/vlselect/logsql/stream_label_values_response.qtpl:12 - qw422016.N().S(`]}`) -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 -} - -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 -func WriteStreamLabelValuesResponse(qq422016 qtio422016.Writer, values []string) { -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 - qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 - StreamStreamLabelValuesResponse(qw422016, values) -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 - qt422016.ReleaseWriter(qw422016) -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 -} - -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 -func StreamLabelValuesResponse(values []string) string { -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 - qb422016 := qt422016.AcquireByteBuffer() -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 - WriteStreamLabelValuesResponse(qb422016, values) -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 - qs422016 := string(qb422016.B) -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 - qt422016.ReleaseByteBuffer(qb422016) -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 - return qs422016 -//line app/vlselect/logsql/stream_label_values_response.qtpl:15 -} diff --git a/app/vlselect/logsql/streams_response.qtpl b/app/vlselect/logsql/streams_response.qtpl deleted file mode 100644 index 3242aa798..000000000 --- a/app/vlselect/logsql/streams_response.qtpl +++ /dev/null @@ -1,17 +0,0 @@ -{% stripspace %} - -// StreamsResponse formats /select/logsql/streams response -{% func StreamsResponse(streams []string) %} -{ - "streams":[ - {% if len(streams) > 0 %} - {%q= streams[0] %} - {% for _, v := range streams[1:] %} - ,{%q= v %} - {% endfor %} - {% endif %} - ] -} -{% endfunc %} - -{% endstripspace %} diff --git a/app/vlselect/logsql/streams_response.qtpl.go b/app/vlselect/logsql/streams_response.qtpl.go deleted file mode 100644 index e1a1b8feb..000000000 --- a/app/vlselect/logsql/streams_response.qtpl.go +++ /dev/null @@ -1,69 +0,0 @@ -// Code generated by qtc from "streams_response.qtpl". DO NOT EDIT. -// See https://github.com/valyala/quicktemplate for details. - -// StreamsResponse formats /select/logsql/streams response - -//line app/vlselect/logsql/streams_response.qtpl:4 -package logsql - -//line app/vlselect/logsql/streams_response.qtpl:4 -import ( - qtio422016 "io" - - qt422016 "github.com/valyala/quicktemplate" -) - -//line app/vlselect/logsql/streams_response.qtpl:4 -var ( - _ = qtio422016.Copy - _ = qt422016.AcquireByteBuffer -) - -//line app/vlselect/logsql/streams_response.qtpl:4 -func StreamStreamsResponse(qw422016 *qt422016.Writer, streams []string) { -//line app/vlselect/logsql/streams_response.qtpl:4 - qw422016.N().S(`{"streams":[`) -//line app/vlselect/logsql/streams_response.qtpl:7 - if len(streams) > 0 { -//line app/vlselect/logsql/streams_response.qtpl:8 - qw422016.N().Q(streams[0]) -//line app/vlselect/logsql/streams_response.qtpl:9 - for _, v := range streams[1:] { -//line app/vlselect/logsql/streams_response.qtpl:9 - qw422016.N().S(`,`) -//line app/vlselect/logsql/streams_response.qtpl:10 - qw422016.N().Q(v) -//line app/vlselect/logsql/streams_response.qtpl:11 - } -//line app/vlselect/logsql/streams_response.qtpl:12 - } -//line app/vlselect/logsql/streams_response.qtpl:12 - qw422016.N().S(`]}`) -//line app/vlselect/logsql/streams_response.qtpl:15 -} - -//line app/vlselect/logsql/streams_response.qtpl:15 -func WriteStreamsResponse(qq422016 qtio422016.Writer, streams []string) { -//line app/vlselect/logsql/streams_response.qtpl:15 - qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vlselect/logsql/streams_response.qtpl:15 - StreamStreamsResponse(qw422016, streams) -//line app/vlselect/logsql/streams_response.qtpl:15 - qt422016.ReleaseWriter(qw422016) -//line app/vlselect/logsql/streams_response.qtpl:15 -} - -//line app/vlselect/logsql/streams_response.qtpl:15 -func StreamsResponse(streams []string) string { -//line app/vlselect/logsql/streams_response.qtpl:15 - qb422016 := qt422016.AcquireByteBuffer() -//line app/vlselect/logsql/streams_response.qtpl:15 - WriteStreamsResponse(qb422016, streams) -//line app/vlselect/logsql/streams_response.qtpl:15 - qs422016 := string(qb422016.B) -//line app/vlselect/logsql/streams_response.qtpl:15 - qt422016.ReleaseByteBuffer(qb422016) -//line app/vlselect/logsql/streams_response.qtpl:15 - return qs422016 -//line app/vlselect/logsql/streams_response.qtpl:15 -} diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index 91a1aa2c3..e36966888 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -112,33 +112,33 @@ func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorag } // GetFieldNames executes q and returns field names seen in results. -func GetFieldNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]string, error) { +func GetFieldNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]logstorage.ValueWithHits, error) { return strg.GetFieldNames(ctx, tenantIDs, q) } // GetFieldValues executes q and returns unique values for the fieldName seen in results. // // If limit > 0, then up to limit unique values are returned. -func GetFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, fieldName string, limit uint64) ([]string, error) { +func GetFieldValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, fieldName string, limit uint64) ([]logstorage.ValueWithHits, error) { return strg.GetFieldValues(ctx, tenantIDs, q, fieldName, limit) } // GetStreamLabelNames executes q and returns stream labels names seen in results. -func GetStreamLabelNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]string, error) { +func GetStreamLabelNames(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query) ([]logstorage.ValueWithHits, error) { return strg.GetStreamLabelNames(ctx, tenantIDs, q) } // GetStreamLabelValues executes q and returns stream label values for the given labelName seen in results. // // If limit > 0, then up to limit unique stream label values are returned. -func GetStreamLabelValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, labelName string, limit uint64) ([]string, error) { +func GetStreamLabelValues(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, labelName string, limit uint64) ([]logstorage.ValueWithHits, error) { return strg.GetStreamLabelValues(ctx, tenantIDs, q, labelName, limit) } // GetStreams executes q and returns streams seen in query results. // // If limit > 0, then up to limit unique streams are returned. -func GetStreams(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit uint64) ([]string, error) { +func GetStreams(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit uint64) ([]logstorage.ValueWithHits, error) { return strg.GetStreams(ctx, tenantIDs, q, limit) } diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index dd93610a2..c4d2c7b33 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta ## tip +* FAETURE: return the number of matching log entries per returned value in [HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#http-api) results. This simplifies detecting [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) / [stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) values with the biggest number of logs for the given [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/). * FEATURE: improve performance for [regexp filter](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter) in the following cases: - If the regexp contains just a phrase without special regular expression chars. For example, `~"foo"`. - If the regexp starts with `.*` or ends with `.*`. For example, `~".*foo.*"`. @@ -27,6 +28,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta * FEATURE: allow disabling automatic unquoting of the matched placeholders in [`extract` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#format-for-extract-pipe-pattern). * BUGFIX: properly parse `!` in front of [exact filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-filter), [exact-prefix filter](https://docs.victoriametrics.com/victorialogs/logsql/#exact-prefix-filter) and [regexp filter](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter). For example, `!~"some regexp"` is properly parsed as `not ="some regexp"`. Previously it was incorrectly parsed as `'~="some regexp"'` [phrase filter](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter). +* BUGFIX: properly sort results by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) when [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) is applied. For example, `_time:5m | sort by (_time) desc | limit 10` properly works now. ## [v0.9.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.9.1-victorialogs) diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 830bc9632..6a5e3ff70 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1246,12 +1246,12 @@ _time:5m | extract if (ip:"") "ip= " ### field_names pipe -Sometimes it may be needed to get all the field names for the selected results. This may be done with `| field_names ...` [pipe](#pipes). -For example, the following query returns all the names of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) -from the logs over the last 5 minutes: +`| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) +with an estimated number of logs per each field name. +For example, the following query returns all the field names with the number of matching logs over the last 5 minutes: ```logsql -_time:5m | field_names as names +_time:5m | field_names ``` Field names are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in order to sort them if needed. @@ -1622,7 +1622,7 @@ _time:5m | stats ### uniq pipe -`| uniq ...` pipe allows returning only unique results over the selected logs. For example, the following LogsQL query +`| uniq ...` pipe returns unique results over the selected logs. For example, the following LogsQL query returns unique values for `ip` [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) over logs for the last 5 minutes: @@ -1639,6 +1639,12 @@ _time:5m | uniq by (host, path) The unique entries are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in order to sort them if needed. +Add `hits` after `uniq by (...)` in order to return the number of matching logs per each field value: + +```logsql +_time:5m | uniq by (host) hits +``` + Unique entries are stored in memory during query execution. Big number of unique selected entries may require a lot of memory. Sometimes it is enough to return up to `N` unique entries. This can be done by adding `limit N` after `by (...)` clause. This allows limiting memory usage. For example, the following query returns up to 100 unique `(host, path)` pairs for the logs over the last 5 minutes: @@ -1647,6 +1653,8 @@ This allows limiting memory usage. For example, the following query returns up t _time:5m | uniq by (host, path) limit 100 ``` +If the `limit` is reached, then arbitrary subset of unique values can be returned. The `hits` calculation doesn't work when the `limit` is reached. + The `by` keyword can be skipped in `uniq ...` pipe. For example, the following query is equivalent to the previous one: ```logsql diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index 44d52fc64..c109a3c22 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -211,6 +211,7 @@ See also: VictoriaLogs provides `/select/logsql/streams?query=&start=&end=` HTTP endpoint, which returns [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results of the given `` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[ ... ]` time range. +The response also contains the number of log results per every `stream`. The `` and `` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). If `` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs. @@ -227,11 +228,19 @@ Below is an example JSON output returned from this endpoint: ```json { - "streams": [ - "{host=\"1.2.3.4\",app=\"foo\"}", - "{host=\"1.2.3.4\",app=\"bar\"}", - "{host=\"10.2.3.4\",app=\"foo\"}", - "{host=\"10.2.3.5\",app=\"baz\"}" + "values": [ + { + "value": "{host=\"host-123\",app=\"foo\"}", + "hits": 34980 + }, + { + "value": "{host=\"host-124\",app=\"bar\"}", + "hits": 32892 + }, + { + "value": "{host=\"host-125\",app=\"baz\"}", + "hits": 32877 + } ] } ``` @@ -250,6 +259,7 @@ See also: VictoriaLogs provides `/select/logsql/stream_label_names?query=&start=&end=` HTTP endpoint, which returns [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label names from results of the given `` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[ ... ]` time range. +The response also contains the number of log results per every label name. The `` and `` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). If `` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs. @@ -266,12 +276,19 @@ Below is an example JSON output returned from this endpoint: ```json { - "names": [ - "app", - "container", - "datacenter", - "host", - "namespace" + "values": [ + { + "value": "app", + "hits": 1033300623 + }, + { + "value": "container", + "hits": 1033300623 + }, + { + "value": "datacenter", + "hits": 1033300623 + } ] } ``` @@ -288,6 +305,7 @@ See also: VictoriaLogs provides `/select/logsql/stream_label_values?query=&start=&&label=` HTTP endpoint, which returns [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) label values for the label with the given `` name from results of the given `` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[ ... ]` time range. +The response also contains the number of log results per every label value. The `` and `` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). If `` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs. @@ -305,10 +323,14 @@ Below is an example JSON output returned from this endpoint: ```json { "values": [ - "host-0", - "host-1", - "host-2", - "host-3" + { + "value": "host-1", + "hits": 69426656 + }, + { + "value": "host-2", + "hits": 66507749 + } ] } ``` @@ -327,6 +349,7 @@ See also: VictoriaLogs provides `/select/logsql/field_names?query=&start=&end=` HTTP endpoint, which returns field names from results of the given `` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[ ... ]` time range. +The response also contains the number of log results per every field name. The `` and `` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). If `` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs. @@ -343,13 +366,19 @@ Below is an example JSON output returned from this endpoint: ```json { - "names": [ - "_msg", - "_stream", - "_time", - "host", - "level", - "location" + "values": [ + { + "value": "_msg", + "hits": 1033300623 + }, + { + "value": "_stream", + "hits": 1033300623 + }, + { + "value": "_time", + "hits": 1033300623 + } ] } ``` @@ -366,6 +395,7 @@ See also: VictoriaLogs provides `/select/logsql/field_values?query=&field=&start=&end=` HTTP endpoint, which returns unique values for the given `` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) from results of the given `` [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[ ... ]` time range. +The response also contains the number of log results per every field value. The `` and `` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats). If `` is missing, then it equals to the minimum timestamp across logs stored in VictoriaLogs. @@ -383,17 +413,25 @@ Below is an example JSON output returned from this endpoint: ```json { "values": [ - "host_0", - "host_1", - "host_10", - "host_100", - "host_1000" + { + "value": "host-1", + "hits": 69426656 + }, + { + "value": "host-2", + "hits": 66507749 + }, + { + "value": "host-3", + "hits": 65454351 + } ] } ``` The `/select/logsql/field_names` endpoint supports optional `limit=N` query arg, which allows limiting the number of returned values to `N`. The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of field values. +When the `limit` is reached, `hits` are zeroed, since they cannot be calculated reliably. See also: diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 5162c5f57..097f052b0 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1804,6 +1804,8 @@ func appendResultColumnWithName(dst []resultColumn, name string) []resultColumn } // addValue adds the given values v to rc. +// +// rc is valid until v is modified. func (rc *resultColumn) addValue(v string) { rc.values = append(rc.values, v) } diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 0efde5c88..6bbd7e08d 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -1276,7 +1276,6 @@ func TestParseQueryFailure(t *testing.T) { f(`foo | fields bar,,`) // invalid field_names - f(`foo | field_names`) f(`foo | field_names |`) f(`foo | field_names (`) f(`foo | field_names )`) diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 284855c8d..5feb23cf8 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -10,7 +10,8 @@ import ( // // See https://docs.victoriametrics.com/victorialogs/logsql/#field-names-pipe type pipeFieldNames struct { - // resultName is the name of the column to write results to. + // resultName is an optional name of the column to write results to. + // By default results are written into 'name' column. resultName string // isFirstPipe is set to true if '| field_names' pipe is the first in the query. @@ -20,7 +21,11 @@ type pipeFieldNames struct { } func (pf *pipeFieldNames) String() string { - return "field_names as " + quoteTokenIfNeeded(pf.resultName) + s := "field_names" + if pf.resultName != "name" { + s += " as " + quoteTokenIfNeeded(pf.resultName) + } + return s } func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fieldsSet) { @@ -34,13 +39,6 @@ func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fields func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { shards := make([]pipeFieldNamesProcessorShard, workersCount) - for i := range shards { - shards[i] = pipeFieldNamesProcessorShard{ - pipeFieldNamesProcessorShardNopad: pipeFieldNamesProcessorShardNopad{ - m: make(map[string]struct{}), - }, - } - } pfp := &pipeFieldNamesProcessor{ pf: pf, @@ -68,8 +66,15 @@ type pipeFieldNamesProcessorShard struct { } type pipeFieldNamesProcessorShardNopad struct { - // m holds unique field names. - m map[string]struct{} + // m holds hits per each field name + m map[string]*uint64 +} + +func (shard *pipeFieldNamesProcessorShard) getM() map[string]*uint64 { + if shard.m == nil { + shard.m = make(map[string]*uint64) + } + return shard.m } func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) { @@ -78,12 +83,21 @@ func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pfp.shards[workerID] + m := shard.getM() + cs := br.getColumns() for _, c := range cs { - if _, ok := shard.m[c.name]; !ok { + pHits, ok := m[c.name] + if !ok { nameCopy := strings.Clone(c.name) - shard.m[nameCopy] = struct{}{} + hits := uint64(0) + pHits = &hits + m[nameCopy] = pHits } + + // Assume that the column is set for all the rows in the block. + // This is much faster than reading all the column values and counting non-empty rows. + *pHits += uint64(len(br.timestamps)) } } @@ -94,15 +108,25 @@ func (pfp *pipeFieldNamesProcessor) flush() error { // merge state across shards shards := pfp.shards - m := shards[0].m + m := shards[0].getM() shards = shards[1:] for i := range shards { - for k := range shards[i].m { - m[k] = struct{}{} + for name, pHitsSrc := range shards[i].getM() { + pHits, ok := m[name] + if !ok { + m[name] = pHitsSrc + } else { + *pHits += *pHitsSrc + } } } if pfp.pf.isFirstPipe { - m["_time"] = struct{}{} + pHits := m["_stream"] + if pHits == nil { + hits := uint64(0) + pHits = &hits + } + m["_time"] = pHits } // write result @@ -110,8 +134,11 @@ func (pfp *pipeFieldNamesProcessor) flush() error { pfp: pfp, } wctx.rcs[0].name = pfp.pf.resultName - for k := range m { - wctx.writeRow(k) + wctx.rcs[1].name = "hits" + + for name, pHits := range m { + hits := string(marshalUint64String(nil, *pHits)) + wctx.writeRow(name, hits) } wctx.flush() @@ -120,7 +147,7 @@ func (pfp *pipeFieldNamesProcessor) flush() error { type pipeFieldNamesWriteContext struct { pfp *pipeFieldNamesProcessor - rcs [1]resultColumn + rcs [2]resultColumn br blockResult // rowsCount is the number of rows in the current block @@ -130,9 +157,10 @@ type pipeFieldNamesWriteContext struct { valuesLen int } -func (wctx *pipeFieldNamesWriteContext) writeRow(v string) { - wctx.rcs[0].addValue(v) - wctx.valuesLen += len(v) +func (wctx *pipeFieldNamesWriteContext) writeRow(name, hits string) { + wctx.rcs[0].addValue(name) + wctx.rcs[1].addValue(hits) + wctx.valuesLen += len(name) + len(hits) wctx.rowsCount++ if wctx.valuesLen >= 1_000_000 { wctx.flush() @@ -145,11 +173,12 @@ func (wctx *pipeFieldNamesWriteContext) flush() { wctx.valuesLen = 0 // Flush rcs to ppBase - br.setResultColumns(wctx.rcs[:1], wctx.rowsCount) + br.setResultColumns(wctx.rcs[:], wctx.rowsCount) wctx.rowsCount = 0 wctx.pfp.ppBase.writeBlock(0, br) br.reset() wctx.rcs[0].resetValues() + wctx.rcs[1].resetValues() } func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) { @@ -158,12 +187,20 @@ func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) { } lex.nextToken() + resultName := "name" if lex.isKeyword("as") { lex.nextToken() - } - resultName, err := parseFieldName(lex) - if err != nil { - return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err) + name, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err) + } + resultName = name + } else if !lex.isKeyword("", "|") { + name, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err) + } + resultName = name } pf := &pipeFieldNames{ diff --git a/lib/logstorage/pipe_field_names_test.go b/lib/logstorage/pipe_field_names_test.go index 72e5d4adc..6edf36146 100644 --- a/lib/logstorage/pipe_field_names_test.go +++ b/lib/logstorage/pipe_field_names_test.go @@ -10,6 +10,7 @@ func TestParsePipeFieldNamesSuccess(t *testing.T) { expectParsePipeSuccess(t, pipeStr) } + f(`field_names`) f(`field_names as x`) } @@ -19,7 +20,6 @@ func TestParsePipeFieldNamesFailure(t *testing.T) { expectParsePipeFailure(t, pipeStr) } - f(`field_names`) f(`field_names(foo)`) f(`field_names a b`) f(`field_names as`) @@ -32,32 +32,47 @@ func TestPipeFieldNames(t *testing.T) { } // single row, result column doesn't clash with original columns - f("field_names as x", [][]Field{ + f("field_names", [][]Field{ { {"_msg", `{"foo":"bar"}`}, {"a", `test`}, }, }, [][]Field{ { - {"x", "_msg"}, + {"name", "_msg"}, + {"hits", "1"}, }, { - {"x", "a"}, + {"name", "a"}, + {"hits", "1"}, }, }) // single row, result column do clashes with original columns - f("field_names as _msg", [][]Field{ + f("field_names as x", [][]Field{ { - {"_msg", `{"foo":"bar"}`}, {"a", `test`}, + {"b", "aaa"}, + }, + { + {"a", `bar`}, + }, + { + {"a", `bar`}, + {"c", `bar`}, }, }, [][]Field{ { - {"_msg", "_msg"}, + {"x", "a"}, + {"hits", "3"}, }, { - {"_msg", "a"}, + {"x", "b"}, + {"hits", "1"}, + }, + { + {"x", "c"}, + {"hits", "1"}, }, }) } diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index 4aa3d5bdf..6738878a3 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -477,14 +477,12 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo wctx.rcs = rcs } - var tmpBuf []byte byColumns := r.byColumns byColumnsIsTime := r.byColumnsIsTime for i := range byFields { v := byColumns[i] if byColumnsIsTime[i] { - tmpBuf = marshalTimestampRFC3339NanoString(tmpBuf[:0], r.timestamp) - v = bytesutil.ToUnsafeString(tmpBuf) + v = string(marshalTimestampRFC3339NanoString(nil, r.timestamp)) } rcs[i].addValue(v) wctx.valuesLen += len(v) diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index 32b9e8f28..ab8584d57 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -20,6 +20,9 @@ type pipeUniq struct { // fields contains field names for returning unique values byFields []string + // if hitsFieldName isn't empty, then the number of hits per each unique value is stored in this field. + hitsFieldName string + limit uint64 } @@ -28,6 +31,9 @@ func (pu *pipeUniq) String() string { if len(pu.byFields) > 0 { s += " by (" + fieldNamesString(pu.byFields) + ")" } + if pu.hitsFieldName != "" { + s += " hits" + } if pu.limit > 0 { s += fmt.Sprintf(" limit %d", pu.limit) } @@ -53,7 +59,6 @@ func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, c shards[i] = pipeUniqProcessorShard{ pipeUniqProcessorShardNopad: pipeUniqProcessorShardNopad{ pu: pu, - m: make(map[string]struct{}), stateSizeBudget: stateSizeBudgetChunk, }, } @@ -98,8 +103,8 @@ type pipeUniqProcessorShardNopad struct { // pu points to the parent pipeUniq. pu *pipeUniq - // m holds unique rows. - m map[string]struct{} + // m holds per-row hits. + m map[string]*uint64 // keyBuf is a temporary buffer for building keys for m. keyBuf []byte @@ -120,6 +125,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { return false } + needHits := shard.pu.hitsFieldName != "" byFields := shard.pu.byFields if len(byFields) == 0 { // Take into account all the columns in br. @@ -132,7 +138,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(c.name)) keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) } - shard.updateState(bytesutil.ToUnsafeString(keyBuf)) + shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1) } shard.keyBuf = keyBuf return true @@ -142,20 +148,34 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { c := br.getColumnByName(byFields[0]) if c.isConst { v := c.valuesEncoded[0] - shard.updateState(v) + shard.updateState(v, uint64(len(br.timestamps))) return true } if c.valueType == valueTypeDict { - for _, v := range c.dictValues { - shard.updateState(v) + if needHits { + a := encoding.GetUint64s(len(c.dictValues)) + hits := a.A + valuesEncoded := c.getValuesEncoded(br) + for _, v := range valuesEncoded { + idx := unmarshalUint8(v) + hits[idx]++ + } + for i, v := range c.dictValues { + shard.updateState(v, hits[i]) + } + encoding.PutUint64s(a) + } else { + for _, v := range c.dictValues { + shard.updateState(v, 0) + } } return true } values := c.getValues(br) for i, v := range values { - if i == 0 || values[i-1] != values[i] { - shard.updateState(v) + if needHits || i == 0 || values[i-1] != values[i] { + shard.updateState(v, 1) } } return true @@ -174,7 +194,7 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { for i := range br.timestamps { seenValue := true for _, values := range columnValues { - if i == 0 || values[i-1] != values[i] { + if needHits || i == 0 || values[i-1] != values[i] { seenValue = false break } @@ -187,19 +207,31 @@ func (shard *pipeUniqProcessorShard) writeBlock(br *blockResult) bool { for _, values := range columnValues { keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(values[i])) } - shard.updateState(bytesutil.ToUnsafeString(keyBuf)) + shard.updateState(bytesutil.ToUnsafeString(keyBuf), 1) } shard.keyBuf = keyBuf return true } -func (shard *pipeUniqProcessorShard) updateState(v string) { - if _, ok := shard.m[v]; !ok { +func (shard *pipeUniqProcessorShard) updateState(v string, hits uint64) { + m := shard.getM() + pHits, ok := m[v] + if !ok { vCopy := strings.Clone(v) - shard.m[vCopy] = struct{}{} - shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)) + hits := uint64(0) + pHits = &hits + m[vCopy] = pHits + shard.stateSizeBudget -= len(vCopy) + int(unsafe.Sizeof(vCopy)+unsafe.Sizeof(hits)+unsafe.Sizeof(pHits)) } + *pHits += hits +} + +func (shard *pipeUniqProcessorShard) getM() map[string]*uint64 { + if shard.m == nil { + shard.m = make(map[string]*uint64) + } + return shard.m } func (pup *pipeUniqProcessor) writeBlock(workerID uint, br *blockResult) { @@ -235,18 +267,27 @@ func (pup *pipeUniqProcessor) flush() error { // merge state across shards shards := pup.shards - m := shards[0].m + m := shards[0].getM() shards = shards[1:] for i := range shards { if needStop(pup.stopCh) { return nil } - for k := range shards[i].m { - m[k] = struct{}{} + for k, pHitsSrc := range shards[i].getM() { + pHits, ok := m[k] + if !ok { + m[k] = pHitsSrc + } else { + *pHits += *pHitsSrc + } } } + // There is little sense in returning partial hits when the limit on the number of unique entries is reached. + // It is better from UX experience is to return zero hits instead. + resetHits := pup.pu.limit > 0 && uint64(len(m)) >= pup.pu.limit + // write result wctx := &pipeUniqWriteContext{ pup: pup, @@ -254,8 +295,23 @@ func (pup *pipeUniqProcessor) flush() error { byFields := pup.pu.byFields var rowFields []Field + addHitsFieldIfNeeded := func(dst []Field, hits uint64) []Field { + if pup.pu.hitsFieldName == "" { + return dst + } + if resetHits { + hits = 0 + } + hitsStr := string(marshalUint64String(nil, hits)) + dst = append(dst, Field{ + Name: pup.pu.hitsFieldName, + Value: hitsStr, + }) + return dst + } + if len(byFields) == 0 { - for k := range m { + for k, pHits := range m { if needStop(pup.stopCh) { return nil } @@ -280,11 +336,12 @@ func (pup *pipeUniqProcessor) flush() error { Value: bytesutil.ToUnsafeString(value), }) } + rowFields = addHitsFieldIfNeeded(rowFields, *pHits) wctx.writeRow(rowFields) } } else if len(byFields) == 1 { fieldName := byFields[0] - for k := range m { + for k, pHits := range m { if needStop(pup.stopCh) { return nil } @@ -293,10 +350,11 @@ func (pup *pipeUniqProcessor) flush() error { Name: fieldName, Value: k, }) + rowFields = addHitsFieldIfNeeded(rowFields, *pHits) wctx.writeRow(rowFields) } } else { - for k := range m { + for k, pHits := range m { if needStop(pup.stopCh) { return nil } @@ -317,6 +375,7 @@ func (pup *pipeUniqProcessor) flush() error { }) fieldIdx++ } + rowFields = addHitsFieldIfNeeded(rowFields, *pHits) wctx.writeRow(rowFields) } } @@ -418,6 +477,16 @@ func parsePipeUniq(lex *lexer) (*pipeUniq, error) { pu.byFields = bfs } + if lex.isKeyword("hits") { + lex.nextToken() + hitsFieldName := "hits" + for slices.Contains(pu.byFields, hitsFieldName) { + hitsFieldName += "s" + } + + pu.hitsFieldName = hitsFieldName + } + if lex.isKeyword("limit") { lex.nextToken() n, ok := tryParseUint64(lex.token) diff --git a/lib/logstorage/pipe_uniq_test.go b/lib/logstorage/pipe_uniq_test.go index 1b59ca88c..68e8f0042 100644 --- a/lib/logstorage/pipe_uniq_test.go +++ b/lib/logstorage/pipe_uniq_test.go @@ -11,11 +11,15 @@ func TestParsePipeUniqSuccess(t *testing.T) { } f(`uniq`) + f(`uniq hits`) f(`uniq limit 10`) + f(`uniq hits limit 10`) f(`uniq by (x)`) f(`uniq by (x) limit 10`) f(`uniq by (x, y)`) + f(`uniq by (x, y) hits`) f(`uniq by (x, y) limit 10`) + f(`uniq by (x, y) hits limit 10`) } func TestParsePipeUniqFailure(t *testing.T) { @@ -26,6 +30,7 @@ func TestParsePipeUniqFailure(t *testing.T) { f(`uniq foo`) f(`uniq by`) + f(`uniq by hits`) f(`uniq by(x) limit`) f(`uniq by(x) limit foo`) } @@ -62,6 +67,62 @@ func TestPipeUniq(t *testing.T) { }, }) + f("uniq hits", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "2"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + {"hits", "1"}, + }, + }) + + f("uniq hits limit 2", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "0"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + {"hits", "0"}, + }, + }) + f("uniq by (a)", [][]Field{ { {"a", `2`}, @@ -82,6 +143,27 @@ func TestPipeUniq(t *testing.T) { }, }) + f("uniq by (a) hits", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"hits", "3"}, + }, + }) + f("uniq by (b)", [][]Field{ { {"a", `2`}, @@ -105,6 +187,31 @@ func TestPipeUniq(t *testing.T) { }, }) + f("uniq by (b) hits", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"b", "3"}, + {"hits", "2"}, + }, + { + {"b", "54"}, + {"hits", "1"}, + }, + }) + f("uniq by (c)", [][]Field{ { {"a", `2`}, @@ -128,6 +235,31 @@ func TestPipeUniq(t *testing.T) { }, }) + f("uniq by (c) hits", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"c", ""}, + {"hits", "2"}, + }, + { + {"c", "d"}, + {"hits", "1"}, + }, + }) + f("uniq by (d)", [][]Field{ { {"a", `2`}, @@ -148,6 +280,27 @@ func TestPipeUniq(t *testing.T) { }, }) + f("uniq by (d) hits", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"d", ""}, + {"hits", "3"}, + }, + }) + f("uniq by (a, b)", [][]Field{ { {"a", `2`}, @@ -172,6 +325,33 @@ func TestPipeUniq(t *testing.T) { {"b", "54"}, }, }) + + f("uniq by (a, b) hits", [][]Field{ + { + {"a", `2`}, + {"b", `3`}, + }, + { + {"a", "2"}, + {"b", "3"}, + }, + { + {"a", `2`}, + {"b", `54`}, + {"c", "d"}, + }, + }, [][]Field{ + { + {"a", "2"}, + {"b", "3"}, + {"hits", "2"}, + }, + { + {"a", "2"}, + {"b", "54"}, + {"hits", "1"}, + }, + }) } func TestPipeUniqUpdateNeededFields(t *testing.T) { diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index f3395f4f1..5ff0384f6 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -145,9 +145,9 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, } // GetFieldNames returns field names from q results for the given tenantIDs. -func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { +func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) { pipes := append([]pipe{}, q.pipes...) - pipeStr := "field_names as names | sort by (names)" + pipeStr := "field_names" lex := newLexer(pipeStr) pf, err := parsePipeFieldNames(lex) @@ -156,36 +156,24 @@ func (s *Storage) GetFieldNames(ctx context.Context, tenantIDs []TenantID, q *Qu } pf.isFirstPipe = len(pipes) == 0 - if !lex.isKeyword("|") { - logger.Panicf("BUG: unexpected token after 'field_names' pipe at [%s]: %q", pipeStr, lex.token) - } - lex.nextToken() - - ps, err := parsePipeSort(lex) - if err != nil { - logger.Panicf("BUG: unexpected error when parsing 'sort' pipe at [%s]: %s", pipeStr, err) - } if !lex.isEnd() { logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s) } - pipes = append(pipes, pf, ps) + pipes = append(pipes, pf) q = &Query{ f: q.f, pipes: pipes, } - return s.runSingleColumnQuery(ctx, tenantIDs, q) + return s.runValuesWithHitsQuery(ctx, tenantIDs, q) } -// GetFieldValues returns unique values for the given fieldName returned by q for the given tenantIDs. -// -// If limit > 0, then up to limit unique values are returned. -func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]string, error) { +func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string) ([]string, error) { pipes := append([]pipe{}, q.pipes...) quotedFieldName := quoteTokenIfNeeded(fieldName) - pipeStr := fmt.Sprintf("uniq by (%s) limit %d | sort by (%s)", quotedFieldName, limit, quotedFieldName) + pipeStr := fmt.Sprintf("uniq by (%s)", quotedFieldName) lex := newLexer(pipeStr) pu, err := parsePipeUniq(lex) @@ -193,87 +181,17 @@ func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Q logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err) } - if !lex.isKeyword("|") { - logger.Panicf("BUG: unexpected token after 'uniq' pipe at [%s]: %q", pipeStr, lex.token) - } - lex.nextToken() - - ps, err := parsePipeSort(lex) - if err != nil { - logger.Panicf("BUG: unexpected error when parsing 'sort' pipe at [%s]: %s", pipeStr, err) - } if !lex.isEnd() { logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s) } - pipes = append(pipes, pu, ps) + pipes = append(pipes, pu) q = &Query{ f: q.f, pipes: pipes, } - return s.runSingleColumnQuery(ctx, tenantIDs, q) -} - -// GetStreamLabelNames returns stream label names from q results for the given tenantIDs. -func (s *Storage) GetStreamLabelNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { - streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64) - if err != nil { - return nil, err - } - - var names []string - m := make(map[string]struct{}) - forEachStreamLabel(streams, func(label Field) { - if _, ok := m[label.Name]; !ok { - nameCopy := strings.Clone(label.Name) - names = append(names, nameCopy) - m[nameCopy] = struct{}{} - } - }) - sortStrings(names) - - return names, nil -} - -// GetStreamLabelValues returns stream label values for the given labelName from q results for the given tenantIDs. -// -// If limit > 9, then up to limit unique label values are returned. -func (s *Storage) GetStreamLabelValues(ctx context.Context, tenantIDs []TenantID, q *Query, labelName string, limit uint64) ([]string, error) { - streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64) - if err != nil { - return nil, err - } - - var values []string - m := make(map[string]struct{}) - forEachStreamLabel(streams, func(label Field) { - if label.Name != labelName { - return - } - if _, ok := m[label.Value]; !ok { - valueCopy := strings.Clone(label.Value) - values = append(values, valueCopy) - m[valueCopy] = struct{}{} - } - }) - if uint64(len(values)) > limit { - values = values[:limit] - } - sortStrings(values) - - return values, nil -} - -// GetStreams returns streams from q results for the given tenantIDs. -// -// If limit > 0, then up to limit unique streams are returned. -func (s *Storage) GetStreams(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]string, error) { - return s.GetFieldValues(ctx, tenantIDs, q, "_stream", limit) -} - -func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { var values []string var valuesLock sync.Mutex writeBlockResult := func(_ uint, br *blockResult) { @@ -283,13 +201,14 @@ func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID cs := br.getColumns() if len(cs) != 1 { - logger.Panicf("BUG: expecting only a single column; got %d columns", len(cs)) + logger.Panicf("BUG: expecting one column; got %d columns", len(cs)) } + columnValues := cs[0].getValues(br) columnValuesCopy := make([]string, len(columnValues)) - for i, v := range columnValues { - columnValuesCopy[i] = strings.Clone(v) + for i := range columnValues { + columnValuesCopy[i] = strings.Clone(columnValues[i]) } valuesLock.Lock() @@ -297,21 +216,182 @@ func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID valuesLock.Unlock() } - err := s.runQuery(ctx, tenantIDs, q, writeBlockResult) - if err != nil { + if err := s.runQuery(ctx, tenantIDs, q, writeBlockResult); err != nil { return nil, err } return values, nil } +// GetFieldValues returns unique values with the number of hits for the given fieldName returned by q for the given tenantIDs. +// +// If limit > 0, then up to limit unique values are returned. +func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]ValueWithHits, error) { + pipes := append([]pipe{}, q.pipes...) + quotedFieldName := quoteTokenIfNeeded(fieldName) + pipeStr := fmt.Sprintf("uniq by (%s) hits limit %d", quotedFieldName, limit) + lex := newLexer(pipeStr) + + pu, err := parsePipeUniq(lex) + if err != nil { + logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err) + } + + if !lex.isEnd() { + logger.Panicf("BUG: unexpected tail left after parsing pipes [%s]: %q", pipeStr, lex.s) + } + + pipes = append(pipes, pu) + + q = &Query{ + f: q.f, + pipes: pipes, + } + + return s.runValuesWithHitsQuery(ctx, tenantIDs, q) +} + +// ValueWithHits contains value and hits. +type ValueWithHits struct { + Value string + Hits uint64 +} + +func toValuesWithHits(m map[string]*uint64) []ValueWithHits { + results := make([]ValueWithHits, 0, len(m)) + for k, pHits := range m { + results = append(results, ValueWithHits{ + Value: k, + Hits: *pHits, + }) + } + sortValuesWithHits(results) + return results +} + +func sortValuesWithHits(results []ValueWithHits) { + slices.SortFunc(results, func(a, b ValueWithHits) int { + if a.Hits == b.Hits { + if a.Value == b.Value { + return 0 + } + if lessString(a.Value, b.Value) { + return -1 + } + return 1 + } + // Sort in descending order of hits + if a.Hits < b.Hits { + return 1 + } + return -1 + }) +} + +// GetStreamLabelNames returns stream label names from q results for the given tenantIDs. +func (s *Storage) GetStreamLabelNames(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) { + streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64) + if err != nil { + return nil, err + } + + m := make(map[string]*uint64) + forEachStreamLabel(streams, func(label Field, hits uint64) { + pHits, ok := m[label.Name] + if !ok { + nameCopy := strings.Clone(label.Name) + hitsLocal := uint64(0) + pHits = &hitsLocal + m[nameCopy] = pHits + } + *pHits += hits + }) + names := toValuesWithHits(m) + return names, nil +} + +// GetStreamLabelValues returns stream label values for the given labelName from q results for the given tenantIDs. +// +// If limit > 9, then up to limit unique label values are returned. +func (s *Storage) GetStreamLabelValues(ctx context.Context, tenantIDs []TenantID, q *Query, labelName string, limit uint64) ([]ValueWithHits, error) { + streams, err := s.GetStreams(ctx, tenantIDs, q, math.MaxUint64) + if err != nil { + return nil, err + } + + m := make(map[string]*uint64) + forEachStreamLabel(streams, func(label Field, hits uint64) { + if label.Name != labelName { + return + } + pHits, ok := m[label.Value] + if !ok { + valueCopy := strings.Clone(label.Value) + hitsLocal := uint64(0) + pHits = &hitsLocal + m[valueCopy] = pHits + } + *pHits += hits + }) + values := toValuesWithHits(m) + if limit > 0 && uint64(len(values)) > limit { + values = values[:limit] + } + return values, nil +} + +// GetStreams returns streams from q results for the given tenantIDs. +// +// If limit > 0, then up to limit unique streams are returned. +func (s *Storage) GetStreams(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]ValueWithHits, error) { + return s.GetFieldValues(ctx, tenantIDs, q, "_stream", limit) +} + +func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) { + var results []ValueWithHits + var resultsLock sync.Mutex + writeBlockResult := func(_ uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + cs := br.getColumns() + if len(cs) != 2 { + logger.Panicf("BUG: expecting two columns; got %d columns", len(cs)) + } + + columnValues := cs[0].getValues(br) + columnHits := cs[1].getValues(br) + + valuesWithHits := make([]ValueWithHits, len(columnValues)) + for i := range columnValues { + x := &valuesWithHits[i] + hits, _ := tryParseUint64(columnHits[i]) + x.Value = strings.Clone(columnValues[i]) + x.Hits = hits + } + + resultsLock.Lock() + results = append(results, valuesWithHits...) + resultsLock.Unlock() + } + + err := s.runQuery(ctx, tenantIDs, q, writeBlockResult) + if err != nil { + return nil, err + } + sortValuesWithHits(results) + + return results, nil +} + func (s *Storage) initFilterInValues(ctx context.Context, tenantIDs []TenantID, q *Query) (*Query, error) { if !hasFilterInWithQueryForFilter(q.f) && !hasFilterInWithQueryForPipes(q.pipes) { return q, nil } getFieldValues := func(q *Query, fieldName string) ([]string, error) { - return s.GetFieldValues(ctx, tenantIDs, q, fieldName, 0) + return s.getFieldValuesNoHits(ctx, tenantIDs, q, fieldName) } cache := make(map[string][]string) fNew, err := initFilterInValuesForFilter(cache, q.f, getFieldValues) @@ -1007,16 +1087,17 @@ func getFilterTimeRange(f filter) (int64, int64) { return math.MinInt64, math.MaxInt64 } -func forEachStreamLabel(streams []string, f func(label Field)) { +func forEachStreamLabel(streams []ValueWithHits, f func(label Field, hits uint64)) { var labels []Field - for _, stream := range streams { + for i := range streams { var err error - labels, err = parseStreamLabels(labels[:0], stream) + labels, err = parseStreamLabels(labels[:0], streams[i].Value) if err != nil { continue } - for i := range labels { - f(labels[i]) + hits := streams[i].Hits + for j := range labels { + f(labels[j], hits) } } }