From 31e341371bd4d4add0db1d84032e969d7d482619 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 22 Sep 2020 00:36:43 +0300 Subject: [PATCH] lib/storage: code prettifying after be5e1222f30e82f62230db07d4a9f76d538f5931 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/781 --- lib/storage/index_db.go | 83 +++++++++++++++++++---------------------- 1 file changed, 38 insertions(+), 45 deletions(-) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 8a150f3ea..099e9d576 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -1962,7 +1962,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet continue } - metricIDs, err := is.getMetricIDsForTagFilter(tf, maxMetrics, nil) + metricIDs, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics) if err != nil { if err == errFallbackToMetricNameMatch { // Skip tag filters requiring to scan for too many metrics. @@ -2225,7 +2225,7 @@ const ( var uselessTagFilterCacheValue = []byte("1") -func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, filter *uint64set.Set) (*uint64set.Set, error) { +func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, error) { if tf.isNegative { logger.Panicf("BUG: isNegative must be false") } @@ -2243,11 +2243,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, f // Slow path - scan for all the rows with the given prefix. maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric - err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, filter, func(metricID uint64) bool { - metricIDs.Add(metricID) - return metricIDs.Len() < maxMetrics - }) - if err != nil { + if err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoops, metricIDs.Add); err != nil { if err == errFallbackToMetricNameMatch { return nil, err } @@ -2256,7 +2252,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, f return metricIDs, nil } -func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, filter *uint64set.Set, f func(metricID uint64) bool) error { +func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoops int, f func(metricID uint64)) error { if len(tf.orSuffixes) > 0 { logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes) } @@ -2293,24 +2289,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, if err := mp.InitOnlyTail(item, tail); err != nil { return err } - mp.ParseMetricIDs() - - // Fast path: use the filter to skip items that don't need to be matched. - if filter != nil { - hasCommonMetric := false - for _, metricID := range mp.MetricIDs { - if filter.Has(metricID) { - hasCommonMetric = true - break - } - } - // For both complement and intersection calculations, the result and filter need to have elements in common. - if !hasCommonMetric { - continue - } - } - if prevMatch && string(suffix) == string(prevMatchingSuffix) { // Fast path: the same tag value found. // There is no need in checking it again with potentially @@ -2320,12 +2299,18 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, return errFallbackToMetricNameMatch } for _, metricID := range mp.MetricIDs { - if !f(metricID) { - return nil + if filter != nil && !filter.Has(metricID) { + continue } + f(metricID) } continue } + if filter != nil && !mp.HasCommonMetricIDs(filter) { + // Faster path: there is no need in calling tf.matchSuffix, + // since the current row has no matching metricIDs. + continue + } // Slow path: need tf.matchSuffix call. ok, err := tf.matchSuffix(suffix) @@ -2358,9 +2343,10 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, maxLoops int, return errFallbackToMetricNameMatch } for _, metricID := range mp.MetricIDs { - if !f(metricID) { - return nil + if filter != nil && !filter.Has(metricID) { + continue } + f(metricID) } } if err := ts.Error(); err != nil { @@ -2700,7 +2686,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter tfsRemainingWithCount = append(tfsRemainingWithCount, tfsWithCount[i]) continue } - m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics, nil) + m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, nil, maxDateMetrics) if err != nil { return nil, err } @@ -2751,7 +2737,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter break } tf := tfWithCount.tf - m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, maxDateMetrics, metricIDs) + m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, metricIDs, maxDateMetrics) if err != nil { return nil, err } @@ -2937,7 +2923,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) { return true, nil } -func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int, filter *uint64set.Set) (*uint64set.Set, error) { +func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, error) { // Augument tag filter prefix for per-date search instead of global search. if !bytes.HasPrefix(tf.prefix, commonPrefix) { logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix) @@ -2951,21 +2937,21 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, tfNew := *tf tfNew.isNegative = false // isNegative for the original tf is handled by the caller. tfNew.prefix = kb.B - metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics, filter) - + metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics) + if filter != nil { + // Do not cache the number of matching metricIDs, + // since this number may be modified by filter. + return metricIDs, err + } // Store the number of matching metricIDs in the cache in order to sort tag filters // in ascending number of matching metricIDs on the next search. is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID) - - var metricIDsLen uint64 - if err == nil { - metricIDsLen = uint64(metricIDs.Len()) - } else { + metricIDsLen := uint64(metricIDs.Len()) + if err != nil { // Set metricIDsLen to maxMetrics, so the given entry will be moved to the end // of tag filters on the next search. metricIDsLen = uint64(maxMetrics) } - kb.B = encoding.MarshalUint64(kb.B[:0], metricIDsLen) is.db.metricIDsPerDateTagFilterCache.Set(is.kb.B, kb.B) return metricIDs, err @@ -3098,16 +3084,13 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil // Slow path - scan for all the rows with the given prefix. maxLoops := filter.Len() * maxIndexScanSlowLoopsPerMetric - err := is.getMetricIDsForTagFilterSlow(tf, maxLoops, filter, func(metricID uint64) bool { + err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoops, func(metricID uint64) { if tf.isNegative { // filter must be equal to metricIDs metricIDs.Del(metricID) - return true - } - if filter.Has(metricID) { + } else { metricIDs.Add(metricID) } - return true }) if err != nil { if err == errFallbackToMetricNameMatch { @@ -3298,6 +3281,16 @@ func (mp *tagToMetricIDsRowParser) ParseMetricIDs() { } } +// HasCommonMetricIDs returns true if mp has at least one common metric id with filter. +func (mp *tagToMetricIDsRowParser) HasCommonMetricIDs(filter *uint64set.Set) bool { + for _, metricID := range mp.MetricIDs { + if filter.Has(metricID) { + return true + } + } + return false +} + // IsDeletedTag verifies whether the tag from mp is deleted according to dmis. // // dmis must contain deleted MetricIDs.