lib/storage: properly check timeouts and pace limits

Previously they were checked on every iteration for small number of iterations
This commit is contained in:
Aliaksandr Valialkin 2020-08-07 08:37:33 +03:00
parent 1ad3de5c54
commit 9043a509a3
3 changed files with 18 additions and 12 deletions

View file

@ -780,7 +780,7 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
prefix := kb.B
ts.Seek(prefix)
for len(tks) < maxTagKeys && ts.NextItem() {
if loopsPaceLimiter&(1<<18) == 0 {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
@ -860,7 +860,7 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
prefix := kb.B
ts.Seek(prefix)
for len(tvs) < maxTagValues && ts.NextItem() {
if loopsPaceLimiter&(1<<18) == 0 {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
@ -936,7 +936,7 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
kb.B = marshalTagValue(kb.B, nil)
ts.Seek(kb.B)
for ts.NextItem() {
if loopsPaceLimiter&(1<<18) == 0 {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return 0, err
}
@ -1008,7 +1008,7 @@ func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus,
prefix := kb.B
ts.Seek(prefix)
for ts.NextItem() {
if loopsPaceLimiter&(1<<18) == 0 {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
@ -1532,7 +1532,7 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
tsids := make([]TSID, len(metricIDs))
i := 0
for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&(1<<12) == 0 {
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
@ -1604,7 +1604,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
mn := GetMetricName()
defer PutMetricName(mn)
for loopsPaceLimiter, metricID := range sortedMetricIDs {
if loopsPaceLimiter&(1<<12) == 0 {
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
@ -2092,7 +2092,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
prefix := tf.prefix
ts.Seek(prefix)
for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 {
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
@ -2216,7 +2216,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
loopsPaceLimiter := 0
ts.Seek(prefix)
for metricIDs.Len() < maxMetrics && ts.NextItem() {
if loopsPaceLimiter&(1<<18) == 0 {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
@ -2258,7 +2258,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
var sf []uint64
var metricID uint64
for ts.NextItem() {
if loopsPaceLimiter&(1<<14) == 0 {
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}
@ -2799,7 +2799,7 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
loopsPaceLimiter := 0
ts.Seek(prefix)
for ts.NextItem() {
if loopsPaceLimiter&(1<<18) == 0 {
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}

View file

@ -142,7 +142,7 @@ func (s *Search) NextMetricBlock() bool {
return false
}
for s.ts.NextBlock() {
if s.loops&(1<<12) == 0 {
if s.loops&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(s.deadline); err != nil {
s.err = err
return false
@ -340,3 +340,9 @@ func checkSearchDeadlineAndPace(deadline uint64) error {
storagepacelimiter.Search.WaitIfNeeded()
return nil
}
const (
paceLimiterFastIterationsMask = 1<<16 - 1
paceLimiterMediumIterationsMask = 1<<14 - 1
paceLimiterSlowIterationsMask = 1<<12 - 1
)

View file

@ -864,7 +864,7 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error {
is := idb.getIndexSearch(deadline)
defer idb.putIndexSearch(is)
for loops, metricID := range metricIDs {
if loops&(1<<10) == 0 {
if loops&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err
}