mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: prevent from running identical heavy tag filters in concurrent queries when measuring the number of loops for such tag filter.
This should reduce CPU usage spikes when measuring the number of loops needed for heavy tag filters
This commit is contained in:
parent
fd41f070db
commit
628b8eb55e
1 changed files with 27 additions and 17 deletions
|
@ -2821,10 +2821,20 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||||
for i := range tfs.tfs {
|
for i := range tfs.tfs {
|
||||||
tf := &tfs.tfs[i]
|
tf := &tfs.tfs[i]
|
||||||
loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
|
loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
|
||||||
|
origLoopsCount := loopsCount
|
||||||
if currentTime > lastQueryTimestamp+60*60 {
|
if currentTime > lastQueryTimestamp+60*60 {
|
||||||
// Reset loopsCount to 0 every hour for collecting updated stats for the tf.
|
// Reset loopsCount to 0 every hour for collecting updated stats for the tf.
|
||||||
loopsCount = 0
|
loopsCount = 0
|
||||||
}
|
}
|
||||||
|
if loopsCount == 0 {
|
||||||
|
// Prevent from possible thundering herd issue when heavy tf is executed from multiple concurrent queries
|
||||||
|
// by temporary persisting its position in the tag filters list.
|
||||||
|
if origLoopsCount == 0 {
|
||||||
|
origLoopsCount = 10e6
|
||||||
|
}
|
||||||
|
lastQueryTimestamp = 0
|
||||||
|
is.storeLoopsCountForDateFilter(date, tf, origLoopsCount, lastQueryTimestamp)
|
||||||
|
}
|
||||||
tfws[i] = tagFilterWithWeight{
|
tfws[i] = tagFilterWithWeight{
|
||||||
tf: tf,
|
tf: tf,
|
||||||
loopsCount: loopsCount,
|
loopsCount: loopsCount,
|
||||||
|
@ -2851,7 +2861,8 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||||
tfwsRemaining = append(tfwsRemaining, tfw)
|
tfwsRemaining = append(tfwsRemaining, tfw)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, nil, tfs.commonPrefix, maxDateMetrics)
|
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
|
||||||
|
is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -2905,9 +2916,12 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||||
// It should be faster performing metricName match on the remaining filters
|
// It should be faster performing metricName match on the remaining filters
|
||||||
// instead of scanning big number of entries in the inverted index for these filters.
|
// instead of scanning big number of entries in the inverted index for these filters.
|
||||||
tfsPostponed = append(tfsPostponed, tf)
|
tfsPostponed = append(tfsPostponed, tf)
|
||||||
|
// Store stats for non-executed tf, since it could be updated during protection from thundered herd.
|
||||||
|
is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount, tfw.lastQueryTimestamp)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
m, err := is.getMetricIDsForDateTagFilter(tf, tfw.lastQueryTimestamp, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
|
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
|
||||||
|
is.storeLoopsCountForDateFilter(date, tf, loopsCount, tfw.lastQueryTimestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -3083,8 +3097,7 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTimestamp, date uint64,
|
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, uint64, error) {
|
||||||
filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, error) {
|
|
||||||
// Augument tag filter prefix for per-date search instead of global search.
|
// Augument tag filter prefix for per-date search instead of global search.
|
||||||
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
|
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
|
||||||
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
|
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
|
||||||
|
@ -3098,25 +3111,16 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime
|
||||||
tfNew.prefix = kb.B
|
tfNew.prefix = kb.B
|
||||||
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
|
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
|
||||||
kbPool.Put(kb)
|
kbPool.Put(kb)
|
||||||
currentTimestamp := fasttime.UnixTimestamp()
|
|
||||||
if currentTimestamp > lastQueryTimestamp+5 {
|
|
||||||
// The cache already contains quite fresh entry for the current (date, tf).
|
|
||||||
// Do not update it too frequently.
|
|
||||||
return metricIDs, err
|
|
||||||
}
|
|
||||||
// Store the loopsCount for tag filter in the cache in order to sort tag filters
|
|
||||||
// in ascending durations on the next search.
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Set high loopsCount for failing filter, so it is moved to the end of filter list.
|
// Set high loopsCount for failing filter, so it is moved to the end of filter list.
|
||||||
loopsCount = 1 << 30
|
loopsCount = 1e9
|
||||||
}
|
}
|
||||||
if metricIDs.Len() >= maxMetrics {
|
if metricIDs.Len() >= maxMetrics {
|
||||||
// Increase loopsCount for tag filter matching too many metrics,
|
// Increase loopsCount for tag filter matching too many metrics,
|
||||||
// So next time it is moved to the end of filter list.
|
// So next time it is moved to the end of filter list.
|
||||||
loopsCount *= 2
|
loopsCount *= 2
|
||||||
}
|
}
|
||||||
is.storeLoopsCountAndTimestampForDateFilter(date, tf, loopsCount, currentTimestamp)
|
return metricIDs, loopsCount, err
|
||||||
return metricIDs, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) {
|
func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) {
|
||||||
|
@ -3132,11 +3136,17 @@ func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *t
|
||||||
return loopsCount, timestamp
|
return loopsCount, timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) storeLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter, loopsCount, timestamp uint64) {
|
func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, prevTimestamp uint64) {
|
||||||
|
currentTimestamp := fasttime.UnixTimestamp()
|
||||||
|
if currentTimestamp < prevTimestamp+5 {
|
||||||
|
// The cache already contains quite fresh entry for the current (date, tf).
|
||||||
|
// Do not update it too frequently.
|
||||||
|
return
|
||||||
|
}
|
||||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
|
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
|
||||||
kb := kbPool.Get()
|
kb := kbPool.Get()
|
||||||
kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount)
|
kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount)
|
||||||
kb.B = encoding.MarshalUint64(kb.B, timestamp)
|
kb.B = encoding.MarshalUint64(kb.B, currentTimestamp)
|
||||||
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
||||||
kbPool.Put(kb)
|
kbPool.Put(kb)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue