app/vmselect: adds tracing for export APIS into stdout

it must help to debug possible issues with it.
part of https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5622
This commit is contained in:
f41gh7 2024-01-16 11:44:01 +01:00
parent 822ab189f7
commit 44e485332e
No known key found for this signature in database
GPG key ID: 4558311CF775EC72
2 changed files with 32 additions and 12 deletions

View file

@ -520,7 +520,7 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
return true
case "prometheus/api/v1/export":
exportRequests.Inc()
if err := prometheus.ExportHandler(startTime, at, w, r); err != nil {
if err := prometheus.ExportHandler(qt, startTime, at, w, r); err != nil {
exportErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@ -528,7 +528,7 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
return true
case "prometheus/api/v1/export/native":
exportNativeRequests.Inc()
if err := prometheus.ExportNativeHandler(startTime, at, w, r); err != nil {
if err := prometheus.ExportNativeHandler(qt, startTime, at, w, r); err != nil {
exportNativeErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true
@ -536,7 +536,7 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
return true
case "prometheus/api/v1/export/csv":
exportCSVRequests.Inc()
if err := prometheus.ExportCSVHandler(startTime, at, w, r); err != nil {
if err := prometheus.ExportCSVHandler(qt, startTime, at, w, r); err != nil {
exportCSVErrors.Inc()
httpserver.Errorf(w, r, "%s", err)
return true

View file

@ -3,7 +3,6 @@ package prometheus
import (
"flag"
"fmt"
"github.com/VictoriaMetrics/metricsql"
"math"
"net"
"net/http"
@ -14,6 +13,8 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/metricsql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
@ -146,13 +147,14 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
// ExportCSVHandler exports data in CSV format from /api/v1/export/csv
func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
func ExportCSVHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
defer exportCSVDuration.UpdateDuration(startTime)
cp, err := getExportParams(r, startTime)
if err != nil {
return err
}
qt.Printf("ts=%d, trace_id=%q", time.Now().UnixMilli(), r.FormValue("trace_id"))
format := r.FormValue("format")
if len(format) == 0 {
@ -179,7 +181,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
// Unconditionally deny partial response for the exported data,
// since users usually expect that the exported data is full.
denyPartialResponse := true
rss, _, err := netstorage.ProcessSearchQuery(nil, denyPartialResponse, sq, cp.deadline)
rss, _, err := netstorage.ProcessSearchQuery(qt, denyPartialResponse, sq, cp.deadline)
if err != nil {
return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
}
@ -203,7 +205,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
}()
} else {
go func() {
err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
err := netstorage.ExportBlocks(qt, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
if err := bw.Error(); err != nil {
return err
}
@ -227,19 +229,25 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
if err != nil {
return fmt.Errorf("error during sending the exported csv data to remote client: %w", err)
}
if qt.Enabled() {
qt.Done()
fmt.Println(qt.ToJSON())
}
return sw.flush()
}
var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`)
// ExportNativeHandler exports data in native format from /api/v1/export/native.
func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
func ExportNativeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
defer exportNativeDuration.UpdateDuration(startTime)
cp, err := getExportParams(r, startTime)
if err != nil {
return err
}
qt.Printf("ts=%d, trace_id=%q", time.Now().UnixMilli(), r.FormValue("trace_id"))
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxExportSeries)
w.Header().Set("Content-Type", "VictoriaMetrics/native")
@ -254,7 +262,7 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
_, _ = bw.Write(trBuf)
// Marshal native blocks.
err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
err = netstorage.ExportBlocks(qt, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
if err := bw.Error(); err != nil {
return err
}
@ -282,6 +290,11 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
if err != nil {
return fmt.Errorf("error during sending native data to remote client: %w", err)
}
if qt.Enabled() {
qt.Done()
fmt.Println(qt.ToJSON())
}
return sw.flush()
}
@ -290,19 +303,25 @@ var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path=
var bbPool bytesutil.ByteBufferPool
// ExportHandler exports data in raw format from /api/v1/export.
func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
func ExportHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
defer exportDuration.UpdateDuration(startTime)
cp, err := getExportParams(r, startTime)
if err != nil {
return err
}
qt.Printf("ts=%d, trace_id=%q", time.Now().UnixMilli(), r.FormValue("trace_id"))
format := r.FormValue("format")
maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
reduceMemUsage := httputils.GetBool(r, "reduce_mem_usage")
if err := exportHandler(nil, at, w, cp, format, maxRowsPerLine, reduceMemUsage); err != nil {
if err := exportHandler(qt, at, w, cp, format, maxRowsPerLine, reduceMemUsage); err != nil {
return fmt.Errorf("error when exporting data on the time range (start=%d, end=%d): %w", cp.start, cp.end, err)
}
if qt.Enabled() {
qt.Done()
fmt.Println(qt.ToJSON())
}
return nil
}
@ -958,7 +977,8 @@ func QueryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
}
func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, query string,
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter,
) error {
deadline := searchutils.GetDeadlineForQuery(r, startTime)
mayCache := !httputils.GetBool(r, "nocache")
lookbackDelta, err := getMaxLookback(r)