diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index d667ac75a9..792a8e1165 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -2821,10 +2821,20 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 	for i := range tfs.tfs {
 		tf := &tfs.tfs[i]
 		loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
+		origLoopsCount := loopsCount
 		if currentTime > lastQueryTimestamp+60*60 {
 			// Reset loopsCount to 0 every hour for collecting updated stats for the tf.
 			loopsCount = 0
 		}
+		if loopsCount == 0 {
+			// Prevent from possible thundering herd issue when heavy tf is executed from multiple concurrent queries
+			// by temporary persisting its position in the tag filters list.
+			if origLoopsCount == 0 {
+				origLoopsCount = 10e6
+			}
+			lastQueryTimestamp = 0
+			is.storeLoopsCountForDateFilter(date, tf, origLoopsCount, lastQueryTimestamp)
+		}
 		tfws[i] = tagFilterWithWeight{
 			tf:                 tf,
 			loopsCount:         loopsCount,
@@ -2851,7 +2861,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 			tfwsRemaining = append(tfwsRemaining, tfw)
 			continue
 		}
-		m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, nil, tfs.commonPrefix, maxDateMetrics)
+		m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
+		is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp)
 		if err != nil {
 			return nil, err
 		}
@@ -2905,9 +2916,12 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 			// It should be faster performing metricName match on the remaining filters
 			// instead of scanning big number of entries in the inverted index for these filters.
 			tfsPostponed = append(tfsPostponed, tf)
+			// Store stats for non-executed tf, since it could be updated during protection from thundered herd.
+			is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount, tfw.lastQueryTimestamp)
 			continue
 		}
-		m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
+		m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
+		is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp)
 		if err != nil {
 			return nil, err
 		}
@@ -3083,8 +3097,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
 	return true, nil
 }
 
-func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTimestamp, date uint64,
-	filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
+func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, uint64, error) {
 	// Augument tag filter prefix for per-date search instead of global search.
 	if !bytes.HasPrefix(tf.prefix, commonPrefix) {
 		logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
@@ -3098,25 +3111,16 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime
 	tfNew.prefix = kb.B
 	metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
 	kbPool.Put(kb)
-	currentTimestamp := fasttime.UnixTimestamp()
-	if currentTimestamp > lastQueryTimestamp+5 {
-		// The cache already contains quite fresh entry for the current (date, tf).
-		// Do not update it too frequently.
-		return metricIDs, err
-	}
-	// Store the loopsCount for tag filter in the cache in order to sort tag filters
-	// in ascending durations on the next search.
 	if err != nil {
 		// Set high loopsCount for failing filter, so it is moved to the end of filter list.
-		loopsCount = 1 << 30
+		loopsCount = 1e9
 	}
 	if metricIDs.Len() >= maxMetrics {
 		// Increase loopsCount for tag filter matching too many metrics,
 		// So next time it is moved to the end of filter list.
 		loopsCount *= 2
 	}
-	is.storeLoopsCountAndTimestampForDateFilter(date, tf, loopsCount, currentTimestamp)
-	return metricIDs, err
+	return metricIDs, loopsCount, err
 }
 
 func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) {
@@ -3132,11 +3136,17 @@ func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *t
 	return loopsCount, timestamp
 }
 
-func (is *indexSearch) storeLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter, loopsCount, timestamp uint64) {
+func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, prevTimestamp uint64) {
+	currentTimestamp := fasttime.UnixTimestamp()
+	if currentTimestamp < prevTimestamp+5 {
+		// The cache already contains quite fresh entry for the current (date, tf).
+		// Do not update it too frequently.
+		return
+	}
 	is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
 	kb := kbPool.Get()
 	kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount)
-	kb.B = encoding.MarshalUint64(kb.B, timestamp)
+	kb.B = encoding.MarshalUint64(kb.B, currentTimestamp)
 	is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
 	kbPool.Put(kb)
 }
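
Editor's note (not part of the patch): the standalone sketch below illustrates the caching pattern this diff introduces: an hourly reset of the per-filter loops-count stats, a large placeholder value persisted right after the reset so concurrent queries do not all re-measure the same heavy tag filter at once (the thundering herd guard), and a 5-second throttle on cache writes. All names here (loopsCache, entry, weigh) are hypothetical and simplified; the real code stores these stats per (date, tag filter) in durationsPerDateTagFilterCache.

// Minimal sketch of the thundering-herd guard and throttled stats store
// introduced by this patch. Hypothetical types and names, not the
// VictoriaMetrics API.
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	loopsCount uint64
	timestamp  uint64 // unix seconds of the last store
}

type loopsCache struct {
	mu sync.Mutex
	m  map[string]entry
}

func newLoopsCache() *loopsCache {
	return &loopsCache{m: make(map[string]entry)}
}

func (c *loopsCache) get(key string) (uint64, uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e := c.m[key]
	return e.loopsCount, e.timestamp
}

// store mirrors the role of storeLoopsCountForDateFilter: skip the update if
// the cached entry is fresher than 5 seconds, otherwise record the new
// loopsCount together with the current timestamp.
func (c *loopsCache) store(key string, loopsCount, prevTimestamp uint64) {
	now := uint64(time.Now().Unix())
	if now < prevTimestamp+5 {
		// The cache already contains a fresh enough entry; do not update it.
		return
	}
	c.mu.Lock()
	c.m[key] = entry{loopsCount: loopsCount, timestamp: now}
	c.mu.Unlock()
}

// weigh mirrors the loop in getMetricIDsForDateAndFilters: reset stale stats
// hourly, and when the stats are reset (or missing), temporarily persist a
// pessimistic placeholder loopsCount so concurrent queries keep the heavy
// filter at the end of the filter list instead of all re-measuring it at once.
func weigh(c *loopsCache, key string) uint64 {
	loopsCount, lastQueryTimestamp := c.get(key)
	origLoopsCount := loopsCount
	now := uint64(time.Now().Unix())
	if now > lastQueryTimestamp+60*60 {
		// Reset loopsCount every hour to collect updated stats for the filter.
		loopsCount = 0
	}
	if loopsCount == 0 {
		if origLoopsCount == 0 {
			origLoopsCount = 10e6 // pessimistic placeholder for unknown filters
		}
		// Passing prevTimestamp=0 forces the write through the 5-second throttle.
		c.store(key, origLoopsCount, 0)
	}
	return loopsCount
}

func main() {
	c := newLoopsCache()
	key := `date=18900,tf={job="heavy"}`
	fmt.Println(weigh(c, key)) // 0: no stats collected yet for this filter
	fmt.Println(c.get(key))    // the 10e6 placeholder and its store timestamp are now cached
}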