diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 2e4fdc8d89..d667ac75a9 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "math" "path/filepath" "sort" "sync" @@ -2072,7 +2071,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet continue } - metricIDs, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics) + metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics) if err != nil { if err == errFallbackToMetricNameMatch { // Skip tag filters requiring to scan for too many metrics. @@ -2394,34 +2393,36 @@ const ( var uselessTagFilterCacheValue = []byte("1") -func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, error) { +func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, uint64, error) { if tf.isNegative { logger.Panicf("BUG: isNegative must be false") } metricIDs := &uint64set.Set{} if len(tf.orSuffixes) > 0 { // Fast path for orSuffixes - seek for rows for each value from orSuffixes. - if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil { + loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs) + if err != nil { if err == errFallbackToMetricNameMatch { - return nil, err + return nil, loopsCount, err } - return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf) + return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf) } - return metricIDs, nil + return metricIDs, loopsCount, nil } // Slow path - scan for all the rows with the given prefix. - maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric - if err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoops, metricIDs.Add); err != nil { + maxLoopsCount := uint64(maxMetrics) * maxIndexScanSlowLoopsPerMetric + loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, metricIDs.Add) + if err != nil { if err == errFallbackToMetricNameMatch { - return nil, err + return nil, loopsCount, err } - return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf) + return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf) } - return metricIDs, nil + return metricIDs, loopsCount, nil } -func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoops int, f func(metricID uint64)) error { +func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoopsCount uint64, f func(metricID uint64)) (uint64, error) { if len(tf.orSuffixes) > 0 { logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes) } @@ -2433,40 +2434,40 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 mp.Reset() var prevMatchingSuffix []byte var prevMatch bool - loops := 0 + var loopsCount uint64 loopsPaceLimiter := 0 prefix := tf.prefix ts.Seek(prefix) for ts.NextItem() { if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 { if err := checkSearchDeadlineAndPace(is.deadline); err != nil { - return err + return loopsCount, err } } loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { - return nil + return loopsCount, nil } tail := item[len(prefix):] n := bytes.IndexByte(tail, tagSeparatorChar) if n < 0 { - return fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar) + return loopsCount, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar) } suffix := tail[:n+1] tail = tail[n+1:] if err := mp.InitOnlyTail(item, tail); err != nil { - return err + return loopsCount, err } mp.ParseMetricIDs() + loopsCount += uint64(mp.MetricIDsLen()) + if loopsCount > maxLoopsCount { + return loopsCount, errFallbackToMetricNameMatch + } if prevMatch && string(suffix) == string(prevMatchingSuffix) { // Fast path: the same tag value found. // There is no need in checking it again with potentially // slow tf.matchSuffix, which may call regexp. - loops += mp.MetricIDsLen() - if loops > maxLoops { - return errFallbackToMetricNameMatch - } for _, metricID := range mp.MetricIDs { if filter != nil && !filter.Has(metricID) { continue @@ -2480,11 +2481,11 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 // since the current row has no matching metricIDs. continue } - // Slow path: need tf.matchSuffix call. ok, err := tf.matchSuffix(suffix) + loopsCount += reMatchCost if err != nil { - return fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err) + return loopsCount, fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err) } if !ok { prevMatch = false @@ -2499,18 +2500,16 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 // The last char in kb.B must be tagSeparatorChar. Just increment it // in order to jump to the next tag value. if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff { - return fmt.Errorf("data corruption: the last char in k=%X must be %X", kb.B, tagSeparatorChar) + return loopsCount, fmt.Errorf("data corruption: the last char in k=%X must be %X", kb.B, tagSeparatorChar) } kb.B[len(kb.B)-1]++ ts.Seek(kb.B) + // Assume that a seek cost is equivalent to 100 ordinary loops. + loopsCount += 100 continue } prevMatch = true prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...) - loops += mp.MetricIDsLen() - if loops > maxLoops { - return errFallbackToMetricNameMatch - } for _, metricID := range mp.MetricIDs { if filter != nil && !filter.Has(metricID) { continue @@ -2519,29 +2518,32 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6 } } if err := ts.Error(); err != nil { - return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err) + return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err) } - return nil + return loopsCount, nil } -func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) error { +func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) { if tf.isNegative { logger.Panicf("BUG: isNegative must be false") } kb := kbPool.Get() defer kbPool.Put(kb) + var loopsCount uint64 for _, orSuffix := range tf.orSuffixes { kb.B = append(kb.B[:0], tf.prefix...) kb.B = append(kb.B, orSuffix...) kb.B = append(kb.B, tagSeparatorChar) - if err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs); err != nil { - return err + lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs) + if err != nil { + return loopsCount, err } + loopsCount += lc if metricIDs.Len() >= maxMetrics { - return nil + return loopsCount, nil } } - return nil + return loopsCount, nil } func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error { @@ -2559,39 +2561,39 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met return nil } -func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) error { +func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) { ts := &is.ts mp := &is.mp mp.Reset() - maxLoops := maxMetrics * maxIndexScanLoopsPerMetric - loops := 0 + maxLoopsCount := uint64(maxMetrics) * maxIndexScanLoopsPerMetric + var loopsCount uint64 loopsPaceLimiter := 0 ts.Seek(prefix) for metricIDs.Len() < maxMetrics && ts.NextItem() { if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 { if err := checkSearchDeadlineAndPace(is.deadline); err != nil { - return err + return loopsCount, err } } loopsPaceLimiter++ item := ts.Item if !bytes.HasPrefix(item, prefix) { - return nil + return loopsCount, nil } if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil { - return err + return loopsCount, err } - loops += mp.MetricIDsLen() - if loops > maxLoops { - return errFallbackToMetricNameMatch + loopsCount += uint64(mp.MetricIDsLen()) + if loopsCount > maxLoopsCount { + return loopsCount, errFallbackToMetricNameMatch } mp.ParseMetricIDs() metricIDs.AddMulti(mp.MetricIDs) } if err := ts.Error(); err != nil { - return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err) + return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err) } - return nil + return loopsCount, nil } func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error { @@ -2603,8 +2605,8 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri ts := &is.ts mp := &is.mp mp.Reset() - maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric - loops := 0 + maxLoopsCount := uint64(len(sortedFilter)) * maxIndexScanLoopsPerMetric + var loopsCount uint64 loopsPaceLimiter := 0 ts.Seek(prefix) var sf []uint64 @@ -2635,8 +2637,8 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri return nil } sf = sortedFilter - loops += mp.MetricIDsLen() - if loops > maxLoops { + loopsCount += uint64(mp.MetricIDsLen()) + if loopsCount > maxLoopsCount { return errFallbackToMetricNameMatch } mp.ParseMetricIDs() @@ -2806,31 +2808,33 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set } func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) { - // Sort tfs by the duration from previous queries. + // Sort tfs by loopsCount needed for performing each filter. + // This stats is usually collected from the previous queries. // This way we limit the amount of work below by applying fast filters at first. type tagFilterWithWeight struct { tf *tagFilter - durationSeconds float64 + loopsCount uint64 lastQueryTimestamp uint64 } tfws := make([]tagFilterWithWeight, len(tfs.tfs)) + currentTime := fasttime.UnixTimestamp() for i := range tfs.tfs { tf := &tfs.tfs[i] - durationSeconds, lastQueryTimestamp := is.getDurationAndTimestampForDateFilter(date, tf) - if durationSeconds == 0 { - // Assume that unknown tag filters can take quite big amounts of time. - durationSeconds = 1.0 + loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf) + if currentTime > lastQueryTimestamp+60*60 { + // Reset loopsCount to 0 every hour for collecting updated stats for the tf. + loopsCount = 0 } tfws[i] = tagFilterWithWeight{ tf: tf, - durationSeconds: durationSeconds, + loopsCount: loopsCount, lastQueryTimestamp: lastQueryTimestamp, } } sort.Slice(tfws, func(i, j int) bool { a, b := &tfws[i], &tfws[j] - if a.durationSeconds != b.durationSeconds { - return a.durationSeconds < b.durationSeconds + if a.loopsCount != b.loopsCount { + return a.loopsCount < b.loopsCount } return a.tf.Less(b.tf) }) @@ -2897,7 +2901,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter // Short circuit - there is no need in applying the remaining filters to an empty set. break } - if float64(metricIDsLen)/metricNameMatchesPerSecond < tfw.durationSeconds { + if uint64(metricIDsLen)*maxIndexScanLoopsPerMetric < tfw.loopsCount { // It should be faster performing metricName match on the remaining filters // instead of scanning big number of entries in the inverted index for these filters. tfsPostponed = append(tfsPostponed, tf) @@ -2933,12 +2937,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter return metricIDs, nil } -// The estimated number of per-second loops inside updateMetricIDsByMetricNameMatch -// -// This value is used for determining when matching by metric name must be perfromed instead of matching -// by the remaining tag filters. -const metricNameMatchesPerSecond = 50000 - func (is *indexSearch) storeDateMetricID(date, metricID uint64) error { ii := getIndexItems() defer putIndexItems(ii) @@ -3098,8 +3096,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime tfNew := *tf tfNew.isNegative = false // isNegative for the original tf is handled by the caller. tfNew.prefix = kb.B - startTime := time.Now() - metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics) + metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics) kbPool.Put(kb) currentTimestamp := fasttime.UnixTimestamp() if currentTimestamp > lastQueryTimestamp+5 { @@ -3107,23 +3104,22 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime // Do not update it too frequently. return metricIDs, err } - // Store the duration for tag filter execution in the cache in order to sort tag filters + // Store the loopsCount for tag filter in the cache in order to sort tag filters // in ascending durations on the next search. - durationSeconds := time.Since(startTime).Seconds() - if err != nil && durationSeconds < 10 { - // Set high duration for failing filter, so it is moved to the end of filter list. - durationSeconds = 10 + if err != nil { + // Set high loopsCount for failing filter, so it is moved to the end of filter list. + loopsCount = 1 << 30 } if metricIDs.Len() >= maxMetrics { - // Increase the duration for tag filter matching too many metrics, - // So next time it will be applied after filters matching lower number of metrics. - durationSeconds *= 2 + // Increase loopsCount for tag filter matching too many metrics, + // So next time it is moved to the end of filter list. + loopsCount *= 2 } - is.storeDurationAndTimestampForDateFilter(date, tf, durationSeconds, currentTimestamp) + is.storeLoopsCountAndTimestampForDateFilter(date, tf, loopsCount, currentTimestamp) return metricIDs, err } -func (is *indexSearch) getDurationAndTimestampForDateFilter(date uint64, tf *tagFilter) (float64, uint64) { +func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) { is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID) kb := kbPool.Get() defer kbPool.Put(kb) @@ -3131,17 +3127,15 @@ func (is *indexSearch) getDurationAndTimestampForDateFilter(date uint64, tf *tag if len(kb.B) != 16 { return 0, 0 } - n := encoding.UnmarshalUint64(kb.B) - durationSeconds := math.Float64frombits(n) + loopsCount := encoding.UnmarshalUint64(kb.B) timestamp := encoding.UnmarshalUint64(kb.B[8:]) - return durationSeconds, timestamp + return loopsCount, timestamp } -func (is *indexSearch) storeDurationAndTimestampForDateFilter(date uint64, tf *tagFilter, durationSeconds float64, timestamp uint64) { +func (is *indexSearch) storeLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter, loopsCount, timestamp uint64) { is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID) - n := math.Float64bits(durationSeconds) kb := kbPool.Get() - kb.B = encoding.MarshalUint64(kb.B[:0], n) + kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount) kb.B = encoding.MarshalUint64(kb.B, timestamp) is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B) kbPool.Put(kb) @@ -3268,8 +3262,8 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil } // Slow path - scan for all the rows with the given prefix. - maxLoops := filter.Len() * maxIndexScanSlowLoopsPerMetric - err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoops, func(metricID uint64) { + maxLoopsCount := uint64(filter.Len()) * maxIndexScanSlowLoopsPerMetric + _, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, func(metricID uint64) { if tf.isNegative { // filter must be equal to metricIDs metricIDs.Del(metricID) diff --git a/lib/storage/table.go b/lib/storage/table.go index 646d330742..5049a18e0f 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -203,7 +203,7 @@ func (tb *table) MustClose() { if n := atomic.LoadUint64(&ptw.refCount); n != 1 { logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n) } - ptw.pt.MustClose() + ptw.decRef() } // Release exclusive lock on the table. diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index 5f1079a079..4ca35b3b42 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -233,7 +233,9 @@ type tagFilter struct { value []byte isNegative bool isRegexp bool - matchCost uint64 + + // matchCost is a cost for matching a filter against a single string. + matchCost uint64 // Prefix always contains {nsPrefixTagToMetricIDs, AccountID, ProjectID, key}. // Additionally it contains: