mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmselect: remove -search.storageTimeout
command-line flag, since it has the same meaning as -search.maxQueryDuration
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
This commit is contained in:
parent
7bafaad46d
commit
ecfd6fe78d
3 changed files with 40 additions and 95 deletions
|
@ -6,6 +6,7 @@ import (
|
|||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"sort"
|
||||
|
@ -1493,13 +1494,7 @@ func (sn *storageNode) registerMetricNames(mrs []storage.MetricRow, deadline sea
|
|||
f := func(bc *handshake.BufferedConn) error {
|
||||
return sn.registerMetricNamesOnConn(bc, mrs)
|
||||
}
|
||||
if err := sn.execOnConn("registerMetricNames_v1", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
if err = sn.execOnConn("registerMetricNames_v1", f, deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return sn.execOnConnWithPossibleRetry("registerMetricNames_v1", f, deadline)
|
||||
}
|
||||
|
||||
func (sn *storageNode) deleteMetrics(requestData []byte, deadline searchutils.Deadline) (int, error) {
|
||||
|
@ -1509,15 +1504,11 @@ func (sn *storageNode) deleteMetrics(requestData []byte, deadline searchutils.De
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
deletedCount += n
|
||||
deletedCount = n
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("deleteMetrics_v3", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
// There is no need in zeroing deletedCount.
|
||||
if err = sn.execOnConn("deleteMetrics_v3", f, deadline); err != nil {
|
||||
return deletedCount, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("deleteMetrics_v3", f, deadline); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return deletedCount, nil
|
||||
}
|
||||
|
@ -1532,12 +1523,8 @@ func (sn *storageNode) getLabelsOnTimeRange(accountID, projectID uint32, tr stor
|
|||
labels = ls
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("labelsOnTimeRange_v1", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
labels = nil
|
||||
if err = sn.execOnConn("labelsOnTimeRange_v1", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("labelsOnTimeRange_v1", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return labels, nil
|
||||
}
|
||||
|
@ -1552,12 +1539,8 @@ func (sn *storageNode) getLabels(accountID, projectID uint32, deadline searchuti
|
|||
labels = ls
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("labels_v2", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
labels = nil
|
||||
if err = sn.execOnConn("labels_v2", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("labels_v2", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return labels, nil
|
||||
}
|
||||
|
@ -1572,12 +1555,8 @@ func (sn *storageNode) getLabelValuesOnTimeRange(accountID, projectID uint32, la
|
|||
labelValues = lvs
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("labelValuesOnTimeRange_v1", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
labelValues = nil
|
||||
if err = sn.execOnConn("labelValuesOnTimeRange_v1", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("labelValuesOnTimeRange_v1", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return labelValues, nil
|
||||
}
|
||||
|
@ -1592,12 +1571,8 @@ func (sn *storageNode) getLabelValues(accountID, projectID uint32, labelName str
|
|||
labelValues = lvs
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("labelValues_v2", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
labelValues = nil
|
||||
if err = sn.execOnConn("labelValues_v2", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("labelValues_v2", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return labelValues, nil
|
||||
}
|
||||
|
@ -1613,12 +1588,8 @@ func (sn *storageNode) getTagValueSuffixes(accountID, projectID uint32, tr stora
|
|||
suffixes = ss
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("tagValueSuffixes_v1", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
suffixes = nil
|
||||
if err = sn.execOnConn("tagValueSuffixes_v1", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("tagValueSuffixes_v1", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return suffixes, nil
|
||||
}
|
||||
|
@ -1633,12 +1604,8 @@ func (sn *storageNode) getLabelEntries(accountID, projectID uint32, deadline sea
|
|||
tagEntries = tes
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("labelEntries_v2", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
tagEntries = nil
|
||||
if err = sn.execOnConn("labelEntries_v2", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("labelEntries_v2", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tagEntries, nil
|
||||
}
|
||||
|
@ -1653,12 +1620,8 @@ func (sn *storageNode) getTSDBStatusForDate(accountID, projectID uint32, date ui
|
|||
status = st
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("tsdbStatus_v2", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
status = nil
|
||||
if err = sn.execOnConn("tsdbStatus_v2", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("tsdbStatus_v2", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return status, nil
|
||||
}
|
||||
|
@ -1673,12 +1636,8 @@ func (sn *storageNode) getSeriesCount(accountID, projectID uint32, deadline sear
|
|||
n = nn
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("seriesCount_v2", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
n = 0
|
||||
if err = sn.execOnConn("seriesCount_v2", f, deadline); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("seriesCount_v2", f, deadline); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
@ -1693,11 +1652,8 @@ func (sn *storageNode) processSearchMetricNames(requestData []byte, deadline sea
|
|||
metricNames = mns
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("searchMetricNames_v1", f, deadline); err != nil {
|
||||
// Try again before giving up.
|
||||
if err = sn.execOnConn("searchMetricNames_v1", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.execOnConnWithPossibleRetry("searchMetricNames_v1", f, deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return metricNames, nil
|
||||
}
|
||||
|
@ -1709,13 +1665,22 @@ func (sn *storageNode) processSearchQuery(requestData []byte, fetchData bool, pr
|
|||
}
|
||||
return nil
|
||||
}
|
||||
if err := sn.execOnConn("search_v4", f, deadline); err != nil {
|
||||
// Try again before giving up if zero blocks read on the previous attempt.
|
||||
if err = sn.execOnConn("search_v4", f, deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
return sn.execOnConnWithPossibleRetry("search_v4", f, deadline)
|
||||
}
|
||||
|
||||
func (sn *storageNode) execOnConnWithPossibleRetry(funcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error {
|
||||
err := sn.execOnConn(funcName, f, deadline)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
var er *errRemote
|
||||
var ne net.Error
|
||||
if errors.As(err, &er) || errors.As(err, &ne) && ne.Timeout() {
|
||||
// There is no sense in repeating the query on errors induced by vmstorage (errRemote) or on network timeout errors.
|
||||
return err
|
||||
}
|
||||
// Repeat the query in the hope the error was temporary.
|
||||
return sn.execOnConn(funcName, f, deadline)
|
||||
}
|
||||
|
||||
func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error {
|
||||
|
@ -1731,21 +1696,9 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC
|
|||
d := time.Unix(int64(deadline.Deadline()), 0)
|
||||
nowSecs := fasttime.UnixTimestamp()
|
||||
currentTime := time.Unix(int64(nowSecs), 0)
|
||||
storageTimeout := *searchutils.StorageTimeout
|
||||
if storageTimeout > 0 {
|
||||
dd := currentTime.Add(storageTimeout)
|
||||
if dd.Sub(d) < 0 {
|
||||
// Limit the remote deadline to storageTimeout,
|
||||
// so slow vmstorage nodes may stop processing the request.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711 .
|
||||
// The local deadline remains the same, so data obtained from
|
||||
// the remaining vmstorage nodes could be processed locally.
|
||||
d = dd
|
||||
}
|
||||
}
|
||||
timeout := d.Sub(currentTime)
|
||||
if timeout <= 0 {
|
||||
return fmt.Errorf("request timeout reached: %s or -search.storageTimeout=%s", deadline.String(), storageTimeout.String())
|
||||
return fmt.Errorf("request timeout reached: %s", deadline.String())
|
||||
}
|
||||
bc, err := sn.connPool.Get()
|
||||
if err != nil {
|
||||
|
|
|
@ -20,12 +20,6 @@ var (
|
|||
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution; see also -search.storageTimeout")
|
||||
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses if a part of -storageNode instances fail to perform queries; "+
|
||||
"this trades availability over consistency; see also -search.maxQueryDuration and -search.storageTimeout")
|
||||
|
||||
// StorageTimeout limits the duration of query execution on every vmstorage node.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
|
||||
StorageTimeout = flag.Duration("search.storageTimeout", 0, "The timeout for per-storage query processing; "+
|
||||
"this allows returning partial responses if certain -storageNode instances slowly process the query; "+
|
||||
"see also -search.maxQueryDuration and -search.denyPartialResponse command-line flags")
|
||||
)
|
||||
|
||||
func roundToSeconds(ms int64) int64 {
|
||||
|
@ -124,9 +118,6 @@ func GetMaxQueryDuration(r *http.Request) time.Duration {
|
|||
if d <= 0 || d > *maxQueryDuration {
|
||||
d = *maxQueryDuration
|
||||
}
|
||||
if *StorageTimeout > 0 && d > *StorageTimeout {
|
||||
d = *StorageTimeout
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
* FEATURE: publish vmutils for `GOOS=arm` on [releases page](https://github.com/VictoriaMetrics/VictoriaMetrics/releases).
|
||||
|
||||
* BUGFIX: prevent from possible incomplete query results after timed out query.
|
||||
* BUGFIX: vmselect: remove `-search.storageTimeout` command-line flag, since it has the same meaning as `-search.maxQueryDuration`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711#issuecomment-808884995).
|
||||
|
||||
|
||||
# [v1.57.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.57.0)
|
||||
|
|
Loading…
Reference in a new issue