lib/storage: reduce the frequency (and overhead) for timeout and pace limiter checks by 4x

This commit is contained in:
Aliaksandr Valialkin 2020-08-06 18:44:18 +03:00
parent 6c0a92a1ee
commit b690eeff53
2 changed files with 11 additions and 11 deletions

View file

@ -802,7 +802,7 @@ func (is *indexSearch) searchTagKeys(tks map[string]struct{}, maxTagKeys int) er
prefix := kb.B prefix := kb.B
ts.Seek(prefix) ts.Seek(prefix)
for len(tks) < maxTagKeys && ts.NextItem() { for len(tks) < maxTagKeys && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<18) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err return err
} }
@ -882,7 +882,7 @@ func (is *indexSearch) searchTagValues(tvs map[string]struct{}, tagKey []byte, m
prefix := kb.B prefix := kb.B
ts.Seek(prefix) ts.Seek(prefix)
for len(tvs) < maxTagValues && ts.NextItem() { for len(tvs) < maxTagValues && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<18) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err return err
} }
@ -958,7 +958,7 @@ func (is *indexSearch) getSeriesCount() (uint64, error) {
kb.B = marshalTagValue(kb.B, nil) kb.B = marshalTagValue(kb.B, nil)
ts.Seek(kb.B) ts.Seek(kb.B)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<18) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return 0, err return 0, err
} }
@ -1030,7 +1030,7 @@ func (is *indexSearch) getTSDBStatusForDate(date uint64, topN int) (*TSDBStatus,
prefix := kb.B prefix := kb.B
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<18) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err return nil, err
} }
@ -1566,7 +1566,7 @@ func (is *indexSearch) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics
tsids := make([]TSID, len(metricIDs)) tsids := make([]TSID, len(metricIDs))
i := 0 i := 0
for loopsPaceLimiter, metricID := range metricIDs { for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&(1<<10) == 0 { if loopsPaceLimiter&(1<<12) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err return nil, err
} }
@ -1638,7 +1638,7 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
mn := GetMetricName() mn := GetMetricName()
defer PutMetricName(mn) defer PutMetricName(mn)
for loopsPaceLimiter, metricID := range sortedMetricIDs { for loopsPaceLimiter, metricID := range sortedMetricIDs {
if loopsPaceLimiter&(1<<10) == 0 { if loopsPaceLimiter&(1<<12) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err return err
} }
@ -2126,7 +2126,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int,
prefix := tf.prefix prefix := tf.prefix
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<14) == 0 { if loopsPaceLimiter&(1<<16) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err return err
} }
@ -2250,7 +2250,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
loopsPaceLimiter := 0 loopsPaceLimiter := 0
ts.Seek(prefix) ts.Seek(prefix)
for metricIDs.Len() < maxMetrics && ts.NextItem() { for metricIDs.Len() < maxMetrics && ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<18) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err return err
} }
@ -2292,7 +2292,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
var sf []uint64 var sf []uint64
var metricID uint64 var metricID uint64
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<12) == 0 { if loopsPaceLimiter&(1<<14) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err return err
} }
@ -2844,7 +2844,7 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
loopsPaceLimiter := 0 loopsPaceLimiter := 0
ts.Seek(prefix) ts.Seek(prefix)
for ts.NextItem() { for ts.NextItem() {
if loopsPaceLimiter&(1<<16) == 0 { if loopsPaceLimiter&(1<<18) == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil { if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return err return err
} }

View file

@ -204,7 +204,7 @@ func (s *Search) NextMetricBlock() bool {
return false return false
} }
for s.ts.NextBlock() { for s.ts.NextBlock() {
if s.loops&(1<<10) == 0 { if s.loops&(1<<12) == 0 {
if err := checkSearchDeadlineAndPace(s.deadline); err != nil { if err := checkSearchDeadlineAndPace(s.deadline); err != nil {
s.err = err s.err = err
return false return false