Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2025-03-11 15:34:56 +00:00
Code cleanup (#343)

* Small code cleanup: remove Request from params
* Extract common params for all export handlers
* Rename ExportParams -> exportParams
* wip

Co-authored-by: Dzmitry Lazerka <dlazerka@gmail.com>
Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent 4fa3cd701c
commit b5fce4190c

1 changed file with 86 additions and 61 deletions
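Before reading the diff, it may help to see the shape of the refactor: the duplicated per-handler parsing of start, end, match[], extra filters, and the deadline moves into one helper, getExportParams, which returns a shared exportParams struct. Below is a minimal, self-contained sketch of that pattern, not the commit's code: only the names exportParams and getExportParams come from the diff, strconv stands in for searchutils.GetTime, and the deadline/filter fields are omitted.

package main

import (
	"fmt"
	"net/http"
	"strconv"
	"time"
)

// exportParams sketches the struct added by this commit: the request
// values shared by every /api/v1/export* handler, parsed in one place.
// The real struct also carries a deadline and parsed tag filters.
type exportParams struct {
	start int64 // millisecond timestamps, matching ct := startTime.UnixNano() / 1e6
	end   int64
}

// getExportParams is a trimmed-down stand-in for the helper added at the
// bottom of the diff; strconv replaces searchutils.GetTime here.
func getExportParams(r *http.Request, startTime time.Time) (*exportParams, error) {
	if err := r.ParseForm(); err != nil {
		return nil, fmt.Errorf("cannot parse request form values: %w", err)
	}
	getTime := func(arg string, defaultValue int64) (int64, error) {
		s := r.FormValue(arg)
		if s == "" {
			return defaultValue, nil
		}
		return strconv.ParseInt(s, 10, 64)
	}
	start, err := getTime("start", 0)
	if err != nil {
		return nil, err
	}
	end, err := getTime("end", startTime.UnixNano()/1e6)
	if err != nil {
		return nil, err
	}
	if end < start {
		end = start // the same clamping the real helper applies
	}
	return &exportParams{start: start, end: end}, nil
}

func main() {
	// Every export handler now reduces to: parse once, then use ep.*.
	r, _ := http.NewRequest("GET", "/api/v1/export?start=100&end=200", nil)
	ep, err := getExportParams(r, time.Now())
	if err != nil {
		panic(err)
	}
	fmt.Printf("start=%d end=%d\n", ep.start, ep.end)
}

The real helper at the bottom of the diff additionally parses match[] (falling back to the legacy match arg) and joins extra tag filters into ep.filterss.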
@@ -125,7 +125,6 @@ var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/fe
 func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
 	defer exportCSVDuration.UpdateDuration(startTime)
 
-	ct := startTime.UnixNano() / 1e6
 	if err := r.ParseForm(); err != nil {
 		return fmt.Errorf("cannot parse request form values: %w", err)
 	}
@@ -134,21 +133,13 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
 		return fmt.Errorf("missing `format` arg; see https://docs.victoriametrics.com/#how-to-export-csv-data")
 	}
 	fieldNames := strings.Split(format, ",")
-	start, err := searchutils.GetTime(r, "start", 0)
-	if err != nil {
-		return err
-	}
-	end, err := searchutils.GetTime(r, "end", ct)
-	if err != nil {
-		return err
-	}
 	reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage")
-	deadline := searchutils.GetDeadlineForExport(r, startTime)
-	tagFilterss, err := getTagFilterssFromRequest(r)
+	ep, err := getExportParams(r, startTime)
 	if err != nil {
 		return err
 	}
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxExportSeries)
+
+	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, ep.start, ep.end, ep.filterss, *maxExportSeries)
 	w.Header().Set("Content-Type", "text/csv; charset=utf-8")
 	bw := bufferedwriter.Get(w)
 	defer bufferedwriter.Put(bw)
@@ -167,7 +158,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
 	// Unconditionally deny partial response for the exported data,
 	// since users usually expect that the exported data is full.
 	denyPartialResponse := true
-	rss, _, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, true, deadline)
+	rss, _, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, true, ep.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
 	}
@@ -190,7 +181,7 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
 		}()
 	} else {
 		go func() {
-			err := netstorage.ExportBlocks(at, sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
+			err := netstorage.ExportBlocks(at, sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
 				if err := bw.Error(); err != nil {
 					return err
 				}
@@ -231,36 +222,27 @@ var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/a
 func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
 	defer exportNativeDuration.UpdateDuration(startTime)
 
-	ct := startTime.UnixNano() / 1e6
 	if err := r.ParseForm(); err != nil {
 		return fmt.Errorf("cannot parse request form values: %w", err)
 	}
-	start, err := searchutils.GetTime(r, "start", 0)
+	ep, err := getExportParams(r, startTime)
 	if err != nil {
 		return err
 	}
-	end, err := searchutils.GetTime(r, "end", ct)
-	if err != nil {
-		return err
-	}
-	deadline := searchutils.GetDeadlineForExport(r, startTime)
-	tagFilterss, err := getTagFilterssFromRequest(r)
-	if err != nil {
-		return err
-	}
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxExportSeries)
+
+	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, ep.start, ep.end, ep.filterss, *maxExportSeries)
 	w.Header().Set("Content-Type", "VictoriaMetrics/native")
 	bw := bufferedwriter.Get(w)
 	defer bufferedwriter.Put(bw)
 
 	// Marshal tr
 	trBuf := make([]byte, 0, 16)
-	trBuf = encoding.MarshalInt64(trBuf, start)
-	trBuf = encoding.MarshalInt64(trBuf, end)
+	trBuf = encoding.MarshalInt64(trBuf, ep.start)
+	trBuf = encoding.MarshalInt64(trBuf, ep.end)
 	_, _ = bw.Write(trBuf)
 
 	// Marshal native blocks.
-	err = netstorage.ExportBlocks(at, sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
+	err = netstorage.ExportBlocks(at, sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
 		if err := bw.Error(); err != nil {
 			return err
 		}
@@ -305,43 +287,25 @@ var bbPool bytesutil.ByteBufferPool
 func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r *http.Request) error {
 	defer exportDuration.UpdateDuration(startTime)
 
-	ct := startTime.UnixNano() / 1e6
 	if err := r.ParseForm(); err != nil {
 		return fmt.Errorf("cannot parse request form values: %w", err)
 	}
-	matches := getMatchesFromRequest(r)
-	if len(matches) == 0 {
-		return fmt.Errorf("missing `match[]` query arg")
-	}
-	start, err := searchutils.GetTime(r, "start", 0)
-	if err != nil {
-		return err
-	}
-	end, err := searchutils.GetTime(r, "end", ct)
+	ep, err := getExportParams(r, startTime)
 	if err != nil {
 		return err
 	}
 	format := r.FormValue("format")
 	maxRowsPerLine := int(fastfloat.ParseInt64BestEffort(r.FormValue("max_rows_per_line")))
 	reduceMemUsage := searchutils.GetBool(r, "reduce_mem_usage")
-	deadline := searchutils.GetDeadlineForExport(r, startTime)
-	if start >= end {
-		end = start + defaultStep
-	}
-	etfs, err := searchutils.GetExtraTagFilters(r)
-	if err != nil {
-		return err
-	}
-	if err := exportHandler(at, w, r, matches, etfs, start, end, format, maxRowsPerLine, reduceMemUsage, deadline); err != nil {
-		return fmt.Errorf("error when exporting data for queries=%q on the time range (start=%d, end=%d): %w", matches, start, end, err)
+	if err := exportHandler(at, w, ep, format, maxRowsPerLine, reduceMemUsage); err != nil {
+		return fmt.Errorf("error when exporting data on the time range (start=%d, end=%d): %w", ep.start, ep.end, err)
 	}
 	return nil
 }
 
 var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
 
-func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, etfs [][]storage.TagFilter, start, end int64,
-	format string, maxRowsPerLine int, reduceMemUsage bool, deadline searchutils.Deadline) error {
+func exportHandler(at *auth.Token, w http.ResponseWriter, ep *exportParams, format string, maxRowsPerLine int, reduceMemUsage bool) error {
 	writeResponseFunc := WriteExportStdResponse
 	writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) {
 		bb := quicktemplate.AcquireByteBuffer()
@@ -394,12 +358,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
 		}
 	}
 
-	tagFilterss, err := getTagFilterssFromMatches(matches)
-	if err != nil {
-		return err
-	}
-	tagFilterss = searchutils.JoinTagFilterss(tagFilterss, etfs)
-	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, start, end, tagFilterss, *maxExportSeries)
+	sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, ep.start, ep.end, ep.filterss, *maxExportSeries)
 
 	w.Header().Set("Content-Type", contentType)
 	bw := bufferedwriter.Get(w)
@@ -411,7 +370,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
 	// Unconditionally deny partial response for the exported data,
 	// since users usually expect that the exported data is full.
 	denyPartialResponse := true
-	rss, _, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, true, deadline)
+	rss, _, err := netstorage.ProcessSearchQuery(at, denyPartialResponse, sq, true, ep.deadline)
 	if err != nil {
 		return fmt.Errorf("cannot fetch data for %q: %w", sq, err)
 	}
@@ -434,7 +393,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
 		}()
 	} else {
 		go func() {
-			err := netstorage.ExportBlocks(at, sq, deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
+			err := netstorage.ExportBlocks(at, sq, ep.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error {
 				if err := bw.Error(); err != nil {
 					return err
 				}
@@ -461,7 +420,7 @@ func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, match
 	if err := bw.Flush(); err != nil {
 		return err
 	}
-	err = <-doneCh
+	err := <-doneCh
 	if err != nil {
 		return fmt.Errorf("error during sending the data to remote client: %w", err)
 	}
@@ -1127,7 +1086,20 @@ func QueryHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
 		if end < start {
 			end = start
 		}
-		if err := exportHandler(at, w, r, []string{childQuery}, etfs, start, end, "promapi", 0, false, deadline); err != nil {
+
+		tagFilterss, err := getTagFilterssFromMatches([]string{childQuery})
+		if err != nil {
+			return err
+		}
+		filterss := searchutils.JoinTagFilterss(tagFilterss, etfs)
+
+		ep := &exportParams{
+			deadline: deadline,
+			start:    start,
+			end:      end,
+			filterss: filterss,
+		}
+		if err := exportHandler(at, w, ep, "promapi", 0, false); err != nil {
 			return fmt.Errorf("error when exporting data for query=%q on the time range (start=%d, end=%d): %w", childQuery, start, end, err)
 		}
 		queryDuration.UpdateDuration(startTime)
@@ -1467,3 +1439,56 @@ func QueryStatsHandler(startTime time.Time, at *auth.Token, w http.ResponseWrite
 }
 
 var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/top_queries"}`)
+
+// exportParams contains common parameters for all /api/v1/export* handlers
+//
+// deadline, start, end, match[], extra_label, extra_filters
+type exportParams struct {
+	deadline searchutils.Deadline
+	start    int64
+	end      int64
+	filterss [][]storage.TagFilter
+}
+
+// getExportParams obtains common params from r, which are used for /api/v1/export* handlers
+func getExportParams(r *http.Request, startTime time.Time) (*exportParams, error) {
+	deadline := searchutils.GetDeadlineForExport(r, startTime)
+	start, err := searchutils.GetTime(r, "start", 0)
+	if err != nil {
+		return nil, err
+	}
+	ct := startTime.UnixNano() / 1e6
+	end, err := searchutils.GetTime(r, "end", ct)
+	if err != nil {
+		return nil, err
+	}
+	if end < start {
+		end = start
+	}
+
+	matches := r.Form["match[]"]
+	if len(matches) == 0 {
+		// Maintain backwards compatibility
+		match := r.FormValue("match")
+		if len(match) == 0 {
+			return nil, fmt.Errorf("missing `match[]` arg")
+		}
+		matches = []string{match}
+	}
+	tagFilterss, err := getTagFilterssFromMatches(matches)
+	if err != nil {
+		return nil, err
+	}
+	etfs, err := searchutils.GetExtraTagFilters(r)
+	if err != nil {
+		return nil, err
+	}
+	filterss := searchutils.JoinTagFilterss(tagFilterss, etfs)
+
+	return &exportParams{
+		deadline: deadline,
+		start:    start,
+		end:      end,
+		filterss: filterss,
+	}, nil
+}
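As a closing usage illustration (my assumption, not part of the commit): the common args that getExportParams reads map onto the query string of any /api/v1/export* endpoint roughly like this, with start/end shown as millisecond timestamps matching ct := startTime.UnixNano() / 1e6.

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Query args covered by getExportParams: match[] (or the legacy
	// match arg), start, end; extra_label filters and the deadline are
	// read separately by the searchutils helpers seen in the diff.
	v := url.Values{}
	v.Add("match[]", `{__name__=~"vm_.*"}`) // becomes ep.filterss
	v.Set("start", "1600000000000")         // becomes ep.start
	v.Set("end", "1600003600000")           // becomes ep.end
	fmt.Println("/api/v1/export?" + v.Encode())
}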