From d3ad0d365ed4e7057b1e067c012cd96a12249e95 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 11 Sep 2020 13:18:57 +0300 Subject: [PATCH] app/vmselect: move Deadline from netstorage to searchutils This removes dependency on netstorage from searchutils. --- app/vmselect/graphite/graphite.go | 2 +- app/vmselect/netstorage/netstorage.go | 74 ++++++++----------------- app/vmselect/prometheus/prometheus.go | 6 +- app/vmselect/promql/eval.go | 3 +- app/vmselect/promql/exec_test.go | 5 +- app/vmselect/searchutils/searchutils.go | 45 +++++++++++++-- 6 files changed, 72 insertions(+), 63 deletions(-) diff --git a/app/vmselect/graphite/graphite.go b/app/vmselect/graphite/graphite.go index 6be4d4cfbd..37b48e0234 100644 --- a/app/vmselect/graphite/graphite.go +++ b/app/vmselect/graphite/graphite.go @@ -207,7 +207,7 @@ func MetricsIndexHandler(startTime time.Time, at *auth.Token, w http.ResponseWri } // metricsFind searches for label values that match the given query. -func metricsFind(at *auth.Token, tr storage.TimeRange, label, query string, delimiter byte, deadline netstorage.Deadline) ([]string, bool, error) { +func metricsFind(at *auth.Token, tr storage.TimeRange, label, query string, delimiter byte, deadline searchutils.Deadline) ([]string, bool, error) { expandTail := strings.HasSuffix(query, "*") for strings.HasSuffix(query, "*") { query = query[:len(query)-1] diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 4fa7e439f8..4b1128cd9a 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" @@ -53,7 +54,7 @@ type Results struct { at *auth.Token tr storage.TimeRange fetchData bool - deadline Deadline + deadline searchutils.Deadline tbf *tmpBlocksFile @@ -457,7 +458,7 @@ func (sbh *sortBlocksHeap) Pop() interface{} { } // DeleteSeries deletes time series matching the given sq. -func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (int, error) { +func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) { requestData := sq.Marshal(nil) // Send the query to all the storage nodes in parallel. @@ -501,7 +502,7 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (i } // GetLabels returns labels until the given deadline. -func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) { +func GetLabels(at *auth.Token, deadline searchutils.Deadline) ([]string, bool, error) { if deadline.Exceeded() { return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -573,7 +574,7 @@ func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) { // GetLabelValues returns label values for the given labelName // until the given deadline. -func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]string, bool, error) { +func GetLabelValues(at *auth.Token, labelName string, deadline searchutils.Deadline) ([]string, bool, error) { if deadline.Exceeded() { return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -643,7 +644,7 @@ func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]stri // GetTagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix. // // It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find -func GetTagValueSuffixes(at *auth.Token, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline Deadline) ([]string, bool, error) { +func GetTagValueSuffixes(at *auth.Token, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline searchutils.Deadline) ([]string, bool, error) { if deadline.Exceeded() { return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -708,7 +709,7 @@ func GetTagValueSuffixes(at *auth.Token, tr storage.TimeRange, tagKey, tagValueP } // GetLabelEntries returns all the label entries for at until the given deadline. -func GetLabelEntries(at *auth.Token, deadline Deadline) ([]storage.TagEntry, bool, error) { +func GetLabelEntries(at *auth.Token, deadline searchutils.Deadline) ([]storage.TagEntry, bool, error) { if deadline.Exceeded() { return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -816,7 +817,7 @@ func deduplicateStrings(a []string) []string { } // GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats -func GetTSDBStatusForDate(at *auth.Token, deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) { +func GetTSDBStatusForDate(at *auth.Token, deadline searchutils.Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) { if deadline.Exceeded() { return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -920,7 +921,7 @@ func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry { } // GetSeriesCount returns the number of unique series for the given at. -func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) { +func GetSeriesCount(at *auth.Token, deadline searchutils.Deadline) (uint64, bool, error) { if deadline.Exceeded() { return 0, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -1008,7 +1009,7 @@ func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error { } // ProcessSearchQuery performs sq on storage nodes until the given deadline. -func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) { +func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline searchutils.Deadline) (*Results, bool, error) { if deadline.Exceeded() { return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String()) } @@ -1146,7 +1147,7 @@ type storageNode struct { metricRowsRead *metrics.Counter } -func (sn *storageNode) deleteMetrics(requestData []byte, deadline Deadline) (int, error) { +func (sn *storageNode) deleteMetrics(requestData []byte, deadline searchutils.Deadline) (int, error) { var deletedCount int f := func(bc *handshake.BufferedConn) error { n, err := sn.deleteMetricsOnConn(bc, requestData) @@ -1166,7 +1167,7 @@ func (sn *storageNode) deleteMetrics(requestData []byte, deadline Deadline) (int return deletedCount, nil } -func (sn *storageNode) getLabels(accountID, projectID uint32, deadline Deadline) ([]string, error) { +func (sn *storageNode) getLabels(accountID, projectID uint32, deadline searchutils.Deadline) ([]string, error) { var labels []string f := func(bc *handshake.BufferedConn) error { ls, err := sn.getLabelsOnConn(bc, accountID, projectID) @@ -1186,7 +1187,7 @@ func (sn *storageNode) getLabels(accountID, projectID uint32, deadline Deadline) return labels, nil } -func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName string, deadline Deadline) ([]string, error) { +func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName string, deadline searchutils.Deadline) ([]string, error) { var labelValues []string f := func(bc *handshake.BufferedConn) error { lvs, err := sn.getLabelValuesOnConn(bc, accountID, projectID, labelName) @@ -1206,7 +1207,8 @@ func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName str return labelValues, nil } -func (sn *storageNode) getTagValueSuffixes(accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline Deadline) ([]string, error) { +func (sn *storageNode) getTagValueSuffixes(accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, + delimiter byte, deadline searchutils.Deadline) ([]string, error) { var suffixes []string f := func(bc *handshake.BufferedConn) error { ss, err := sn.getTagValueSuffixesOnConn(bc, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter) @@ -1226,7 +1228,7 @@ func (sn *storageNode) getTagValueSuffixes(accountID, projectID uint32, tr stora return suffixes, nil } -func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Deadline) ([]storage.TagEntry, error) { +func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline searchutils.Deadline) ([]storage.TagEntry, error) { var tagEntries []storage.TagEntry f := func(bc *handshake.BufferedConn) error { tes, err := sn.getLabelEntriesOnConn(bc, accountID, projectID) @@ -1246,7 +1248,7 @@ func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Dea return tagEntries, nil } -func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline Deadline) (*storage.TSDBStatus, error) { +func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) { var status *storage.TSDBStatus f := func(bc *handshake.BufferedConn) error { st, err := sn.getTSDBStatusForDateOnConn(bc, accountID, projectID, date, topN) @@ -1266,7 +1268,7 @@ func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date ui return status, nil } -func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Deadline) (uint64, error) { +func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline searchutils.Deadline) (uint64, error) { var n uint64 f := func(bc *handshake.BufferedConn) error { nn, err := sn.getSeriesCountOnConn(bc, accountID, projectID) @@ -1286,7 +1288,7 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Dead return n, nil } -func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) error { +func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestData []byte, tr storage.TimeRange, fetchData bool, deadline searchutils.Deadline) error { var blocksRead int f := func(bc *handshake.BufferedConn) error { n, err := sn.processSearchQueryOnConn(tbfw, bc, requestData, tr, fetchData) @@ -1305,7 +1307,7 @@ func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestDat return nil } -func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline Deadline) error { +func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error { select { case sn.concurrentQueriesCh <- struct{}{}: default: @@ -1319,7 +1321,7 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC if err != nil { return fmt.Errorf("cannot obtain connection from a pool: %w", err) } - d := time.Unix(int64(deadline.deadline), 0) + d := time.Unix(int64(deadline.Deadline()), 0) if err := bc.SetDeadline(d); err != nil { _ = bc.Close() logger.Panicf("FATAL: cannot set connection deadline: %s", err) @@ -1334,8 +1336,8 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC // Send the remaining timeout instead of deadline to remote server, since it may have different time. now := fasttime.UnixTimestamp() timeout := uint64(0) - if deadline.deadline > now { - timeout = deadline.deadline - now + if deadline.Deadline() > now { + timeout = deadline.Deadline() - now } if timeout > (1<<32)-2 { timeout = (1 << 32) - 2 @@ -1877,33 +1879,3 @@ var ( // The maximum number of concurrent queries per storageNode. const maxConcurrentQueriesPerStorageNode = 100 - -// Deadline contains deadline with the corresponding timeout for pretty error messages. -type Deadline struct { - deadline uint64 - - timeout time.Duration - flagHint string -} - -// NewDeadline returns deadline for the given timeout. -// -// flagHint must contain a hit for command-line flag, which could be used -// in order to increase timeout. -func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline { - return Deadline{ - deadline: uint64(startTime.Add(timeout).Unix()), - timeout: timeout, - flagHint: flagHint, - } -} - -// Exceeded returns true if deadline is exceeded. -func (d *Deadline) Exceeded() bool { - return fasttime.UnixTimestamp() > d.deadline -} - -// String returns human-readable string representation for d. -func (d *Deadline) String() string { - return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint) -} diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index b7c4db979f..d28c4b2b17 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -158,7 +158,7 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) -func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error { +func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, format string, maxRowsPerLine int, deadline searchutils.Deadline) error { writeResponseFunc := WriteExportStdResponse writeLineFunc := func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) { bb := quicktemplate.AcquireByteBuffer() @@ -378,7 +378,7 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w return nil } -func labelValuesWithMatches(at *auth.Token, labelName string, matches []string, start, end int64, deadline netstorage.Deadline) ([]string, bool, error) { +func labelValuesWithMatches(at *auth.Token, labelName string, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) { if len(matches) == 0 { logger.Panicf("BUG: matches must be non-empty") } @@ -553,7 +553,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r return nil } -func labelsWithMatches(at *auth.Token, matches []string, start, end int64, deadline netstorage.Deadline) ([]string, bool, error) { +func labelsWithMatches(at *auth.Token, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) { if len(matches) == 0 { logger.Panicf("BUG: matches must be non-empty") } diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index f6f246256e..af2b58dddc 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -90,7 +91,7 @@ type EvalConfig struct { // QuotedRemoteAddr contains quoted remote address. QuotedRemoteAddr string - Deadline netstorage.Deadline + Deadline searchutils.Deadline MayCache bool diff --git a/app/vmselect/promql/exec_test.go b/app/vmselect/promql/exec_test.go index af5d976d8d..a5951fc5b3 100644 --- a/app/vmselect/promql/exec_test.go +++ b/app/vmselect/promql/exec_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) @@ -31,7 +32,7 @@ func TestExecSuccess(t *testing.T) { Start: start, End: end, Step: step, - Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""), + Deadline: searchutils.NewDeadline(time.Now(), time.Minute, ""), } for i := 0; i < 5; i++ { result, err := Exec(ec, q, false) @@ -5910,7 +5911,7 @@ func TestExecError(t *testing.T) { Start: 1000, End: 2000, Step: 100, - Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""), + Deadline: searchutils.NewDeadline(time.Now(), time.Minute, ""), } for i := 0; i < 4; i++ { rv, err := Exec(ec, q, false) diff --git a/app/vmselect/searchutils/searchutils.go b/app/vmselect/searchutils/searchutils.go index a5b3201a86..0a857e9716 100644 --- a/app/vmselect/searchutils/searchutils.go +++ b/app/vmselect/searchutils/searchutils.go @@ -9,7 +9,7 @@ import ( "strings" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/metricsql" ) @@ -98,18 +98,18 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000 // GetDeadlineForQuery returns deadline for the given query r. -func GetDeadlineForQuery(r *http.Request, startTime time.Time) netstorage.Deadline { +func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline { dMax := maxQueryDuration.Milliseconds() return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration") } // GetDeadlineForExport returns deadline for the given request to /api/v1/export. -func GetDeadlineForExport(r *http.Request, startTime time.Time) netstorage.Deadline { +func GetDeadlineForExport(r *http.Request, startTime time.Time) Deadline { dMax := maxExportDuration.Milliseconds() return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration") } -func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) netstorage.Deadline { +func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) Deadline { d, err := GetDuration(r, "timeout", 0) if err != nil { d = 0 @@ -118,7 +118,7 @@ func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64 d = dMax } timeout := time.Duration(d) * time.Millisecond - return netstorage.NewDeadline(startTime, timeout, flagHint) + return NewDeadline(startTime, timeout, flagHint) } // GetBool returns boolean value from the given argKey query arg. @@ -139,3 +139,38 @@ func GetDenyPartialResponse(r *http.Request) bool { } return GetBool(r, "deny_partial_response") } + +// Deadline contains deadline with the corresponding timeout for pretty error messages. +type Deadline struct { + deadline uint64 + + timeout time.Duration + flagHint string +} + +// NewDeadline returns deadline for the given timeout. +// +// flagHint must contain a hit for command-line flag, which could be used +// in order to increase timeout. +func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline { + return Deadline{ + deadline: uint64(startTime.Add(timeout).Unix()), + timeout: timeout, + flagHint: flagHint, + } +} + +// Exceeded returns true if deadline is exceeded. +func (d *Deadline) Exceeded() bool { + return fasttime.UnixTimestamp() > d.deadline +} + +// Deadline returns deadline in unix timestamp seconds. +func (d *Deadline) Deadline() uint64 { + return d.deadline +} + +// String returns human-readable string representation for d. +func (d *Deadline) String() string { + return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint) +}