lib/storage: return back filter arg to getMetricIDsForTagFilter function

The filter arg has been removed in the commit c7ee2fabb8
because it was preventing from caching the number of matching time series per each tf.

Now the cache contains duration for tf execution, so the filter shouldn't break such caching.
This commit is contained in:
Aliaksandr Valialkin 2021-02-17 19:29:48 +02:00
parent 83da939947
commit 2005d8212a

View file

@ -2072,7 +2072,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
continue continue
} }
metricIDs, err := is.getMetricIDsForTagFilter(tf, maxMetrics) metricIDs, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics)
if err != nil { if err != nil {
if err == errFallbackToMetricNameMatch { if err == errFallbackToMetricNameMatch {
// Skip tag filters requiring to scan for too many metrics. // Skip tag filters requiring to scan for too many metrics.
@ -2394,7 +2394,7 @@ const (
var uselessTagFilterCacheValue = []byte("1") var uselessTagFilterCacheValue = []byte("1")
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (*uint64set.Set, error) { func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, error) {
if tf.isNegative { if tf.isNegative {
logger.Panicf("BUG: isNegative must be false") logger.Panicf("BUG: isNegative must be false")
} }
@ -2412,7 +2412,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int) (
// Slow path - scan for all the rows with the given prefix. // Slow path - scan for all the rows with the given prefix.
maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric
if err := is.getMetricIDsForTagFilterSlow(tf, nil, maxLoops, metricIDs.Add); err != nil { if err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoops, metricIDs.Add); err != nil {
if err == errFallbackToMetricNameMatch { if err == errFallbackToMetricNameMatch {
return nil, err return nil, err
} }
@ -2847,7 +2847,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
tfwsRemaining = append(tfwsRemaining, tfw) tfwsRemaining = append(tfwsRemaining, tfw)
continue continue
} }
m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics) m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, nil, tfs.commonPrefix, maxDateMetrics)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -2903,7 +2903,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
tfsPostponed = append(tfsPostponed, tf) tfsPostponed = append(tfsPostponed, tf)
continue continue
} }
m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, tfs.commonPrefix, maxDateMetrics) m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -3085,7 +3085,8 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
return true, nil return true, nil
} }
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTimestamp, date uint64, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) { func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTimestamp, date uint64,
filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
// Augument tag filter prefix for per-date search instead of global search. // Augument tag filter prefix for per-date search instead of global search.
if !bytes.HasPrefix(tf.prefix, commonPrefix) { if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix) logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
@ -3098,7 +3099,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime
tfNew.isNegative = false // isNegative for the original tf is handled by the caller. tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B tfNew.prefix = kb.B
startTime := time.Now() startTime := time.Now()
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics) metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
kbPool.Put(kb) kbPool.Put(kb)
currentTimestamp := fasttime.UnixTimestamp() currentTimestamp := fasttime.UnixTimestamp()
if currentTimestamp > lastQueryTimestamp+5 { if currentTimestamp > lastQueryTimestamp+5 {