diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 9376fb1c0..5511df94c 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -118,7 +118,6 @@ var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/fe func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { defer exportCSVDuration.UpdateDuration(startTime) - ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } @@ -127,21 +126,13 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques return fmt.Errorf("missing `format` arg; see https://docs.victoriametrics.com/#how-to-export-csv-data") } fieldNames := strings.Split(format, ",") - start, err := searchutils.GetTime(r, "start", 0) - if err != nil { - return err - } - end, err := searchutils.GetTime(r, "end", ct) - if err != nil { - return err - } reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage") - deadline := searchutils.GetDeadlineForExport(r, startTime) - tagFilterss, err := getTagFilterssFromRequest(r) + ep, err := getExportParams(r, startTime) if err != nil { return err } - sq := storage.NewSearchQuery(start, end, tagFilterss, *maxExportSeries) + + sq := storage.NewSearchQuery(ep.start, ep.end, ep.filterss, *maxExportSeries) w.Header().Set("Content-Type", "text/csv; charset=utf-8") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) @@ -157,7 +148,7 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques } doneCh := make(chan error, 1) if !reduceMemUsage { - rss, err := netstorage.ProcessSearchQuery(sq, true, deadline) + rss, err := netstorage.ProcessSearchQuery(sq, true, ep.deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %w", sq, err) } @@ -180,7 +171,7 @@ func ExportCSVHandler(startTime time.Time, w http.ResponseWriter, r *http.Reques }() } else { go func() { - err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err := netstorage.ExportBlocks(sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { if err := bw.Error(); err != nil { return err } @@ -221,36 +212,27 @@ var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/a func ExportNativeHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { defer exportNativeDuration.UpdateDuration(startTime) - ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } - start, err := searchutils.GetTime(r, "start", 0) + ep, err := getExportParams(r, startTime) if err != nil { return err } - end, err := searchutils.GetTime(r, "end", ct) - if err != nil { - return err - } - deadline := searchutils.GetDeadlineForExport(r, startTime) - tagFilterss, err := getTagFilterssFromRequest(r) - if err != nil { - return err - } - sq := storage.NewSearchQuery(start, end, tagFilterss, *maxExportSeries) + + sq := storage.NewSearchQuery(ep.start, ep.end, ep.filterss, *maxExportSeries) w.Header().Set("Content-Type", "VictoriaMetrics/native") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) // Marshal tr trBuf := make([]byte, 0, 16) - trBuf = encoding.MarshalInt64(trBuf, start) - trBuf = encoding.MarshalInt64(trBuf, end) + trBuf = encoding.MarshalInt64(trBuf, ep.start) + trBuf = encoding.MarshalInt64(trBuf, ep.end) _, _ = bw.Write(trBuf) // Marshal native blocks. - err = netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err = netstorage.ExportBlocks(sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { if err := bw.Error(); err != nil { return err } @@ -295,42 +277,25 @@ var bbPool bytesutil.ByteBufferPool func ExportHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { defer exportDuration.UpdateDuration(startTime) - ct := startTime.UnixNano() / 1e6 if err := r.ParseForm(); err != nil { return fmt.Errorf("cannot parse request form values: %w", err) } - matches := getMatchesFromRequest(r) - if len(matches) == 0 { - return fmt.Errorf("missing `match[]` query arg") - } - start, err := searchutils.GetTime(r, "start", 0) - if err != nil { - return err - } - end, err := searchutils.GetTime(r, "end", ct) + ep, err := getExportParams(r, startTime) if err != nil { return err } format := r.FormValue("format") maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line"))) reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage") - deadline := searchutils.GetDeadlineForExport(r, startTime) - if start >= end { - end = start + defaultStep - } - etfs, err := searchutils.GetExtraTagFilters(r) - if err != nil { - return err - } - if err := exportHandler(w, matches, etfs, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil { - return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %w", matches, start, end, err) + if err := exportHandler(w, ep, format, maxRowsPerLine, reduceMemUsage); err != nil { + return fmt.Errorf("error when exporting data on the time range (start=%d, end=%d): %w", ep.start, ep.end, err) } return nil } var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) -func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.TagFilter, start, end int64, format string, maxRowsPerLine int, reduceMemUsage bool, deadline searchutils.Deadline) error { +func exportHandler(w http.ResponseWriter, ep *exportParams, format string, maxRowsPerLine int, reduceMemUsage bool) error { writeResponseFunc := WriteExportStdResponse writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { bb := quicktemplate.AcquireByteBuffer() @@ -383,13 +348,7 @@ func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.Tag } } - tagFilterss, err := getTagFilterssFromMatches(matches) - if err != nil { - return err - } - tagFilterss = searchutils.JoinTagFilterss(tagFilterss, etfs) - - sq := storage.NewSearchQuery(start, end, tagFilterss, *maxExportSeries) + sq := storage.NewSearchQuery(ep.start, ep.end, ep.filterss, *maxExportSeries) w.Header().Set("Content-Type", contentType) bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) @@ -397,7 +356,7 @@ func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.Tag resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) doneCh := make(chan error, 1) if !reduceMemUsage { - rss, err := netstorage.ProcessSearchQuery(sq, true, deadline) + rss, err := netstorage.ProcessSearchQuery(sq, true, ep.deadline) if err != nil { return fmt.Errorf("cannot fetch data for %q: %w", sq, err) } @@ -420,7 +379,7 @@ func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.Tag }() } else { go func() { - err := netstorage.ExportBlocks(sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err := netstorage.ExportBlocks(sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { if err := bw.Error(); err != nil { return err } @@ -447,7 +406,7 @@ func exportHandler(w http.ResponseWriter, matches []string, etfs [][]storage.Tag if err := bw.Flush(); err != nil { return err } - err = <-doneCh + err := <-doneCh if err != nil { return fmt.Errorf("error during sending the data to remote client: %w", err) } @@ -1049,7 +1008,20 @@ func QueryHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) e if end < start { end = start } - if err := exportHandler(w, []string{childQuery}, etfs, start, end, "promapi", 0, false, deadline); err != nil { + + tagFilterss, err := getTagFilterssFromMatches([]string{childQuery}) + if err != nil { + return err + } + filterss := searchutils.JoinTagFilterss(tagFilterss, etfs) + + ep := &exportParams{ + deadline: deadline, + start: start, + end: end, + filterss: filterss, + } + if err := exportHandler(w, ep, "promapi", 0, false); err != nil { return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %w", childQuery, start, end, err) } queryDuration.UpdateDuration(startTime) @@ -1379,3 +1351,56 @@ func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Reque } var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/top_queries"}`) + +// exportParams contains common parameters for all /api/v1/export* handlers +// +// deadline, start, end, match[], extra_label, extra_filters +type exportParams struct { + deadline searchutils.Deadline + start int64 + end int64 + filterss [][]storage.TagFilter +} + +// getExportParams obtains common params from r, which are used for /api/v1/export* handlers +func getExportParams(r *http.Request, startTime time.Time) (*exportParams, error) { + deadline := searchutils.GetDeadlineForExport(r, startTime) + start, err := searchutils.GetTime(r, "start", 0) + if err != nil { + return nil, err + } + ct := startTime.UnixNano() / 1e6 + end, err := searchutils.GetTime(r, "end", ct) + if err != nil { + return nil, err + } + if end < start { + end = start + } + + matches := r.Form["match[]"] + if len(matches) == 0 { + // Maintain backwards compatibility + match := r.FormValue("match") + if len(match) == 0 { + return nil, fmt.Errorf("missing `match[]` arg") + } + matches = []string{match} + } + tagFilterss, err := getTagFilterssFromMatches(matches) + if err != nil { + return nil, err + } + etfs, err := searchutils.GetExtraTagFilters(r) + if err != nil { + return nil, err + } + filterss := searchutils.JoinTagFilterss(tagFilterss, etfs) + + return &exportParams{ + deadline: deadline, + start: start, + end: end, + filterss: filterss, + }, nil +}