diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 7520265e7..de8a0a6a0 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -520,7 +520,7 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW return true case "prometheus/api/v1/export": exportRequests.Inc() - if err := prometheus.ExportHandler(startTime, at, w, r); err != nil { + if err := prometheus.ExportHandler(qt, startTime, at, w, r); err != nil { exportErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true @@ -528,7 +528,7 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW return true case "prometheus/api/v1/export/native": exportNativeRequests.Inc() - if err := prometheus.ExportNativeHandler(startTime, at, w, r); err != nil { + if err := prometheus.ExportNativeHandler(qt, startTime, at, w, r); err != nil { exportNativeErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true @@ -536,7 +536,7 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW return true case "prometheus/api/v1/export/csv": exportCSVRequests.Inc() - if err := prometheus.ExportCSVHandler(startTime, at, w, r); err != nil { + if err := prometheus.ExportCSVHandler(qt, startTime, at, w, r); err != nil { exportCSVErrors.Inc() httpserver.Errorf(w, r, "%s", err) return true diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 81776ba9d..8c6e8ad38 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -3,7 +3,6 @@ package prometheus import ( "flag" "fmt" - "github.com/VictoriaMetrics/metricsql" "math" "net" "net/http" @@ -14,6 +13,8 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/metricsql" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" @@ -146,13 +147,14 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`) // ExportCSVHandler exports data in CSV format from /api/v1/export/csv -func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { +func ExportCSVHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { defer exportCSVDuration.UpdateDuration(startTime) cp, err := getExportParams(r, startTime) if err != nil { return err } + qt.Printf("ts=%d, trace_id=%q", time.Now().UnixMilli(), r.FormValue("trace_id")) format := r.FormValue("format") if len(format) == 0 { @@ -179,7 +181,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter // Unconditionally deny partial response for the exported data, // since users usually expect that the exported data is full. denyPartialResponse := true - rss, _, err := netstorage.ProcessSearchQuery(nil, denyPartialResponse, sq, cp.deadline) + rss, _, err := netstorage.ProcessSearchQuery(qt, denyPartialResponse, sq, cp.deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %w", sq, err) } @@ -203,7 +205,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter }() } else { go func() { - err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { + err := netstorage.ExportBlocks(qt, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { if err := bw.Error(); err != nil { return err } @@ -227,19 +229,25 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter if err != nil { return fmt.Errorf("error during sending the exported csv data to remote client: %w", err) } + if qt.Enabled() { + qt.Done() + fmt.Println(qt.ToJSON()) + } + return sw.flush() } var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`) // ExportNativeHandler exports data in native format from /api/v1/export/native. -func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { +func ExportNativeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { defer exportNativeDuration.UpdateDuration(startTime) cp, err := getExportParams(r, startTime) if err != nil { return err } + qt.Printf("ts=%d, trace_id=%q", time.Now().UnixMilli(), r.FormValue("trace_id")) sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxExportSeries) w.Header().Set("Content-Type", "VictoriaMetrics/native") @@ -254,7 +262,7 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri _, _ = bw.Write(trBuf) // Marshal native blocks. - err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { + err = netstorage.ExportBlocks(qt, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { if err := bw.Error(); err != nil { return err } @@ -282,6 +290,11 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri if err != nil { return fmt.Errorf("error during sending native data to remote client: %w", err) } + if qt.Enabled() { + qt.Done() + fmt.Println(qt.ToJSON()) + } + return sw.flush() } @@ -290,19 +303,25 @@ var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path= var bbPool bytesutil.ByteBufferPool // ExportHandler exports data in raw format from /api/v1/export. -func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { +func ExportHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error { defer exportDuration.UpdateDuration(startTime) cp, err := getExportParams(r, startTime) if err != nil { return err } + qt.Printf("ts=%d, trace_id=%q", time.Now().UnixMilli(), r.FormValue("trace_id")) + format := r.FormValue("format") maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line"))) reduceMemUsage := httputils.GetBool(r, "reduce_mem_usage") - if err := exportHandler(nil, at, w, cp, format, maxRowsPerLine, reduceMemUsage); err != nil { + if err := exportHandler(qt, at, w, cp, format, maxRowsPerLine, reduceMemUsage); err != nil { return fmt.Errorf("error when exporting data on the time range (start=%d, end=%d): %w", cp.start, cp.end, err) } + if qt.Enabled() { + qt.Done() + fmt.Println(qt.ToJSON()) + } return nil } @@ -958,7 +977,8 @@ func QueryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok } func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, query string, - start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error { + start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter, +) error { deadline := searchutils.GetDeadlineForQuery(r, startTime) mayCache := !httputils.GetBool(r, "nocache") lookbackDelta, err := getMaxLookback(r)