app/vmselect: move Deadline from netstorage to searchutils

This removes dependency on netstorage from searchutils.
This commit is contained in:
Aliaksandr Valialkin 2020-09-11 13:18:57 +03:00
parent 58d3b82ae5
commit d3ad0d365e
6 changed files with 72 additions and 63 deletions

View file

@ -207,7 +207,7 @@ func MetricsIndexHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
}
// metricsFind searches for label values that match the given query.
func metricsFind(at *auth.Token, tr storage.TimeRange, label, query string, delimiter byte, deadline netstorage.Deadline) ([]string, bool, error) {
func metricsFind(at *auth.Token, tr storage.TimeRange, label, query string, delimiter byte, deadline searchutils.Deadline) ([]string, bool, error) {
expandTail := strings.HasSuffix(query, "*")
for strings.HasSuffix(query, "*") {
query = query[:len(query)-1]

View file

@ -12,6 +12,7 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
@ -53,7 +54,7 @@ type Results struct {
at *auth.Token
tr storage.TimeRange
fetchData bool
deadline Deadline
deadline searchutils.Deadline
tbf *tmpBlocksFile
@ -457,7 +458,7 @@ func (sbh *sortBlocksHeap) Pop() interface{} {
}
// DeleteSeries deletes time series matching the given sq.
func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (int, error) {
func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline searchutils.Deadline) (int, error) {
requestData := sq.Marshal(nil)
// Send the query to all the storage nodes in parallel.
@ -501,7 +502,7 @@ func DeleteSeries(at *auth.Token, sq *storage.SearchQuery, deadline Deadline) (i
}
// GetLabels returns labels until the given deadline.
func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) {
func GetLabels(at *auth.Token, deadline searchutils.Deadline) ([]string, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
@ -573,7 +574,7 @@ func GetLabels(at *auth.Token, deadline Deadline) ([]string, bool, error) {
// GetLabelValues returns label values for the given labelName
// until the given deadline.
func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]string, bool, error) {
func GetLabelValues(at *auth.Token, labelName string, deadline searchutils.Deadline) ([]string, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
@ -643,7 +644,7 @@ func GetLabelValues(at *auth.Token, labelName string, deadline Deadline) ([]stri
// GetTagValueSuffixes returns tag value suffixes for the given tagKey and the given tagValuePrefix.
//
// It can be used for implementing https://graphite-api.readthedocs.io/en/latest/api.html#metrics-find
func GetTagValueSuffixes(at *auth.Token, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline Deadline) ([]string, bool, error) {
func GetTagValueSuffixes(at *auth.Token, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline searchutils.Deadline) ([]string, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
@ -708,7 +709,7 @@ func GetTagValueSuffixes(at *auth.Token, tr storage.TimeRange, tagKey, tagValueP
}
// GetLabelEntries returns all the label entries for at until the given deadline.
func GetLabelEntries(at *auth.Token, deadline Deadline) ([]storage.TagEntry, bool, error) {
func GetLabelEntries(at *auth.Token, deadline searchutils.Deadline) ([]storage.TagEntry, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
@ -816,7 +817,7 @@ func deduplicateStrings(a []string) []string {
}
// GetTSDBStatusForDate returns tsdb status according to https://prometheus.io/docs/prometheus/latest/querying/api/#tsdb-stats
func GetTSDBStatusForDate(at *auth.Token, deadline Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) {
func GetTSDBStatusForDate(at *auth.Token, deadline searchutils.Deadline, date uint64, topN int) (*storage.TSDBStatus, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
@ -920,7 +921,7 @@ func toTopHeapEntries(m map[string]uint64, topN int) []storage.TopHeapEntry {
}
// GetSeriesCount returns the number of unique series for the given at.
func GetSeriesCount(at *auth.Token, deadline Deadline) (uint64, bool, error) {
func GetSeriesCount(at *auth.Token, deadline searchutils.Deadline) (uint64, bool, error) {
if deadline.Exceeded() {
return 0, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
@ -1008,7 +1009,7 @@ func (tbfw *tmpBlocksFileWrapper) WriteBlock(mb *storage.MetricBlock) error {
}
// ProcessSearchQuery performs sq on storage nodes until the given deadline.
func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline Deadline) (*Results, bool, error) {
func ProcessSearchQuery(at *auth.Token, sq *storage.SearchQuery, fetchData bool, deadline searchutils.Deadline) (*Results, bool, error) {
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
@ -1146,7 +1147,7 @@ type storageNode struct {
metricRowsRead *metrics.Counter
}
func (sn *storageNode) deleteMetrics(requestData []byte, deadline Deadline) (int, error) {
func (sn *storageNode) deleteMetrics(requestData []byte, deadline searchutils.Deadline) (int, error) {
var deletedCount int
f := func(bc *handshake.BufferedConn) error {
n, err := sn.deleteMetricsOnConn(bc, requestData)
@ -1166,7 +1167,7 @@ func (sn *storageNode) deleteMetrics(requestData []byte, deadline Deadline) (int
return deletedCount, nil
}
func (sn *storageNode) getLabels(accountID, projectID uint32, deadline Deadline) ([]string, error) {
func (sn *storageNode) getLabels(accountID, projectID uint32, deadline searchutils.Deadline) ([]string, error) {
var labels []string
f := func(bc *handshake.BufferedConn) error {
ls, err := sn.getLabelsOnConn(bc, accountID, projectID)
@ -1186,7 +1187,7 @@ func (sn *storageNode) getLabels(accountID, projectID uint32, deadline Deadline)
return labels, nil
}
func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName string, deadline Deadline) ([]string, error) {
func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName string, deadline searchutils.Deadline) ([]string, error) {
var labelValues []string
f := func(bc *handshake.BufferedConn) error {
lvs, err := sn.getLabelValuesOnConn(bc, accountID, projectID, labelName)
@ -1206,7 +1207,8 @@ func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName str
return labelValues, nil
}
func (sn *storageNode) getTagValueSuffixes(accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string, delimiter byte, deadline Deadline) ([]string, error) {
func (sn *storageNode) getTagValueSuffixes(accountID, projectID uint32, tr storage.TimeRange, tagKey, tagValuePrefix string,
delimiter byte, deadline searchutils.Deadline) ([]string, error) {
var suffixes []string
f := func(bc *handshake.BufferedConn) error {
ss, err := sn.getTagValueSuffixesOnConn(bc, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter)
@ -1226,7 +1228,7 @@ func (sn *storageNode) getTagValueSuffixes(accountID, projectID uint32, tr stora
return suffixes, nil
}
func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Deadline) ([]storage.TagEntry, error) {
func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline searchutils.Deadline) ([]storage.TagEntry, error) {
var tagEntries []storage.TagEntry
f := func(bc *handshake.BufferedConn) error {
tes, err := sn.getLabelEntriesOnConn(bc, accountID, projectID)
@ -1246,7 +1248,7 @@ func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline Dea
return tagEntries, nil
}
func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline Deadline) (*storage.TSDBStatus, error) {
func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date uint64, topN int, deadline searchutils.Deadline) (*storage.TSDBStatus, error) {
var status *storage.TSDBStatus
f := func(bc *handshake.BufferedConn) error {
st, err := sn.getTSDBStatusForDateOnConn(bc, accountID, projectID, date, topN)
@ -1266,7 +1268,7 @@ func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date ui
return status, nil
}
func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Deadline) (uint64, error) {
func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline searchutils.Deadline) (uint64, error) {
var n uint64
f := func(bc *handshake.BufferedConn) error {
nn, err := sn.getSeriesCountOnConn(bc, accountID, projectID)
@ -1286,7 +1288,7 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline Dead
return n, nil
}
func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestData []byte, tr storage.TimeRange, fetchData bool, deadline Deadline) error {
func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestData []byte, tr storage.TimeRange, fetchData bool, deadline searchutils.Deadline) error {
var blocksRead int
f := func(bc *handshake.BufferedConn) error {
n, err := sn.processSearchQueryOnConn(tbfw, bc, requestData, tr, fetchData)
@ -1305,7 +1307,7 @@ func (sn *storageNode) processSearchQuery(tbfw *tmpBlocksFileWrapper, requestDat
return nil
}
func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline Deadline) error {
func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error {
select {
case sn.concurrentQueriesCh <- struct{}{}:
default:
@ -1319,7 +1321,7 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC
if err != nil {
return fmt.Errorf("cannot obtain connection from a pool: %w", err)
}
d := time.Unix(int64(deadline.deadline), 0)
d := time.Unix(int64(deadline.Deadline()), 0)
if err := bc.SetDeadline(d); err != nil {
_ = bc.Close()
logger.Panicf("FATAL: cannot set connection deadline: %s", err)
@ -1334,8 +1336,8 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC
// Send the remaining timeout instead of deadline to remote server, since it may have different time.
now := fasttime.UnixTimestamp()
timeout := uint64(0)
if deadline.deadline > now {
timeout = deadline.deadline - now
if deadline.Deadline() > now {
timeout = deadline.Deadline() - now
}
if timeout > (1<<32)-2 {
timeout = (1 << 32) - 2
@ -1877,33 +1879,3 @@ var (
// The maximum number of concurrent queries per storageNode.
const maxConcurrentQueriesPerStorageNode = 100
// Deadline contains deadline with the corresponding timeout for pretty error messages.
type Deadline struct {
deadline uint64
timeout time.Duration
flagHint string
}
// NewDeadline returns deadline for the given timeout.
//
// flagHint must contain a hit for command-line flag, which could be used
// in order to increase timeout.
func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline {
return Deadline{
deadline: uint64(startTime.Add(timeout).Unix()),
timeout: timeout,
flagHint: flagHint,
}
}
// Exceeded returns true if deadline is exceeded.
func (d *Deadline) Exceeded() bool {
return fasttime.UnixTimestamp() > d.deadline
}
// String returns human-readable string representation for d.
func (d *Deadline) String() string {
return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint)
}

View file

@ -158,7 +158,7 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, format string, maxRowsPerLine int, deadline netstorage.Deadline) error {
func exportHandler(at *auth.Token, w http.ResponseWriter, r *http.Request, matches []string, start, end int64, format string, maxRowsPerLine int, deadline searchutils.Deadline) error {
writeResponseFunc := WriteExportStdResponse
writeLineFunc := func(rs *netstorage.Result, resultsCh chan<- *quicktemplate.ByteBuffer) {
bb := quicktemplate.AcquireByteBuffer()
@ -378,7 +378,7 @@ func LabelValuesHandler(startTime time.Time, at *auth.Token, labelName string, w
return nil
}
func labelValuesWithMatches(at *auth.Token, labelName string, matches []string, start, end int64, deadline netstorage.Deadline) ([]string, bool, error) {
func labelValuesWithMatches(at *auth.Token, labelName string, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) {
if len(matches) == 0 {
logger.Panicf("BUG: matches must be non-empty")
}
@ -553,7 +553,7 @@ func LabelsHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
return nil
}
func labelsWithMatches(at *auth.Token, matches []string, start, end int64, deadline netstorage.Deadline) ([]string, bool, error) {
func labelsWithMatches(at *auth.Token, matches []string, start, end int64, deadline searchutils.Deadline) ([]string, bool, error) {
if len(matches) == 0 {
logger.Panicf("BUG: matches must be non-empty")
}

View file

@ -8,6 +8,7 @@ import (
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -90,7 +91,7 @@ type EvalConfig struct {
// QuotedRemoteAddr contains quoted remote address.
QuotedRemoteAddr string
Deadline netstorage.Deadline
Deadline searchutils.Deadline
MayCache bool

View file

@ -5,6 +5,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
@ -31,7 +32,7 @@ func TestExecSuccess(t *testing.T) {
Start: start,
End: end,
Step: step,
Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""),
Deadline: searchutils.NewDeadline(time.Now(), time.Minute, ""),
}
for i := 0; i < 5; i++ {
result, err := Exec(ec, q, false)
@ -5910,7 +5911,7 @@ func TestExecError(t *testing.T) {
Start: 1000,
End: 2000,
Step: 100,
Deadline: netstorage.NewDeadline(time.Now(), time.Minute, ""),
Deadline: searchutils.NewDeadline(time.Now(), time.Minute, ""),
}
for i := 0; i < 4; i++ {
rv, err := Exec(ec, q, false)

View file

@ -9,7 +9,7 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/metricsql"
)
@ -98,18 +98,18 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
// GetDeadlineForQuery returns deadline for the given query r.
func GetDeadlineForQuery(r *http.Request, startTime time.Time) netstorage.Deadline {
func GetDeadlineForQuery(r *http.Request, startTime time.Time) Deadline {
dMax := maxQueryDuration.Milliseconds()
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxQueryDuration")
}
// GetDeadlineForExport returns deadline for the given request to /api/v1/export.
func GetDeadlineForExport(r *http.Request, startTime time.Time) netstorage.Deadline {
func GetDeadlineForExport(r *http.Request, startTime time.Time) Deadline {
dMax := maxExportDuration.Milliseconds()
return getDeadlineWithMaxDuration(r, startTime, dMax, "-search.maxExportDuration")
}
func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) netstorage.Deadline {
func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64, flagHint string) Deadline {
d, err := GetDuration(r, "timeout", 0)
if err != nil {
d = 0
@ -118,7 +118,7 @@ func getDeadlineWithMaxDuration(r *http.Request, startTime time.Time, dMax int64
d = dMax
}
timeout := time.Duration(d) * time.Millisecond
return netstorage.NewDeadline(startTime, timeout, flagHint)
return NewDeadline(startTime, timeout, flagHint)
}
// GetBool returns boolean value from the given argKey query arg.
@ -139,3 +139,38 @@ func GetDenyPartialResponse(r *http.Request) bool {
}
return GetBool(r, "deny_partial_response")
}
// Deadline contains deadline with the corresponding timeout for pretty error messages.
type Deadline struct {
deadline uint64
timeout time.Duration
flagHint string
}
// NewDeadline returns deadline for the given timeout.
//
// flagHint must contain a hit for command-line flag, which could be used
// in order to increase timeout.
func NewDeadline(startTime time.Time, timeout time.Duration, flagHint string) Deadline {
return Deadline{
deadline: uint64(startTime.Add(timeout).Unix()),
timeout: timeout,
flagHint: flagHint,
}
}
// Exceeded returns true if deadline is exceeded.
func (d *Deadline) Exceeded() bool {
return fasttime.UnixTimestamp() > d.deadline
}
// Deadline returns deadline in unix timestamp seconds.
func (d *Deadline) Deadline() uint64 {
return d.deadline
}
// String returns human-readable string representation for d.
func (d *Deadline) String() string {
return fmt.Sprintf("%.3f seconds; the timeout can be adjusted with `%s` command-line flag", d.timeout.Seconds(), d.flagHint)
}