lib/storage: tune per-day index search

Aliaksandr Valialkin 2021-03-15 13:31:55 +02:00
parent 6b9bba7448
commit 1c26020080

@@ -2073,15 +2073,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics)
if err != nil {
if err == errFallbackToMetricNameMatch {
// Skip tag filters requiring to scan for too many metrics.
kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
kb.B = tf.Marshal(kb.B, is.accountID, is.projectID)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
uselessTagFilters++
continue
}
return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %w", tf, err)
}
if metricIDs.Len() >= maxMetrics {
@@ -2331,7 +2322,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
// Fast path: found metricIDs by date range.
return nil
}
if err != errFallbackToMetricNameMatch {
if err != errFallbackToGlobalSearch {
return err
}
@@ -2355,12 +2346,6 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
continue
}
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs)
if err == errFallbackToMetricNameMatch {
// The tag filter requires too many index scans. Postpone it,
// so tag filters with lower number of index scans may be applied.
tfsPostponed = append(tfsPostponed, tf)
continue
}
if err != nil {
return err
}
@@ -2370,11 +2355,8 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
if len(tfsPostponed) > 0 && successfulIntersects == 0 {
return is.updateMetricIDsByMetricNameMatch(metricIDs, minMetricIDs, tfsPostponed)
}
for i, tf := range tfsPostponed {
for _, tf := range tfsPostponed {
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs)
if err == errFallbackToMetricNameMatch {
return is.updateMetricIDsByMetricNameMatch(metricIDs, minMetricIDs, tfsPostponed[i:])
}
if err != nil {
return err
}
@@ -2388,7 +2370,6 @@ const (
uselessSingleTagFilterKeyPrefix = 0
uselessMultiTagFiltersKeyPrefix = 1
uselessNegativeTagFilterKeyPrefix = 2
uselessTagIntersectKeyPrefix = 3
)
var uselessTagFilterCacheValue = []byte("1")
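
Aside: the remaining useless*KeyPrefix constants namespace entries in a negative cache: a tag filter that has already proved too expensive for a given metrics limit is recorded under a (prefix, limit, marshaled filter) key so later queries can skip it without repeating the work. Below is a minimal standalone sketch of that key scheme with a plain map standing in for the real cache; all names and types here are illustrative, not the VictoriaMetrics implementation.

package main

import (
	"encoding/binary"
	"fmt"
)

const uselessSingleTagFilterKeyPrefix = 0 // mirrors the constant kept above

// uselessKey builds a cache key from a kind prefix, the metrics limit and a
// marshaled filter, mirroring the append(prefix) + MarshalUint64 + tf.Marshal pattern.
func uselessKey(kindPrefix byte, maxMetrics uint64, marshaledFilter []byte) string {
	b := make([]byte, 0, 1+8+len(marshaledFilter))
	b = append(b, kindPrefix)
	b = binary.BigEndian.AppendUint64(b, maxMetrics)
	b = append(b, marshaledFilter...)
	return string(b)
}

func main() {
	// A plain map stands in for the real cache.
	uselessTagFiltersCache := map[string]bool{}
	filter := []byte(`{job=~".+"}`)

	// Remember that this filter was too expensive for a limit of 1000 metrics.
	uselessTagFiltersCache[uselessKey(uselessSingleTagFilterKeyPrefix, 1000, filter)] = true

	fmt.Println(uselessTagFiltersCache[uselessKey(uselessSingleTagFilterKeyPrefix, 1000, filter)]) // true
	fmt.Println(uselessTagFiltersCache[uselessKey(uselessSingleTagFilterKeyPrefix, 2000, filter)]) // false: different limit
}
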
@@ -2402,27 +2383,20 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs)
if err != nil {
if err == errFallbackToMetricNameMatch {
return nil, loopsCount, err
}
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
return metricIDs, loopsCount, nil
}
// Slow path - scan for all the rows with the given prefix.
maxLoopsCount := uint64(maxMetrics) * maxIndexScanLoopsPerMetric
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, metricIDs.Add)
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, metricIDs.Add)
if err != nil {
if err == errFallbackToMetricNameMatch {
return nil, loopsCount, err
}
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
return metricIDs, loopsCount, nil
}
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoopsCount uint64, f func(metricID uint64)) (uint64, error) {
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, f func(metricID uint64)) (uint64, error) {
if len(tf.orSuffixes) > 0 {
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
}
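
Aside: the deleted early returns in getMetricIDsForTagFilter passed the fallback sentinel through unwrapped because its callers compare errors with ==; once an error is wrapped via fmt.Errorf's %w verb, == no longer matches, although errors.Is still does. A tiny self-contained illustration of that distinction (the sentinel name below is made up):

package main

import (
	"errors"
	"fmt"
)

var errFallback = errors.New("fall back to the slower search path")

func main() {
	wrapped := fmt.Errorf("searching metricIDs: %w", errFallback)

	fmt.Println(wrapped == errFallback)          // false: wrapping breaks plain == comparison
	fmt.Println(errors.Is(wrapped, errFallback)) // true: errors.Is walks the wrap chain
}
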
@@ -2461,9 +2435,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
}
mp.ParseMetricIDs()
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errFallbackToMetricNameMatch
}
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
// Fast path: the same tag value found.
// There is no need in checking it again with potentially
@@ -2566,7 +2537,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
ts := &is.ts
mp := &is.mp
mp.Reset()
maxLoopsCount := uint64(maxMetrics) * maxIndexScanLoopsPerMetric
var loopsCount uint64
loopsPaceLimiter := 0
ts.Seek(prefix)
@@ -2585,9 +2555,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
return loopsCount, err
}
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
metricIDs.AddMulti(mp.MetricIDs)
}
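
Aside: the loopsPaceLimiter counter initialized in these scan loops presumably exists so that deadline and pacing checks run only once per batch of iterations rather than on every row, keeping the hot loop cheap. A standalone sketch of that general pattern, with an invented mask value and context-based cancellation instead of the real deadline plumbing:

package main

import (
	"context"
	"fmt"
	"time"
)

// scanRows visits rows but consults the context only every checkMask+1 iterations,
// so the common path pays almost nothing for cancellation support.
func scanRows(ctx context.Context, rows []uint64, visit func(uint64)) error {
	const checkMask = 1<<10 - 1 // check the deadline every 1024 rows (invented value)
	for i, row := range rows {
		if i&checkMask == 0 {
			if err := ctx.Err(); err != nil {
				return fmt.Errorf("scan interrupted after %d rows: %w", i, err)
			}
		}
		visit(row)
	}
	return nil
}

func main() {
	rows := make([]uint64, 1e6)
	for i := range rows {
		rows[i] = uint64(i)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	var sum uint64
	err := scanRows(ctx, rows, func(v uint64) { sum += v })
	fmt.Println(sum, err)
}
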
@@ -2606,8 +2573,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
ts := &is.ts
mp := &is.mp
mp.Reset()
maxLoopsCount := uint64(len(sortedFilter)) * maxIndexScanLoopsPerMetric
var loopsCount uint64
loopsPaceLimiter := 0
ts.Seek(prefix)
var sf []uint64
@@ -2638,10 +2603,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
return nil
}
sf = sortedFilter
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
for _, metricID = range mp.MetricIDs {
if len(sf) == 0 {
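
Aside: updateMetricIDsForOrSuffixWithFilter walks index rows and intersects each parsed metricID batch with a pre-sorted filter slice, using binary search (binarySearchUint64, whose tail is visible in the following hunk) to skip ahead instead of rescanning the filter from the start. A minimal standalone version of that intersection idea; the helper is reimplemented here from its visible signature, not copied from the file:

package main

import "fmt"

// binarySearchUint64 returns the smallest index i such that a[i] >= v,
// or uint(len(a)) if no such element exists.
func binarySearchUint64(a []uint64, v uint64) uint {
	lo, hi := uint(0), uint(len(a))
	for lo < hi {
		mid := (lo + hi) / 2
		if a[mid] < v {
			lo = mid + 1
		} else {
			hi = mid
		}
	}
	return lo
}

// intersectSorted keeps only the candidate metricIDs present in sortedFilter.
// Both slices must be sorted in ascending order, since the filter only shrinks forward.
func intersectSorted(candidates, sortedFilter []uint64) []uint64 {
	var result []uint64
	sf := sortedFilter
	for _, metricID := range candidates {
		// Drop the already-passed part of the filter before comparing.
		n := binarySearchUint64(sf, metricID)
		if n >= uint(len(sf)) {
			break // no filter entries >= metricID remain
		}
		sf = sf[n:]
		if sf[0] == metricID {
			result = append(result, metricID)
		}
	}
	return result
}

func main() {
	filter := []uint64{2, 3, 5, 8, 13, 21}
	candidates := []uint64{1, 2, 3, 4, 5, 6, 13, 34}
	fmt.Println(intersectSorted(candidates, filter)) // [2 3 5 13]
}
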
@@ -2685,7 +2646,7 @@ func binarySearchUint64(a []uint64, v uint64) uint {
return i
}
var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMetricNameMatch because of too many index scan loops")
var errFallbackToGlobalSearch = errors.New("fall back from per-day index search to global index search")
var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
@@ -2754,7 +2715,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
}
if maxDate-minDate > maxDaysForDateMetricIDs {
// Too many dates must be covered. Give up, since it may be slow.
return errFallbackToMetricNameMatch
return errFallbackToGlobalSearch
}
if minDate == maxDate {
// Fast path - query only a single date.
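
Aside: the renamed sentinel makes the intent explicit: when the requested time range spans too many days, per-day index search is abandoned in favour of a single global index search. A compact standalone sketch of that decision; searchPerDay and the 40-day limit are invented here, only the sentinel's name and message mirror the declaration above:

package main

import (
	"errors"
	"fmt"
)

var errFallbackToGlobalSearch = errors.New("fall back from per-day index search to global index search")

const maxDaysForPerDaySearch = 40 // illustrative limit, not the real constant value

// searchPerDay pretends to search the per-day index for [minDate, maxDate] (days since the Unix epoch).
func searchPerDay(minDate, maxDate uint64) error {
	if maxDate-minDate > maxDaysForPerDaySearch {
		// Covering too many days would mean too many per-day scans; give up early.
		return errFallbackToGlobalSearch
	}
	fmt.Printf("per-day search over %d day(s)\n", maxDate-minDate+1)
	return nil
}

func search(minDate, maxDate uint64) {
	if err := searchPerDay(minDate, maxDate); err != nil {
		if errors.Is(err, errFallbackToGlobalSearch) {
			fmt.Println("global search instead:", err)
			return
		}
		fmt.Println("error:", err)
	}
}

func main() {
	search(18700, 18703) // 4 days: per-day search
	search(18600, 18700) // ~100 days: falls back to global search
}
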
@@ -2784,14 +2745,14 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
return
}
if err != nil {
if err == errFallbackToMetricNameMatch {
if err == errFallbackToGlobalSearch {
// The per-date search is too expensive. Probably it is faster to perform global search
// using metric name match.
errGlobal = err
return
}
dateStr := time.Unix(int64(date*24*3600), 0)
errGlobal = fmt.Errorf("cannot search for metricIDs for %s: %w", dateStr, err)
errGlobal = fmt.Errorf("cannot search for metricIDs at %s: %w", dateStr, err)
return
}
if metricIDs.Len() < maxMetrics {
@@ -2824,7 +2785,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
origLoopsCount := loopsCount
if loopsCount == 0 && tf.looksLikeHeavy() {
// Set high loopsCount for heavy tag filters instead of spending CPU time on their execution.
loopsCount = 100e6
loopsCount = 11e6
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
}
if currentTime > lastQueryTimestamp+3600 {
@@ -2838,7 +2799,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// Prevent a possible thundering herd issue when a potentially heavy tf is executed from multiple concurrent queries
// by temporarily persisting its position in the tag filters list.
if origLoopsCount == 0 {
origLoopsCount = 50e6
origLoopsCount = 9e6
}
is.storeLoopsCountForDateFilter(date, tf, origLoopsCount)
}
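
Aside: the constants tuned in the two hunks above feed a cost-ordering heuristic: each (date, tag filter) pair caches an approximate loopsCount, a heavy-looking filter with no cached cost gets a pessimistic initial estimate so it sorts toward the end of the filter list, and a stale cost is re-measured at most once per hour, with a provisional value stored first so concurrent queries don't all re-run the same expensive filter. A rough, map-based, single-goroutine sketch of that bookkeeping; the numbers and names below are invented, not the real cache:

package main

import (
	"fmt"
	"sort"
	"time"
)

// filterCost caches an approximate per-filter cost plus the time it was last measured.
type filterCost struct {
	loopsCount   uint64
	lastMeasured int64 // unix seconds
}

// planOrder sorts filters by their cached cost so cheap filters run first.
// Heavy-looking filters with no cached cost get a pessimistic placeholder, and
// stale entries get a provisional cost before re-measurement so that concurrent
// callers still see a sensible ordering.
func planOrder(cache map[string]filterCost, filters []string, looksHeavy func(string) bool, now int64) []string {
	const (
		heavyPlaceholder     = 11_000_000 // invented: pessimistic guess for heavy filters
		remeasurePlaceholder = 9_000_000  // invented: provisional cost while re-measuring
		remeasureInterval    = 3600       // re-measure a cached cost at most once per hour
	)
	for _, f := range filters {
		c, ok := cache[f]
		if !ok {
			if looksHeavy(f) {
				cache[f] = filterCost{loopsCount: heavyPlaceholder, lastMeasured: now}
			}
			continue
		}
		if now > c.lastMeasured+remeasureInterval {
			// The cached cost is stale: this query will re-measure it, but store a
			// provisional value first so other queries don't pile onto the same filter.
			if c.loopsCount == 0 {
				c.loopsCount = remeasurePlaceholder
			}
			c.lastMeasured = now
			cache[f] = c
		}
	}
	ordered := append([]string(nil), filters...)
	sort.SliceStable(ordered, func(i, j int) bool {
		return cache[ordered[i]].loopsCount < cache[ordered[j]].loopsCount
	})
	return ordered
}

func main() {
	now := time.Now().Unix()
	cache := map[string]filterCost{
		`{job="api"}`:  {loopsCount: 5_000, lastMeasured: now},
		`{path=~".*"}`: {}, // stale zero-cost entry
	}
	filters := []string{`{path=~".*"}`, `{job="api"}`, `{__name__=~"http_.*"}`}
	heavy := func(f string) bool { return f == `{__name__=~"http_.*"}` } // pretend regexp filters look heavy
	fmt.Println(planOrder(cache, filters, heavy, now))
}
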
@@ -2859,7 +2820,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
var tfsPostponed []*tagFilter
var metricIDs *uint64set.Set
tfwsRemaining := tfws[:0]
maxDateMetrics := maxMetrics * 50
maxDateMetrics := maxMetrics * 100
for i := range tfws {
tfw := tfws[i]
tf := tfw.tf
@@ -2870,10 +2831,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
if err != nil {
if err == errFallbackToMetricNameMatch {
tfsPostponed = append(tfsPostponed, tf)
continue
}
return nil, err
}
if m.Len() >= maxDateMetrics {
@@ -2903,7 +2860,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
}
if m.Len() >= maxDateMetrics {
// Too many time series found for the given (date). Fall back to global search.
return nil, errFallbackToMetricNameMatch
return nil, errFallbackToGlobalSearch
}
metricIDs = m
}
@@ -2933,10 +2890,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
if err != nil {
if err == errFallbackToMetricNameMatch {
tfsPostponed = append(tfsPostponed, tf)
continue
}
return nil, err
}
if m.Len() >= maxDateMetrics {
@@ -3127,7 +3080,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
kbPool.Put(kb)
if err != nil {
// Set high loopsCount for failing filter, so it is moved to the end of filter list.
loopsCount = 1e9
loopsCount = 20e9
}
if metricIDs.Len() >= maxMetrics {
// Increase loopsCount for tag filter matching too many metrics,
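
Aside: the bump from 1e9 to 20e9 loops for a failing filter records the failure as an extreme cost, so the cost-based ordering described above demotes that filter to the end of the list on subsequent queries instead of letting it run first again. A tiny sketch of that "penalize on failure" idea (map-based, names invented):

package main

import (
	"errors"
	"fmt"
)

const failedFilterPenalty = 20e9 // huge cost so a failing filter sorts last next time (illustrative)

// runFilter pretends to execute a tag filter and report its measured cost.
func runFilter(name string) (loopsCount uint64, err error) {
	if name == "bad" {
		return 0, errors.New("index scan failed")
	}
	return 42_000, nil
}

// measure records either the measured cost or a huge penalty on failure.
func measure(costs map[string]uint64, name string) error {
	loopsCount, err := runFilter(name)
	if err != nil {
		costs[name] = failedFilterPenalty
		return err
	}
	costs[name] = loopsCount
	return nil
}

func main() {
	costs := map[string]uint64{}
	_ = measure(costs, "good")
	_ = measure(costs, "bad")
	fmt.Println(costs) // map[bad:20000000000 good:42000]
}
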
@@ -3235,31 +3188,6 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
if filter.Len() == 0 {
return nil, nil
}
kb := &is.kb
filterLenRounded := (uint64(filter.Len()) / 1024) * 1024
kb.B = append(kb.B[:0], uselessTagIntersectKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, filterLenRounded)
kb.B = tf.Marshal(kb.B, is.accountID, is.projectID)
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
// Skip useless work, since the intersection will return
// errFallbackToMetricNameMatch for the given filter.
return nil, errFallbackToMetricNameMatch
}
metricIDs, err := is.intersectMetricIDsWithTagFilterNocache(tf, filter)
if err == nil {
return metricIDs, err
}
if err != errFallbackToMetricNameMatch {
return nil, err
}
kb.B = append(kb.B[:0], uselessTagIntersectKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, filterLenRounded)
kb.B = tf.Marshal(kb.B, is.accountID, is.projectID)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
return nil, errFallbackToMetricNameMatch
}
func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) {
metricIDs := filter
if !tf.isNegative {
metricIDs = &uint64set.Set{}
@@ -3267,17 +3195,13 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
if err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter); err != nil {
if err == errFallbackToMetricNameMatch {
return nil, err
}
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil
}
// Slow path - scan for all the rows with the given prefix.
maxLoopsCount := uint64(filter.Len()) * maxIndexScanLoopsPerMetric
_, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, func(metricID uint64) {
_, err := is.getMetricIDsForTagFilterSlow(tf, filter, func(metricID uint64) {
if tf.isNegative {
// filter must be equal to metricIDs
metricIDs.Del(metricID)
@@ -3286,9 +3210,6 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
}
})
if err != nil {
if err == errFallbackToMetricNameMatch {
return nil, err
}
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil