mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: properly check timeouts and pace limits
Previously they were checked on every iteration for small number of iterations
This commit is contained in:
parent
2704722b6d
commit
dd1d59f57a
3 changed files with 18 additions and 12 deletions
|
@ -802,7 +802,7 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
|
|||
prefix := kb.B
|
||||
ts.Seek(prefix)
|
||||
for len(tks) < maxTagKeys && ts.NextItem() {
|
||||
if loopsPaceLimiter&(1<<18) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -882,7 +882,7 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
|
|||
prefix := kb.B
|
||||
ts.Seek(prefix)
|
||||
for len(tvs) < maxTagValues && ts.NextItem() {
|
||||
if loopsPaceLimiter&(1<<18) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -958,7 +958,7 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
|
|||
kb.B = marshalTagValue(kb.B, nil)
|
||||
ts.Seek(kb.B)
|
||||
for ts.NextItem() {
|
||||
if loopsPaceLimiter&(1<<18) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -1030,7 +1030,7 @@ func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus,
|
|||
prefix := kb.B
|
||||
ts.Seek(prefix)
|
||||
for ts.NextItem() {
|
||||
if loopsPaceLimiter&(1<<18) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1566,7 +1566,7 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
|
|||
tsids := make([]TSID, len(metricIDs))
|
||||
i := 0
|
||||
for loopsPaceLimiter, metricID := range metricIDs {
|
||||
if loopsPaceLimiter&(1<<12) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1638,7 +1638,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
|
|||
mn := GetMetricName()
|
||||
defer PutMetricName(mn)
|
||||
for loopsPaceLimiter, metricID := range sortedMetricIDs {
|
||||
if loopsPaceLimiter&(1<<12) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2126,7 +2126,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
|
|||
prefix := tf.prefix
|
||||
ts.Seek(prefix)
|
||||
for ts.NextItem() {
|
||||
if loopsPaceLimiter&(1<<16) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2250,7 +2250,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
|
|||
loopsPaceLimiter := 0
|
||||
ts.Seek(prefix)
|
||||
for metricIDs.Len() < maxMetrics && ts.NextItem() {
|
||||
if loopsPaceLimiter&(1<<18) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2292,7 +2292,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
|
|||
var sf []uint64
|
||||
var metricID uint64
|
||||
for ts.NextItem() {
|
||||
if loopsPaceLimiter&(1<<14) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2844,7 +2844,7 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
|
|||
loopsPaceLimiter := 0
|
||||
ts.Seek(prefix)
|
||||
for ts.NextItem() {
|
||||
if loopsPaceLimiter&(1<<18) == 0 {
|
||||
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -207,7 +207,7 @@ func (s *Search) NextMetricBlock() bool {
|
|||
return false
|
||||
}
|
||||
for s.ts.NextBlock() {
|
||||
if s.loops&(1<<12) == 0 {
|
||||
if s.loops&paceLimiterSlowIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(s.deadline); err != nil {
|
||||
s.err = err
|
||||
return false
|
||||
|
@ -421,3 +421,9 @@ func checkSearchDeadlineAndPace(deadline uint64) error {
|
|||
storagepacelimiter.Search.WaitIfNeeded()
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
paceLimiterFastIterationsMask = 1<<16 - 1
|
||||
paceLimiterMediumIterationsMask = 1<<14 - 1
|
||||
paceLimiterSlowIterationsMask = 1<<12 - 1
|
||||
)
|
||||
|
|
|
@ -934,7 +934,7 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
|
|||
is := idb.getIndexSearch(accountID, projectID, deadline)
|
||||
defer idb.putIndexSearch(is)
|
||||
for loops, metricID := range metricIDs {
|
||||
if loops&(1<<10) == 0 {
|
||||
if loops&paceLimiterSlowIterationsMask == 0 {
|
||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue