From c9bb4ddeed6f6e8463bab9854b4565d7f788719a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 6 Sep 2024 23:00:55 +0200 Subject: [PATCH] app/vlselect: add /select/logsql/stats_query endpoint, which is going to be used by vmalert Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6942 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6706 --- app/vlselect/logsql/logsql.go | 94 ++++++++++++- app/vlselect/logsql/stats_query_response.qtpl | 36 +++++ .../logsql/stats_query_response.qtpl.go | 133 ++++++++++++++++++ app/vlselect/main.go | 5 + app/vmselect/main.go | 35 ++--- .../prometheus/error_response.qtpl.go | 61 -------- app/vmselect/promql/eval.go | 17 +-- app/vmselect/promql/exec.go | 18 --- docs/VictoriaLogs/CHANGELOG.md | 1 + docs/VictoriaLogs/querying/README.md | 73 ++++++++++ lib/httpserver/prometheus.go | 47 +++++++ .../httpserver/prometheus_error_response.qtpl | 4 +- .../prometheus_error_response.qtpl.go | 61 ++++++++ lib/logstorage/parser.go | 98 ++++++++++++- lib/logstorage/parser_test.go | 91 ++++++++++++ lib/logstorage/pipe_stats.go | 2 +- 16 files changed, 655 insertions(+), 121 deletions(-) create mode 100644 app/vlselect/logsql/stats_query_response.qtpl create mode 100644 app/vlselect/logsql/stats_query_response.qtpl.go delete mode 100644 app/vmselect/prometheus/error_response.qtpl.go create mode 100644 lib/httpserver/prometheus.go rename app/vmselect/prometheus/error_response.qtpl => lib/httpserver/prometheus_error_response.qtpl (60%) create mode 100644 lib/httpserver/prometheus_error_response.qtpl.go diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index 03e886f70..ad636b239 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "net/http" + "slices" "sort" "strconv" "strings" @@ -380,6 +381,8 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R } // ProcessLiveTailRequest processes live tailing request to /select/logsq/tail +// +// See https://docs.victoriametrics.com/victorialogs/querying/#live-tailing func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { liveTailRequests.Inc() defer liveTailRequests.Dec() @@ -560,9 +563,82 @@ func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) { return tailRows, nil } +// ProcessStatsQueryRequest handles /select/logsql/stats_query request. 
+// +// See https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats +func ProcessStatsQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { + q, tenantIDs, err := parseCommonArgs(r) + if err != nil { + httpserver.SendPrometheusError(w, r, err) + return + } + + // Verify that q ends with `| stats` pipe + byFields, ok := q.GetStatsByFields() + if !ok { + err := fmt.Errorf("the query must end with '| stats ...'; got [%s]", q) + httpserver.SendPrometheusError(w, r, err) + return + } + + q.Optimize() + + var rows []statsRow + var rowsLock sync.Mutex + + timestamp := q.GetTimestamp() + writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { + clonedColumnNames := make([]string, len(columns)) + for i, c := range columns { + clonedColumnNames[i] = strings.Clone(c.Name) + } + for i := range timestamps { + labels := make([]logstorage.Field, 0, len(byFields)) + for j, c := range columns { + if slices.Contains(byFields, c.Name) { + labels = append(labels, logstorage.Field{ + Name: clonedColumnNames[j], + Value: strings.Clone(c.Values[i]), + }) + } + } + + for j, c := range columns { + if !slices.Contains(byFields, c.Name) { + r := statsRow{ + Name: clonedColumnNames[j], + Labels: labels, + Timestamp: timestamp, + Value: strings.Clone(c.Values[i]), + } + rowsLock.Lock() + rows = append(rows, r) + rowsLock.Unlock() + } + } + } + } + + if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil { + err = fmt.Errorf("cannot execute query [%s]: %s", q, err) + httpserver.SendPrometheusError(w, r, err) + return + } + + w.Header().Set("Content-Type", "application/json") + WriteStatsQueryResponse(w, rows) +} + +type statsRow struct { + Name string + Labels []logstorage.Field + Timestamp int64 + Value string +} + // ProcessQueryRequest handles /select/logsql/query request. // -// See https://docs.victoriametrics.com/victorialogs/querying/#http-api +// See https://docs.victoriametrics.com/victorialogs/querying/#querying-logs func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { q, tenantIDs, err := parseCommonArgs(r) if err != nil { @@ -728,9 +804,23 @@ func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID, } tenantIDs := []logstorage.TenantID{tenantID} + // Parse optional time arg + timestamp, okTime, err := getTimeNsec(r, "time") + if err != nil { + return nil, nil, err + } + if !okTime { + // If time arg is missing, then evaluate query at the current timestamp + timestamp = time.Now().UnixNano() + } + + // decrease timestamp by one nanosecond in order to avoid capturing logs belonging + // to the first nanosecond at the next period of time (month, week, day, hour, etc.) 
+ timestamp-- + // Parse query qStr := r.FormValue("query") - q, err := logstorage.ParseQuery(qStr) + q, err := logstorage.ParseQueryAtTimestamp(qStr, timestamp) if err != nil { return nil, nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err) } diff --git a/app/vlselect/logsql/stats_query_response.qtpl b/app/vlselect/logsql/stats_query_response.qtpl new file mode 100644 index 000000000..3d818546b --- /dev/null +++ b/app/vlselect/logsql/stats_query_response.qtpl @@ -0,0 +1,36 @@ +{% stripspace %} + +// StatsQueryResponse generates response for /select/logsql/stats_query +{% func StatsQueryResponse(rows []statsRow) %} +{ + "status":"success", + "data":{ + "resultType":"vector", + "result":[ + {% if len(rows) > 0 %} + {%= formatStatsRow(&rows[0]) %} + {% code rows = rows[1:] %} + {% for i := range rows %} + ,{%= formatStatsRow(&rows[i]) %} + {% endfor %} + {% endif %} + ] + } +} +{% endfunc %} + +{% func formatStatsRow(r *statsRow) %} +{ + "metric":{ + "__name__":{%q= r.Name %} + {% if len(r.Labels) > 0 %} + {% for _, label := range r.Labels %} + ,{%q= label.Name %}:{%q= label.Value %} + {% endfor %} + {% endif %} + }, + "value":[{%f= float64(r.Timestamp)/1e9 %},{%q= r.Value %}] +} +{% endfunc %} + +{% endstripspace %} diff --git a/app/vlselect/logsql/stats_query_response.qtpl.go b/app/vlselect/logsql/stats_query_response.qtpl.go new file mode 100644 index 000000000..2548a18c3 --- /dev/null +++ b/app/vlselect/logsql/stats_query_response.qtpl.go @@ -0,0 +1,133 @@ +// Code generated by qtc from "stats_query_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +// StatsQueryResponse generates response for /select/logsql/stats_query + +//line app/vlselect/logsql/stats_query_response.qtpl:4 +package logsql + +//line app/vlselect/logsql/stats_query_response.qtpl:4 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/stats_query_response.qtpl:4 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/stats_query_response.qtpl:4 +func StreamStatsQueryResponse(qw422016 *qt422016.Writer, rows []statsRow) { +//line app/vlselect/logsql/stats_query_response.qtpl:4 + qw422016.N().S(`{"status":"success","data":{"resultType":"vector","result":[`) +//line app/vlselect/logsql/stats_query_response.qtpl:10 + if len(rows) > 0 { +//line app/vlselect/logsql/stats_query_response.qtpl:11 + streamformatStatsRow(qw422016, &rows[0]) +//line app/vlselect/logsql/stats_query_response.qtpl:12 + rows = rows[1:] + +//line app/vlselect/logsql/stats_query_response.qtpl:13 + for i := range rows { +//line app/vlselect/logsql/stats_query_response.qtpl:13 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stats_query_response.qtpl:14 + streamformatStatsRow(qw422016, &rows[i]) +//line app/vlselect/logsql/stats_query_response.qtpl:15 + } +//line app/vlselect/logsql/stats_query_response.qtpl:16 + } +//line app/vlselect/logsql/stats_query_response.qtpl:16 + qw422016.N().S(`]}}`) +//line app/vlselect/logsql/stats_query_response.qtpl:20 +} + +//line app/vlselect/logsql/stats_query_response.qtpl:20 +func WriteStatsQueryResponse(qq422016 qtio422016.Writer, rows []statsRow) { +//line app/vlselect/logsql/stats_query_response.qtpl:20 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/stats_query_response.qtpl:20 + StreamStatsQueryResponse(qw422016, rows) +//line app/vlselect/logsql/stats_query_response.qtpl:20 + qt422016.ReleaseWriter(qw422016) +//line 
app/vlselect/logsql/stats_query_response.qtpl:20 +} + +//line app/vlselect/logsql/stats_query_response.qtpl:20 +func StatsQueryResponse(rows []statsRow) string { +//line app/vlselect/logsql/stats_query_response.qtpl:20 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/stats_query_response.qtpl:20 + WriteStatsQueryResponse(qb422016, rows) +//line app/vlselect/logsql/stats_query_response.qtpl:20 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/stats_query_response.qtpl:20 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/stats_query_response.qtpl:20 + return qs422016 +//line app/vlselect/logsql/stats_query_response.qtpl:20 +} + +//line app/vlselect/logsql/stats_query_response.qtpl:22 +func streamformatStatsRow(qw422016 *qt422016.Writer, r *statsRow) { +//line app/vlselect/logsql/stats_query_response.qtpl:22 + qw422016.N().S(`{"metric":{"__name__":`) +//line app/vlselect/logsql/stats_query_response.qtpl:25 + qw422016.N().Q(r.Name) +//line app/vlselect/logsql/stats_query_response.qtpl:26 + if len(r.Labels) > 0 { +//line app/vlselect/logsql/stats_query_response.qtpl:27 + for _, label := range r.Labels { +//line app/vlselect/logsql/stats_query_response.qtpl:27 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stats_query_response.qtpl:28 + qw422016.N().Q(label.Name) +//line app/vlselect/logsql/stats_query_response.qtpl:28 + qw422016.N().S(`:`) +//line app/vlselect/logsql/stats_query_response.qtpl:28 + qw422016.N().Q(label.Value) +//line app/vlselect/logsql/stats_query_response.qtpl:29 + } +//line app/vlselect/logsql/stats_query_response.qtpl:30 + } +//line app/vlselect/logsql/stats_query_response.qtpl:30 + qw422016.N().S(`},"value":[`) +//line app/vlselect/logsql/stats_query_response.qtpl:32 + qw422016.N().F(float64(r.Timestamp) / 1e9) +//line app/vlselect/logsql/stats_query_response.qtpl:32 + qw422016.N().S(`,`) +//line app/vlselect/logsql/stats_query_response.qtpl:32 + qw422016.N().Q(r.Value) +//line app/vlselect/logsql/stats_query_response.qtpl:32 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/stats_query_response.qtpl:34 +} + +//line app/vlselect/logsql/stats_query_response.qtpl:34 +func writeformatStatsRow(qq422016 qtio422016.Writer, r *statsRow) { +//line app/vlselect/logsql/stats_query_response.qtpl:34 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/stats_query_response.qtpl:34 + streamformatStatsRow(qw422016, r) +//line app/vlselect/logsql/stats_query_response.qtpl:34 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/stats_query_response.qtpl:34 +} + +//line app/vlselect/logsql/stats_query_response.qtpl:34 +func formatStatsRow(r *statsRow) string { +//line app/vlselect/logsql/stats_query_response.qtpl:34 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/stats_query_response.qtpl:34 + writeformatStatsRow(qb422016, r) +//line app/vlselect/logsql/stats_query_response.qtpl:34 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/stats_query_response.qtpl:34 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/stats_query_response.qtpl:34 + return qs422016 +//line app/vlselect/logsql/stats_query_response.qtpl:34 +} diff --git a/app/vlselect/main.go b/app/vlselect/main.go index c91480682..9033f4040 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -193,6 +193,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re logsqlQueryRequests.Inc() logsql.ProcessQueryRequest(ctx, w, r) return true + case 
"/select/logsql/stats_query": + logsqlStatsQueryRequests.Inc() + logsql.ProcessStatsQueryRequest(ctx, w, r) + return true case "/select/logsql/stream_field_names": logsqlStreamFieldNamesRequests.Inc() logsql.ProcessStreamFieldNamesRequest(ctx, w, r) @@ -232,6 +236,7 @@ var ( logsqlFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/field_values"}`) logsqlHitsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/hits"}`) logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) + logsqlStatsQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stats_query"}`) logsqlStreamFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_names"}`) logsqlStreamFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_values"}`) logsqlStreamIDsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_ids"}`) diff --git a/app/vmselect/main.go b/app/vmselect/main.go index dbe57eb8e..83fcbdda9 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -2,7 +2,6 @@ package vmselect import ( "embed" - "errors" "flag" "fmt" "net/http" @@ -187,7 +186,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) if err := prometheus.LabelValuesHandler(qt, startTime, labelName, w, r); err != nil { labelValuesErrors.Inc() - sendPrometheusError(w, r, err) + httpserver.SendPrometheusError(w, r, err) return true } return true @@ -210,7 +209,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) if err := prometheus.QueryHandler(qt, startTime, w, r); err != nil { queryErrors.Inc() - sendPrometheusError(w, r, err) + httpserver.SendPrometheusError(w, r, err) return true } return true @@ -219,7 +218,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) if err := prometheus.QueryRangeHandler(qt, startTime, w, r); err != nil { queryRangeErrors.Inc() - sendPrometheusError(w, r, err) + httpserver.SendPrometheusError(w, r, err) return true } return true @@ -228,7 +227,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) if err := prometheus.SeriesHandler(qt, startTime, w, r); err != nil { seriesErrors.Inc() - sendPrometheusError(w, r, err) + httpserver.SendPrometheusError(w, r, err) return true } return true @@ -237,7 +236,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) if err := prometheus.SeriesCountHandler(startTime, w, r); err != nil { seriesCountErrors.Inc() - sendPrometheusError(w, r, err) + httpserver.SendPrometheusError(w, r, err) return true } return true @@ -246,7 +245,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) if err := prometheus.LabelsHandler(qt, startTime, w, r); err != nil { labelsErrors.Inc() - sendPrometheusError(w, r, err) + httpserver.SendPrometheusError(w, r, err) return true } return true @@ -255,7 +254,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { httpserver.EnableCORS(w, r) if err := prometheus.TSDBStatusHandler(qt, startTime, w, r); err != nil { statusTSDBErrors.Inc() - sendPrometheusError(w, r, err) + httpserver.SendPrometheusError(w, r, err) return true } return true @@ -498,7 +497,7 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path 
httpserver.EnableCORS(w, r) if err := prometheus.QueryStatsHandler(w, r); err != nil { topQueriesErrors.Inc() - sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + httpserver.SendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) return true } return true @@ -575,24 +574,6 @@ func isGraphiteTagsPath(path string) bool { } } -func sendPrometheusError(w http.ResponseWriter, r *http.Request, err error) { - logger.WarnfSkipframes(1, "error in %q: %s", httpserver.GetRequestURI(r), err) - - w.Header().Set("Content-Type", "application/json") - statusCode := http.StatusUnprocessableEntity - var esc *httpserver.ErrorWithStatusCode - if errors.As(err, &esc) { - statusCode = esc.StatusCode - } - w.WriteHeader(statusCode) - - var ure *promql.UserReadableError - if errors.As(err, &ure) { - err = ure - } - prometheus.WriteErrorResponse(w, statusCode, err) -} - var ( requestDuration = metrics.NewHistogram(`vmselect_request_duration_seconds`) diff --git a/app/vmselect/prometheus/error_response.qtpl.go b/app/vmselect/prometheus/error_response.qtpl.go deleted file mode 100644 index c6ecb80c6..000000000 --- a/app/vmselect/prometheus/error_response.qtpl.go +++ /dev/null @@ -1,61 +0,0 @@ -// Code generated by qtc from "error_response.qtpl". DO NOT EDIT. -// See https://github.com/valyala/quicktemplate for details. - -// ErrorResponse generates error response for /api/v1/query.See https://prometheus.io/docs/prometheus/latest/querying/api/#format-overview - -//line app/vmselect/prometheus/error_response.qtpl:4 -package prometheus - -//line app/vmselect/prometheus/error_response.qtpl:4 -import ( - qtio422016 "io" - - qt422016 "github.com/valyala/quicktemplate" -) - -//line app/vmselect/prometheus/error_response.qtpl:4 -var ( - _ = qtio422016.Copy - _ = qt422016.AcquireByteBuffer -) - -//line app/vmselect/prometheus/error_response.qtpl:4 -func StreamErrorResponse(qw422016 *qt422016.Writer, statusCode int, err error) { -//line app/vmselect/prometheus/error_response.qtpl:4 - qw422016.N().S(`{"status":"error","errorType":"`) -//line app/vmselect/prometheus/error_response.qtpl:7 - qw422016.N().D(statusCode) -//line app/vmselect/prometheus/error_response.qtpl:7 - qw422016.N().S(`","error":`) -//line app/vmselect/prometheus/error_response.qtpl:8 - qw422016.N().Q(err.Error()) -//line app/vmselect/prometheus/error_response.qtpl:8 - qw422016.N().S(`}`) -//line app/vmselect/prometheus/error_response.qtpl:10 -} - -//line app/vmselect/prometheus/error_response.qtpl:10 -func WriteErrorResponse(qq422016 qtio422016.Writer, statusCode int, err error) { -//line app/vmselect/prometheus/error_response.qtpl:10 - qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/error_response.qtpl:10 - StreamErrorResponse(qw422016, statusCode, err) -//line app/vmselect/prometheus/error_response.qtpl:10 - qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/error_response.qtpl:10 -} - -//line app/vmselect/prometheus/error_response.qtpl:10 -func ErrorResponse(statusCode int, err error) string { -//line app/vmselect/prometheus/error_response.qtpl:10 - qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/error_response.qtpl:10 - WriteErrorResponse(qb422016, statusCode, err) -//line app/vmselect/prometheus/error_response.qtpl:10 - qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/error_response.qtpl:10 - qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/error_response.qtpl:10 - return qs422016 -//line 
app/vmselect/prometheus/error_response.qtpl:10 -} diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 5ecf21920..0faac1f5f 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -19,6 +19,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer" @@ -354,7 +355,7 @@ func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) { tf := getTransformFunc(fe.Name) if tf == nil { - return nil, &UserReadableError{ + return nil, &httpserver.UserReadableError{ Err: fmt.Errorf(`unknown func %q`, fe.Name), } } @@ -376,7 +377,7 @@ func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.Fun } rv, err := tf(tfa) if err != nil { - return nil, &UserReadableError{ + return nil, &httpserver.UserReadableError{ Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err), } } @@ -407,7 +408,7 @@ func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFunc } af := getAggrFunc(ae.Name) if af == nil { - return nil, &UserReadableError{ + return nil, &httpserver.UserReadableError{ Err: fmt.Errorf(`unknown func %q`, ae.Name), } } @@ -802,12 +803,12 @@ func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf } tssAt, err := evalExpr(qt, ec, re.At) if err != nil { - return nil, &UserReadableError{ + return nil, &httpserver.UserReadableError{ Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err), } } if len(tssAt) != 1 { - return nil, &UserReadableError{ + return nil, &httpserver.UserReadableError{ Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)), } } @@ -869,7 +870,7 @@ func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName st rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re) } if err != nil { - return nil, &UserReadableError{ + return nil, &httpserver.UserReadableError{ Err: err, } } @@ -1601,7 +1602,7 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa if ec.Start == ec.End { rvs, err := evalInstantRollup(qt, ec, funcName, rf, expr, me, iafc, window) if err != nil { - err = &UserReadableError{ + err = &httpserver.UserReadableError{ Err: err, } return nil, err @@ -1612,7 +1613,7 @@ func evalRollupFuncWithMetricExpr(qt *querytracer.Tracer, ec *EvalConfig, funcNa evalWithConfig := func(ec *EvalConfig) ([]*timeseries, error) { tss, err := evalRollupFuncNoCache(qt, ec, funcName, rf, expr, me, iafc, window, pointsPerSeries) if err != nil { - err = &UserReadableError{ + err = &httpserver.UserReadableError{ Err: err, } return nil, err diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index 5f0d208e8..5f2a798e0 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -35,24 +35,6 @@ var ( "Such conversion can be disabled using -search.disableImplicitConversion.") ) -// UserReadableError is a type of error which supposed to be returned to the user without additional context. 
-type UserReadableError struct { - // Err is the error which needs to be returned to the user. - Err error -} - -// Unwrap returns ure.Err. -// -// This is used by standard errors package. See https://golang.org/pkg/errors -func (ure *UserReadableError) Unwrap() error { - return ure.Err -} - -// Error satisfies Error interface -func (ure *UserReadableError) Error() string { - return ure.Err.Error() -} - // Exec executes q for the given ec. func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, error) { if querystats.Enabled() { diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 2af3859db..2ae28e32c 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add [`/select/logsql/stats_query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-log-stats), which is going to be used by [vmalert](https://docs.victoriametrics.com/vmalert/) for executing alerting and recording rules against VictoriaLogs. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6942) for details. * FEATURE: optimize [multi-exact queries](https://docs.victoriametrics.com/victorialogs/logsql/#multi-exact-filter) with many phrases to search. For example, `ip:in(path:="/foo/bar" | keep ip)` when there are many unique values for `ip` field among log entries with `/foo/bar` path. * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add support for displaying the top 5 log streams in the hits graph. The remaining log streams are grouped into an "other" label. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6545). * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add the ability to customize the graph display with options for bar, line, stepped line, and points. diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index 3752ad9be..a10f5768f 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -13,6 +13,7 @@ VictoriaLogs provides the following HTTP endpoints: - [`/select/logsql/query`](#querying-logs) for querying logs. - [`/select/logsql/tail`](#live-tailing) for live tailing of query results. - [`/select/logsql/hits`](#querying-hits-stats) for querying log hits stats over the given time range. +- [`/select/logsql/stats_query`](#querying-log-stats) for querying log stats at the given time. - [`/select/logsql/stream_ids`](#querying-stream_ids) for querying `_stream_id` values of [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). - [`/select/logsql/streams`](#querying-streams) for querying [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). - [`/select/logsql/stream_field_names`](#querying-stream-field-names) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names. 
@@ -105,6 +106,7 @@ See also:
 
 - [Live tailing](#live-tailing)
 - [Querying hits stats](#querying-hits-stats)
+- [Querying log stats](#querying-log-stats)
 - [Querying streams](#querying-streams)
 - [Querying stream field names](#querying-stream-field-names)
 - [Querying stream field values](#querying-stream-field-values)
@@ -273,9 +275,80 @@ curl http://localhost:9428/select/logsql/hits -H 'AccountID: 12' -H 'ProjectID:
 See also:
 
 - [Querying logs](#querying-logs)
+- [Querying log stats](#querying-log-stats)
 - [Querying streams](#querying-streams)
 - [HTTP API](#http-api)
 
+### Querying log stats
+
+VictoriaLogs provides `/select/logsql/stats_query?query=<query>&time=<t>` HTTP endpoint, which returns log stats
+for the given [`query`](https://docs.victoriametrics.com/victorialogs/logsql/) at the given timestamp `t`
+in the format compatible with [Prometheus querying API](https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries).
+
+The `<t>` arg can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
+If `<t>` is missing, then it defaults to the current time.
+
+The `<query>` must contain a [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). The calculated stats are converted into metrics
+with labels enumerated in the `by(...)` clause of the `| stats by(...)` pipe.
+
+For example, the following command returns the number of logs for each `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+across logs for the `2024-01-01` day (UTC):
+
+```sh
+curl http://localhost:9428/select/logsql/stats_query -d 'query=_time:1d | stats by (level) count(*)' -d 'time=2024-01-02'
+```
+
+Below is an example JSON output returned from this endpoint:
+
+```json
+{
+  "status": "success",
+  "data": {
+    "resultType": "vector",
+    "result": [
+      {
+        "metric": {
+          "__name__": "count(*)",
+          "level": "info"
+        },
+        "value": [
+          1704153600,
+          "20395342"
+        ]
+      },
+      {
+        "metric": {
+          "__name__": "count(*)",
+          "level": "warn"
+        },
+        "value": [
+          1704153600,
+          "1239222"
+        ]
+      },
+      {
+        "metric": {
+          "__name__": "count(*)",
+          "level": "error"
+        },
+        "value": [
+          1704153600,
+          "832"
+        ]
+      }
+    ]
+  }
+}
+```
+
+The `/select/logsql/stats_query` API is useful for generating Prometheus-compatible alerts and calculating recording rule results.
+
+See also:
+
+- [Querying logs](#querying-logs)
+- [Querying hits stats](#querying-hits-stats)
+- [HTTP API](#http-api)
+
 ### Querying stream_ids
 
 VictoriaLogs provides `/select/logsql/stream_ids?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns `_stream_id` values
diff --git a/lib/httpserver/prometheus.go b/lib/httpserver/prometheus.go
new file mode 100644
index 000000000..08c5e32af
--- /dev/null
+++ b/lib/httpserver/prometheus.go
@@ -0,0 +1,47 @@
+package httpserver
+
+import (
+	"errors"
+	"net/http"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+)
+
+// SendPrometheusError sends err to w in Prometheus querying API response format.
+// +// See https://prometheus.io/docs/prometheus/latest/querying/api/#format-overview for more details +func SendPrometheusError(w http.ResponseWriter, r *http.Request, err error) { + logger.WarnfSkipframes(1, "error in %q: %s", GetRequestURI(r), err) + + w.Header().Set("Content-Type", "application/json") + statusCode := http.StatusUnprocessableEntity + var esc *ErrorWithStatusCode + if errors.As(err, &esc) { + statusCode = esc.StatusCode + } + w.WriteHeader(statusCode) + + var ure *UserReadableError + if errors.As(err, &ure) { + err = ure + } + WritePrometheusErrorResponse(w, statusCode, err) +} + +// UserReadableError is a type of error which supposed to be returned to the user without additional context. +type UserReadableError struct { + // Err is the error which needs to be returned to the user. + Err error +} + +// Unwrap returns ure.Err. +// +// This is used by standard errors package. See https://golang.org/pkg/errors +func (ure *UserReadableError) Unwrap() error { + return ure.Err +} + +// Error satisfies Error interface +func (ure *UserReadableError) Error() string { + return ure.Err.Error() +} diff --git a/app/vmselect/prometheus/error_response.qtpl b/lib/httpserver/prometheus_error_response.qtpl similarity index 60% rename from app/vmselect/prometheus/error_response.qtpl rename to lib/httpserver/prometheus_error_response.qtpl index c56facf9d..241c7c6ab 100644 --- a/app/vmselect/prometheus/error_response.qtpl +++ b/lib/httpserver/prometheus_error_response.qtpl @@ -1,7 +1,7 @@ {% stripspace %} -ErrorResponse generates error response for /api/v1/query. +PrometheusErrorResponse generates error response for Prometheus Querying API. See https://prometheus.io/docs/prometheus/latest/querying/api/#format-overview -{% func ErrorResponse(statusCode int, err error) %} +{% func PrometheusErrorResponse(statusCode int, err error) %} { "status":"error", "errorType":"{%d statusCode %}", diff --git a/lib/httpserver/prometheus_error_response.qtpl.go b/lib/httpserver/prometheus_error_response.qtpl.go new file mode 100644 index 000000000..3756d7747 --- /dev/null +++ b/lib/httpserver/prometheus_error_response.qtpl.go @@ -0,0 +1,61 @@ +// Code generated by qtc from "prometheus_error_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. 
+ +// PrometheusErrorResponse generates error response for Prometheus Querying API.See https://prometheus.io/docs/prometheus/latest/querying/api/#format-overview + +//line lib/httpserver/prometheus_error_response.qtpl:4 +package httpserver + +//line lib/httpserver/prometheus_error_response.qtpl:4 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line lib/httpserver/prometheus_error_response.qtpl:4 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line lib/httpserver/prometheus_error_response.qtpl:4 +func StreamPrometheusErrorResponse(qw422016 *qt422016.Writer, statusCode int, err error) { +//line lib/httpserver/prometheus_error_response.qtpl:4 + qw422016.N().S(`{"status":"error","errorType":"`) +//line lib/httpserver/prometheus_error_response.qtpl:7 + qw422016.N().D(statusCode) +//line lib/httpserver/prometheus_error_response.qtpl:7 + qw422016.N().S(`","error":`) +//line lib/httpserver/prometheus_error_response.qtpl:8 + qw422016.N().Q(err.Error()) +//line lib/httpserver/prometheus_error_response.qtpl:8 + qw422016.N().S(`}`) +//line lib/httpserver/prometheus_error_response.qtpl:10 +} + +//line lib/httpserver/prometheus_error_response.qtpl:10 +func WritePrometheusErrorResponse(qq422016 qtio422016.Writer, statusCode int, err error) { +//line lib/httpserver/prometheus_error_response.qtpl:10 + qw422016 := qt422016.AcquireWriter(qq422016) +//line lib/httpserver/prometheus_error_response.qtpl:10 + StreamPrometheusErrorResponse(qw422016, statusCode, err) +//line lib/httpserver/prometheus_error_response.qtpl:10 + qt422016.ReleaseWriter(qw422016) +//line lib/httpserver/prometheus_error_response.qtpl:10 +} + +//line lib/httpserver/prometheus_error_response.qtpl:10 +func PrometheusErrorResponse(statusCode int, err error) string { +//line lib/httpserver/prometheus_error_response.qtpl:10 + qb422016 := qt422016.AcquireByteBuffer() +//line lib/httpserver/prometheus_error_response.qtpl:10 + WritePrometheusErrorResponse(qb422016, statusCode, err) +//line lib/httpserver/prometheus_error_response.qtpl:10 + qs422016 := string(qb422016.B) +//line lib/httpserver/prometheus_error_response.qtpl:10 + qt422016.ReleaseByteBuffer(qb422016) +//line lib/httpserver/prometheus_error_response.qtpl:10 + return qs422016 +//line lib/httpserver/prometheus_error_response.qtpl:10 +} diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 131e5eef6..a95a34e1f 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -3,6 +3,7 @@ package logstorage import ( "fmt" "math" + "slices" "strconv" "strings" "time" @@ -57,10 +58,15 @@ func (lex *lexer) restoreState(ls *lexerState) { // // The lex.token points to the first token in s. func newLexer(s string) *lexer { + timestamp := time.Now().UnixNano() + return newLexerAtTimestamp(s, timestamp) +} + +func newLexerAtTimestamp(s string, timestamp int64) *lexer { lex := &lexer{ s: s, sOrig: s, - currentTimestamp: time.Now().UnixNano(), + currentTimestamp: timestamp, } lex.nextToken() return lex @@ -221,6 +227,9 @@ type Query struct { f filter pipes []pipe + + // timestamp is the timestamp context used for parsing the query. + timestamp int64 } // String returns string representation for q. @@ -445,6 +454,77 @@ func (q *Query) Optimize() { } } +// GetStatsByFields returns `| stats by (...)` fields from q if q contains safe `| stats ...` pipe in the end. +// +// False is returned if q doesn't contain safe `| stats ...` pipe. 
+func (q *Query) GetStatsByFields() ([]string, bool) { + pipes := q.pipes + + idx := getLastPipeStatsIdx(pipes) + if idx < 0 { + return nil, false + } + + // extract by(...) field names from stats pipe + byFields := pipes[idx].(*pipeStats).byFields + fields := make([]string, len(byFields)) + for i, f := range byFields { + fields[i] = f.name + } + + // verify that all the pipes after the idx do not add new fields + for i := idx + 1; i < len(pipes); i++ { + p := pipes[i] + switch t := p.(type) { + case *pipeSort, *pipeOffset, *pipeLimit, *pipeFilter: + // These pipes do not change the set of fields. + case *pipeMath: + // Allow pipeMath, since it adds additional metrics to the given set of fields. + case *pipeFields: + // `| fields ...` pipe must contain all the by(...) fields, otherwise it breaks output. + for _, f := range fields { + if !slices.Contains(t.fields, f) { + return nil, false + } + } + case *pipeDelete: + // Disallow deleting by(...) fields, since this breaks output. + for _, f := range t.fields { + if slices.Contains(fields, f) { + return nil, false + } + } + case *pipeCopy: + // Disallow copying by(...) fields, since this breaks output. + for _, f := range t.srcFields { + if slices.Contains(fields, f) { + return nil, false + } + } + case *pipeRename: + // Update by(...) fields with dst fields + for i, f := range t.srcFields { + if n := slices.Index(fields, f); n >= 0 { + fields[n] = t.dstFields[i] + } + } + default: + return nil, false + } + } + + return fields, true +} + +func getLastPipeStatsIdx(pipes []pipe) int { + for i := len(pipes) - 1; i >= 0; i-- { + if _, ok := pipes[i].(*pipeStats); ok { + return i + } + } + return -1 +} + func removeStarFilters(f filter) filter { visitFunc := func(f filter) bool { fp, ok := f.(*filterPrefix) @@ -584,7 +664,15 @@ func (q *Query) getNeededColumns() ([]string, []string) { // ParseQuery parses s. func ParseQuery(s string) (*Query, error) { - lex := newLexer(s) + timestamp := time.Now().UnixNano() + return ParseQueryAtTimestamp(s, timestamp) +} + +// ParseQueryAtTimestamp parses s in the context of the given timestamp. +// +// E.g. _time:duration filters are ajusted according to the provided timestamp as _time:[timestamp-duration, duration]. +func ParseQueryAtTimestamp(s string, timestamp int64) (*Query, error) { + lex := newLexerAtTimestamp(s, timestamp) // Verify the first token doesn't match pipe names. firstToken := strings.ToLower(lex.rawToken) @@ -600,9 +688,15 @@ func ParseQuery(s string) (*Query, error) { if !lex.isEnd() { return nil, fmt.Errorf("unexpected unparsed tail after [%s]; context: [%s]; tail: [%s]", q, lex.context(), lex.s) } + q.timestamp = timestamp return q, nil } +// GetTimestamp returns timestamp context for the given q, which was passed to ParseQueryAtTimestamp(). 
+func (q *Query) GetTimestamp() int64 { + return q.timestamp +} + func parseQuery(lex *lexer) (*Query, error) { f, err := parseFilter(lex) if err != nil { diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 6956ba6e7..4cbf18cef 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -2100,3 +2100,94 @@ func TestQueryDropAllPipes(t *testing.T) { f(`foo or bar and baz | top 5 by (x)`, `foo or bar baz`) f(`foo | filter bar:baz | stats by (x) min(y)`, `foo bar:baz`) } + +func TestQueryGetStatsByFields_Success(t *testing.T) { + f := func(qStr string, fieldsExpected []string) { + t.Helper() + + q, err := ParseQuery(qStr) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", qStr, err) + } + fields, ok := q.GetStatsByFields() + if !ok { + t.Fatalf("cannot obtain byFields from the query [%s]", qStr) + } + if !reflect.DeepEqual(fields, fieldsExpected) { + t.Fatalf("unexpected byFields;\ngot\n%q\nwant\n%q", fields, fieldsExpected) + } + } + + f(`* | stats count()`, []string{}) + f(`* | count()`, []string{}) + f(`* | by (foo) count(), count_uniq(bar)`, []string{"foo"}) + f(`* | stats by (a, b, cd) min(foo), max(bar)`, []string{"a", "b", "cd"}) + + // multiple pipes before stats is ok + f(`foo | extract "ip=," | stats by (host) count_uniq(ip)`, []string{"host"}) + + // sort, offset and limit pipes are allowed after stats + f(`foo | stats by (x, y) count() rows | sort by (rows) desc | offset 5 | limit 10`, []string{"x", "y"}) + + // filter pipe is allowed after stats + f(`foo | stats by (x, y) count() rows | filter rows:>100`, []string{"x", "y"}) + + // math pipe is allowed after stats + f(`foo | stats by (x) count() total, count() if (error) errors | math errors / total`, []string{"x"}) + + // keep containing all the by(...) fields + f(`foo | stats by (x) count() total | keep x, y`, []string{"x"}) + + // drop which doesn't contain by(...) fields + f(`foo | stats by (x) count() total | drop y`, []string{"x"}) + + // copy which doesn't contain by(...) fields + f(`foo | stats by (x) count() total | copy total abc`, []string{"x"}) + + // mv by(...) 
fields + f(`foo | stats by (x) count() total | mv x y`, []string{"y"}) +} + +func TestQueryGetStatsByFields_Failure(t *testing.T) { + f := func(qStr string) { + t.Helper() + + q, err := ParseQuery(qStr) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", qStr, err) + } + fields, ok := q.GetStatsByFields() + if ok { + t.Fatalf("expecting failure to get byFields for the query [%s]", qStr) + } + if fields != nil { + t.Fatalf("expectig nil fields; got %q", fields) + } + } + + f(`*`) + f(`foo bar`) + f(`foo | by (a, b) count() | copy a b`) + f(`foo | by (a, b) count() | delete a`) + f(`foo | count() | drop_empty_fields`) + f(`foo | count() | extract "foobaz"`) + f(`foo | count() | extract_regexp "(?P([0-9]+[.]){3}[0-9]+)"`) + f(`foo | count() | field_names`) + f(`foo | count() | field_values abc`) + f(`foo | by (x) count() | fields a, b`) + f(`foo | count() | format "foobaz"`) + f(`foo | count() | pack_json`) + f(`foo | count() | pack_logfmt`) + f(`foo | rename x y`) + f(`foo | count() | replace ("foo", "bar")`) + f(`foo | count() | replace_regexp ("foo.+bar", "baz")`) + f(`foo | count() | stream_context after 10`) + f(`foo | count() | top 5 by (x)`) + f(`foo | count() | uniq by (x)`) + f(`foo | count() | unpack_json`) + f(`foo | count() | unpack_logfmt`) + f(`foo | count() | unpack_syslog`) + f(`foo | count() | unroll by (x)`) + + f(`* | by (x) count() as rows | math rows * 10, rows / 10 | drop x`) +} diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index 7382bb88c..c26b5a8f8 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -728,7 +728,7 @@ var zeroByStatsField = &byStatsField{} // byStatsField represents 'by (...)' part of the pipeStats. // -// It can have either 'name' representation or 'name:bucket' or 'name:buket offset off' representation, +// It can have either 'name' representation or 'name:bucket' or 'name:bucket offset off' representation, // where `bucket` and `off` can contain duration, size or numeric value for creating different buckets // for 'value/bucket'. type byStatsField struct {
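
Below is a minimal client-side sketch, in Go and using only the standard library, of how a consumer such as vmalert could call the new `/select/logsql/stats_query` endpoint and decode the Prometheus-compatible instant-vector response documented above. It is not part of this patch: the address, the query text and the response-struct layout are assumptions derived from the docs section and the `stats_query_response.qtpl` template, not code from this commit.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// statsQueryResponse mirrors the Prometheus-style instant-vector JSON
// emitted by /select/logsql/stats_query (see the docs and the
// stats_query_response.qtpl template above). Field layout is an assumption
// based on that template, not a type exported by VictoriaLogs.
type statsQueryResponse struct {
	Status string `json:"status"`
	Data   struct {
		ResultType string `json:"resultType"`
		Result     []struct {
			Metric map[string]string `json:"metric"`
			// Value holds [unix_timestamp_seconds, "value-as-string"].
			Value [2]any `json:"value"`
		} `json:"result"`
	} `json:"data"`
}

func main() {
	// Assumed address and query; adjust them for a real VictoriaLogs deployment.
	endpoint := "http://localhost:9428/select/logsql/stats_query"
	params := url.Values{}
	params.Set("query", `_time:1d | stats by (level) count(*)`)
	params.Set("time", time.Now().UTC().Format(time.RFC3339))

	resp, err := http.Get(endpoint + "?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var sqr statsQueryResponse
	if err := json.NewDecoder(resp.Body).Decode(&sqr); err != nil {
		panic(err)
	}
	if sqr.Status != "success" {
		panic("unexpected status: " + sqr.Status)
	}
	// Print one line per by(...) group, e.g. `count(*) map[__name__:count(*) level:info] = 20395342`.
	for _, r := range sqr.Data.Result {
		fmt.Printf("%s %v = %v\n", r.Metric["__name__"], r.Metric, r.Value[1])
	}
}
```

Run against a VictoriaLogs instance with ingested logs, this should print one line per `by (level)` group, mirroring the JSON example shown in the querying docs above.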