lib/storage: tune per-day index search

This commit is contained in:
Aliaksandr Valialkin 2021-03-15 13:31:55 +02:00
parent 2dae0a2c47
commit 3ccf7ea20c

View file

@ -2048,15 +2048,6 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics)
if err != nil {
if err == errFallbackToMetricNameMatch {
// Skip tag filters requiring to scan for too many metrics.
kb.B = append(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics))
kb.B = tf.Marshal(kb.B)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
uselessTagFilters++
continue
}
return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %w", tf, err)
}
if metricIDs.Len() >= maxMetrics {
@ -2306,7 +2297,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
// Fast path: found metricIDs by date range.
return nil
}
if err != errFallbackToMetricNameMatch {
if err != errFallbackToGlobalSearch {
return err
}
@ -2330,12 +2321,6 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
continue
}
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs)
if err == errFallbackToMetricNameMatch {
// The tag filter requires too many index scans. Postpone it,
// so tag filters with lower number of index scans may be applied.
tfsPostponed = append(tfsPostponed, tf)
continue
}
if err != nil {
return err
}
@ -2345,11 +2330,8 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
if len(tfsPostponed) > 0 && successfulIntersects == 0 {
return is.updateMetricIDsByMetricNameMatch(metricIDs, minMetricIDs, tfsPostponed)
}
for i, tf := range tfsPostponed {
for _, tf := range tfsPostponed {
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs)
if err == errFallbackToMetricNameMatch {
return is.updateMetricIDsByMetricNameMatch(metricIDs, minMetricIDs, tfsPostponed[i:])
}
if err != nil {
return err
}
@ -2363,7 +2345,6 @@ const (
uselessSingleTagFilterKeyPrefix = 0
uselessMultiTagFiltersKeyPrefix = 1
uselessNegativeTagFilterKeyPrefix = 2
uselessTagIntersectKeyPrefix = 3
)
var uselessTagFilterCacheValue = []byte("1")
@ -2377,27 +2358,20 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs)
if err != nil {
if err == errFallbackToMetricNameMatch {
return nil, loopsCount, err
}
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
return metricIDs, loopsCount, nil
}
// Slow path - scan for all the rows with the given prefix.
maxLoopsCount := uint64(maxMetrics) * maxIndexScanLoopsPerMetric
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, metricIDs.Add)
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, metricIDs.Add)
if err != nil {
if err == errFallbackToMetricNameMatch {
return nil, loopsCount, err
}
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
return metricIDs, loopsCount, nil
}
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoopsCount uint64, f func(metricID uint64)) (uint64, error) {
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, f func(metricID uint64)) (uint64, error) {
if len(tf.orSuffixes) > 0 {
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
}
@ -2436,9 +2410,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
}
mp.ParseMetricIDs()
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errFallbackToMetricNameMatch
}
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
// Fast path: the same tag value found.
// There is no need in checking it again with potentially
@ -2541,7 +2512,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
ts := &is.ts
mp := &is.mp
mp.Reset()
maxLoopsCount := uint64(maxMetrics) * maxIndexScanLoopsPerMetric
var loopsCount uint64
loopsPaceLimiter := 0
ts.Seek(prefix)
@ -2560,9 +2530,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
return loopsCount, err
}
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
metricIDs.AddMulti(mp.MetricIDs)
}
@ -2581,8 +2548,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
ts := &is.ts
mp := &is.mp
mp.Reset()
maxLoopsCount := uint64(len(sortedFilter)) * maxIndexScanLoopsPerMetric
var loopsCount uint64
loopsPaceLimiter := 0
ts.Seek(prefix)
var sf []uint64
@ -2613,10 +2578,6 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
return nil
}
sf = sortedFilter
loopsCount += uint64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return errFallbackToMetricNameMatch
}
mp.ParseMetricIDs()
for _, metricID = range mp.MetricIDs {
if len(sf) == 0 {
@ -2660,7 +2621,7 @@ func binarySearchUint64(a []uint64, v uint64) uint {
return i
}
var errFallbackToMetricNameMatch = errors.New("fall back to updateMetricIDsByMetricNameMatch because of too many index scan loops")
var errFallbackToGlobalSearch = errors.New("fall back from per-day index search to global index search")
var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
@ -2729,7 +2690,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
}
if maxDate-minDate > maxDaysForDateMetricIDs {
// Too much dates must be covered. Give up, since it may be slow.
return errFallbackToMetricNameMatch
return errFallbackToGlobalSearch
}
if minDate == maxDate {
// Fast path - query only a single date.
@ -2759,14 +2720,14 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
return
}
if err != nil {
if err == errFallbackToMetricNameMatch {
if err == errFallbackToGlobalSearch {
// The per-date search is too expensive. Probably it is faster to perform global search
// using metric name match.
errGlobal = err
return
}
dateStr := time.Unix(int64(date*24*3600), 0)
errGlobal = fmt.Errorf("cannot search for metricIDs for %s: %w", dateStr, err)
errGlobal = fmt.Errorf("cannot search for metricIDs at %s: %w", dateStr, err)
return
}
if metricIDs.Len() < maxMetrics {
@ -2799,7 +2760,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
origLoopsCount := loopsCount
if loopsCount == 0 && tf.looksLikeHeavy() {
// Set high loopsCount for heavy tag filters instead of spending CPU time on their execution.
loopsCount = 100e6
loopsCount = 11e6
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
}
if currentTime > lastQueryTimestamp+3600 {
@ -2813,7 +2774,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// Prevent from possible thundering herd issue when potentially heavy tf is executed from multiple concurrent queries
// by temporary persisting its position in the tag filters list.
if origLoopsCount == 0 {
origLoopsCount = 50e6
origLoopsCount = 9e6
}
is.storeLoopsCountForDateFilter(date, tf, origLoopsCount)
}
@ -2834,7 +2795,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
var tfsPostponed []*tagFilter
var metricIDs *uint64set.Set
tfwsRemaining := tfws[:0]
maxDateMetrics := maxMetrics * 50
maxDateMetrics := maxMetrics * 100
for i := range tfws {
tfw := tfws[i]
tf := tfw.tf
@ -2845,10 +2806,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
if err != nil {
if err == errFallbackToMetricNameMatch {
tfsPostponed = append(tfsPostponed, tf)
continue
}
return nil, err
}
if m.Len() >= maxDateMetrics {
@ -2878,7 +2835,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
}
if m.Len() >= maxDateMetrics {
// Too many time series found for the given (date). Fall back to global search.
return nil, errFallbackToMetricNameMatch
return nil, errFallbackToGlobalSearch
}
metricIDs = m
}
@ -2908,10 +2865,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, maxDateMetrics)
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
if err != nil {
if err == errFallbackToMetricNameMatch {
tfsPostponed = append(tfsPostponed, tf)
continue
}
return nil, err
}
if m.Len() >= maxDateMetrics {
@ -3102,7 +3055,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
kbPool.Put(kb)
if err != nil {
// Set high loopsCount for failing filter, so it is moved to the end of filter list.
loopsCount = 1e9
loopsCount = 20e9
}
if metricIDs.Len() >= maxMetrics {
// Increase loopsCount for tag filter matching too many metrics,
@ -3210,31 +3163,6 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
if filter.Len() == 0 {
return nil, nil
}
kb := &is.kb
filterLenRounded := (uint64(filter.Len()) / 1024) * 1024
kb.B = append(kb.B[:0], uselessTagIntersectKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, filterLenRounded)
kb.B = tf.Marshal(kb.B)
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
// Skip useless work, since the intersection will return
// errFallbackToMetricNameMatc for the given filter.
return nil, errFallbackToMetricNameMatch
}
metricIDs, err := is.intersectMetricIDsWithTagFilterNocache(tf, filter)
if err == nil {
return metricIDs, err
}
if err != errFallbackToMetricNameMatch {
return nil, err
}
kb.B = append(kb.B[:0], uselessTagIntersectKeyPrefix)
kb.B = encoding.MarshalUint64(kb.B, filterLenRounded)
kb.B = tf.Marshal(kb.B)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
return nil, errFallbackToMetricNameMatch
}
func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) {
metricIDs := filter
if !tf.isNegative {
metricIDs = &uint64set.Set{}
@ -3242,17 +3170,13 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
if err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter); err != nil {
if err == errFallbackToMetricNameMatch {
return nil, err
}
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil
}
// Slow path - scan for all the rows with the given prefix.
maxLoopsCount := uint64(filter.Len()) * maxIndexScanLoopsPerMetric
_, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, func(metricID uint64) {
_, err := is.getMetricIDsForTagFilterSlow(tf, filter, func(metricID uint64) {
if tf.isNegative {
// filter must be equal to metricIDs
metricIDs.Del(metricID)
@ -3261,9 +3185,6 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
}
})
if err != nil {
if err == errFallbackToMetricNameMatch {
return nil, err
}
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil