lib/storage: further tuning for time series search

This commit is contained in:
Aliaksandr Valialkin 2021-03-16 18:46:22 +02:00
parent f54ece438d
commit f4a44d6c0d
2 changed files with 165 additions and 96 deletions

View file

@ -2083,7 +2083,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
continue
}
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics)
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics, int64Max)
if err != nil {
return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %w", tf, err)
}
@ -2334,7 +2334,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
// Fast path: found metricIDs by date range.
return nil
}
if err != errFallbackToGlobalSearch {
if !errors.Is(err, errFallbackToGlobalSearch) {
return err
}
@ -2386,19 +2386,19 @@ const (
var uselessTagFilterCacheValue = []byte("1")
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, uint64, error) {
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
metricIDs := &uint64set.Set{}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
var loopsCount uint64
var loopsCount int64
var err error
if filter != nil {
loopsCount, err = is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter)
loopsCount, err = is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter, maxLoopsCount)
} else {
loopsCount, err = is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs)
loopsCount, err = is.updateMetricIDsForOrSuffixesNoFilter(tf, metricIDs, maxMetrics, maxLoopsCount)
}
if err != nil {
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
@ -2407,14 +2407,16 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set
}
// Slow path - scan for all the rows with the given prefix.
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, metricIDs.Add)
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, metricIDs.Add, maxLoopsCount)
if err != nil {
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
return metricIDs, loopsCount, nil
}
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, f func(metricID uint64)) (uint64, error) {
var errTooManyLoops = fmt.Errorf("too many loops is needed for applying this filter")
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, f func(metricID uint64), maxLoopsCount int64) (int64, error) {
if len(tf.orSuffixes) > 0 {
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
}
@ -2426,7 +2428,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
mp.Reset()
var prevMatchingSuffix []byte
var prevMatch bool
var loopsCount uint64
var loopsCount int64
loopsPaceLimiter := 0
prefix := tf.prefix
ts.Seek(prefix)
@ -2452,7 +2454,10 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
return loopsCount, err
}
mp.ParseMetricIDs()
loopsCount += uint64(mp.MetricIDsLen())
loopsCount += int64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errTooManyLoops
}
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
// Fast path: the same tag value found.
// There is no need in checking it again with potentially
@ -2473,7 +2478,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
// Slow path: need tf.matchSuffix call.
ok, err := tf.matchSuffix(suffix)
// Assume that tf.matchSuffix call needs 10x more time than a single metric scan iteration.
loopsCount += 10 * tf.matchCost
loopsCount += 10 * int64(tf.matchCost)
if err != nil {
return loopsCount, fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err)
}
@ -2513,18 +2518,18 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
kb := kbPool.Get()
defer kbPool.Put(kb)
var loopsCount uint64
var loopsCount int64
for _, orSuffix := range tf.orSuffixes {
kb.B = append(kb.B[:0], tf.prefix...)
kb.B = append(kb.B, orSuffix...)
kb.B = append(kb.B, tagSeparatorChar)
lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs)
lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, metricIDs, maxMetrics, maxLoopsCount-loopsCount)
if err != nil {
return loopsCount, err
}
@ -2536,16 +2541,16 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMe
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) (uint64, error) {
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set, maxLoopsCount int64) (int64, error) {
sortedFilter := filter.AppendTo(nil)
kb := kbPool.Get()
defer kbPool.Put(kb)
var loopsCount uint64
var loopsCount int64
for _, orSuffix := range tf.orSuffixes {
kb.B = append(kb.B[:0], tf.prefix...)
kb.B = append(kb.B, orSuffix...)
kb.B = append(kb.B, tagSeparatorChar)
lc, err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative)
lc, err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative, maxLoopsCount-loopsCount)
if err != nil {
return loopsCount, err
}
@ -2554,11 +2559,11 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
ts := &is.ts
mp := &is.mp
mp.Reset()
var loopsCount uint64
var loopsCount int64
loopsPaceLimiter := 0
ts.Seek(prefix)
for metricIDs.Len() < maxMetrics && ts.NextItem() {
@ -2575,7 +2580,10 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
return loopsCount, err
}
loopsCount += uint64(mp.MetricIDsLen())
loopsCount += int64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errTooManyLoops
}
mp.ParseMetricIDs()
metricIDs.AddMulti(mp.MetricIDs)
}
@ -2585,7 +2593,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) (uint64, error) {
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool, maxLoopsCount int64) (int64, error) {
if len(sortedFilter) == 0 {
return 0, nil
}
@ -2594,7 +2602,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
ts := &is.ts
mp := &is.mp
mp.Reset()
var loopsCount uint64
var loopsCount int64
loopsPaceLimiter := 0
ts.Seek(prefix)
var sf []uint64
@ -2613,7 +2621,10 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
return loopsCount, err
}
loopsCount += uint64(mp.MetricIDsLen())
loopsCount += int64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errTooManyLoops
}
firstMetricID, lastMetricID := mp.FirstAndLastMetricIDs()
if lastMetricID < firstFilterMetricID {
// Skip the item, since it contains metricIDs lower
@ -2774,12 +2785,6 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
return
}
if err != nil {
if err == errFallbackToGlobalSearch {
// The per-date search is too expensive. Probably it is faster to perform global search
// using metric name match.
errGlobal = err
return
}
dateStr := time.Unix(int64(date*24*3600), 0)
errGlobal = fmt.Errorf("cannot search for metricIDs at %s: %w", dateStr, err)
return
@ -2803,26 +2808,26 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// This stats is usually collected from the previous queries.
// This way we limit the amount of work below by applying fast filters at first.
type tagFilterWithWeight struct {
tf *tagFilter
loopsCount uint64
tf *tagFilter
loopsCount int64
filterLoopsCount int64
}
tfws := make([]tagFilterWithWeight, len(tfs.tfs))
currentTime := fasttime.UnixTimestamp()
for i := range tfs.tfs {
tf := &tfs.tfs[i]
loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
loopsCount, filterLoopsCount, timestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
origLoopsCount := loopsCount
if loopsCount == 0 && tf.looksLikeHeavy() {
// Set high loopsCount for heavy tag filters instead of spending CPU time on their execution.
loopsCount = 11e6
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
}
if currentTime > lastQueryTimestamp+3600 {
origFilterLoopsCount := filterLoopsCount
if currentTime > timestamp+3600 {
// Update stats once per hour for relatively fast tag filters.
// There is no need in spending CPU resources on updating stats for heavy tag filters.
if loopsCount <= 10e6 {
loopsCount = 0
}
if filterLoopsCount <= 10e6 {
filterLoopsCount = 0
}
}
if loopsCount == 0 {
// Prevent from possible thundering herd issue when potentially heavy tf is executed from multiple concurrent queries
@ -2830,11 +2835,15 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
if origLoopsCount == 0 {
origLoopsCount = 9e6
}
is.storeLoopsCountForDateFilter(date, tf, origLoopsCount)
if origFilterLoopsCount == 0 {
origFilterLoopsCount = 9e6
}
is.storeLoopsCountForDateFilter(date, tf, origLoopsCount, origFilterLoopsCount)
}
tfws[i] = tagFilterWithWeight{
tf: tf,
loopsCount: loopsCount,
tf: tf,
loopsCount: loopsCount,
filterLoopsCount: filterLoopsCount,
}
}
sort.Slice(tfws, func(i, j int) bool {
@ -2844,45 +2853,84 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
}
return a.tf.Less(b.tf)
})
getFirstPositiveLoopsCount := func(tfws []tagFilterWithWeight) int64 {
for i := range tfws {
if n := tfws[i].loopsCount; n > 0 {
return n
}
}
return int64Max
}
storeLoopsCount := func(tfw *tagFilterWithWeight, loopsCount int64) {
needStore := false
if loopsCount != tfw.loopsCount {
tfw.loopsCount = loopsCount
needStore = true
}
if loopsCount > tfw.filterLoopsCount {
tfw.filterLoopsCount = loopsCount
needStore = true
}
if needStore {
is.storeLoopsCountForDateFilter(date, tfw.tf, tfw.loopsCount, tfw.filterLoopsCount)
}
}
storeZeroLoopsCounts := func(tfws []tagFilterWithWeight) {
for _, tfw := range tfws {
if tfw.loopsCount == 0 || tfw.filterLoopsCount == 0 {
is.storeLoopsCountForDateFilter(date, tfw.tf, tfw.loopsCount, tfw.filterLoopsCount)
}
}
}
// Populate metricIDs for the first non-negative filter.
// Populate metricIDs for the first non-negative filter with the cost smaller than maxLoopsCount.
var metricIDs *uint64set.Set
tfwsRemaining := tfws[:0]
maxDateMetrics := maxMetrics * 50
for i := range tfws {
tfw := tfws[i]
maxDateMetrics := intMax
if maxMetrics < intMax/50 {
maxDateMetrics = maxMetrics * 50
}
for i, tfw := range tfws {
tf := tfw.tf
if tf.isNegative {
tfwsRemaining = append(tfwsRemaining, tfw)
continue
}
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
if loopsCount > tfw.loopsCount {
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
}
maxLoopsCount := getFirstPositiveLoopsCount(tfws[i+1:])
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics, maxLoopsCount)
if err != nil {
if errors.Is(err, errTooManyLoops) {
// The tf took too many loops compared to the next filter. Postpone applying this filter.
storeLoopsCount(&tfw, loopsCount+1)
tfwsRemaining = append(tfwsRemaining, tfw)
continue
}
// Move failing filter to the end of filter list.
storeLoopsCount(&tfw, int64Max)
storeZeroLoopsCounts(tfws[i+1:])
storeZeroLoopsCounts(tfwsRemaining)
return nil, err
}
if m.Len() >= maxDateMetrics {
// Too many time series found by a single tag filter. Postpone applying this filter.
// Too many time series found by a single tag filter. Move the filter to the end of list.
storeLoopsCount(&tfw, int64Max-1)
tfwsRemaining = append(tfwsRemaining, tfw)
tfw.loopsCount = loopsCount
continue
}
storeLoopsCount(&tfw, loopsCount)
metricIDs = m
i++
for i < len(tfws) {
tfwsRemaining = append(tfwsRemaining, tfws[i])
i++
}
tfwsRemaining = append(tfwsRemaining, tfws[i+1:]...)
break
}
tfws = tfwsRemaining
if metricIDs == nil {
// All the filters in tfs are negative or match too many time series.
// Populate all the metricIDs for the given (date),
// so later they can be filtered out with negative filters.
m, err := is.getMetricIDsForDate(date, maxDateMetrics)
if err != nil {
storeZeroLoopsCounts(tfws)
if err == errMissingMetricIDsForDate {
// Zero time series were written on the given date.
return nil, nil
@ -2891,11 +2939,33 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
}
if m.Len() >= maxDateMetrics {
// Too many time series found for the given (date). Fall back to global search.
storeZeroLoopsCounts(tfws)
return nil, errFallbackToGlobalSearch
}
metricIDs = m
}
sort.Slice(tfws, func(i, j int) bool {
a, b := &tfws[i], &tfws[j]
if a.filterLoopsCount != b.filterLoopsCount {
return a.filterLoopsCount < b.filterLoopsCount
}
return a.tf.Less(b.tf)
})
getFirstPositiveFilterLoopsCount := func(tfws []tagFilterWithWeight) int64 {
for i := range tfws {
if n := tfws[i].filterLoopsCount; n > 0 {
return n
}
}
return int64Max
}
storeFilterLoopsCount := func(tfw *tagFilterWithWeight, filterLoopsCount int64) {
if filterLoopsCount != tfw.filterLoopsCount {
is.storeLoopsCountForDateFilter(date, tfw.tf, tfw.loopsCount, filterLoopsCount)
}
}
// Intersect metricIDs with the rest of filters.
//
// Do not run these tag filters in parallel, since this may result in CPU and RAM waste
@ -2903,34 +2973,38 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// so the remaining filters could be performed via much faster metricName matching instead
// of slow selecting of matching metricIDs.
var tfsPostponed []*tagFilter
for i := range tfwsRemaining {
tfw := tfwsRemaining[i]
for i, tfw := range tfws {
tf := tfw.tf
metricIDsLen := metricIDs.Len()
if metricIDsLen == 0 {
// Short circuit - there is no need in applying the remaining filters to an empty set.
// There is no need in applying the remaining filters to an empty set.
storeZeroLoopsCounts(tfws[i:])
break
}
if tfw.loopsCount > uint64(metricIDsLen)*loopsCountPerMetricNameMatch {
if tfw.filterLoopsCount > int64(metricIDsLen)*loopsCountPerMetricNameMatch {
// It should be faster performing metricName match on the remaining filters
// instead of scanning big number of entries in the inverted index for these filters.
for i < len(tfwsRemaining) {
tfw := tfwsRemaining[i]
tf := tfw.tf
tfsPostponed = append(tfsPostponed, tf)
// Store stats for non-executed tf, since it could be updated during protection from thundered herd.
is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount)
i++
for _, tfw := range tfws[i:] {
tfsPostponed = append(tfsPostponed, tfw.tf)
}
storeZeroLoopsCounts(tfws[i:])
break
}
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, 0)
if loopsCount > tfw.loopsCount {
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
}
maxLoopsCount := getFirstPositiveFilterLoopsCount(tfws[i+1:])
m, filterLoopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, intMax, maxLoopsCount)
if err != nil {
if errors.Is(err, errTooManyLoops) {
// Postpone tf, since it took more loops than the next filter may need.
storeFilterLoopsCount(&tfw, filterLoopsCount+1)
tfsPostponed = append(tfsPostponed, tf)
continue
}
// Move failing tf to the end of filter list
storeFilterLoopsCount(&tfw, int64Max)
storeZeroLoopsCounts(tfws[i:])
return nil, err
}
storeFilterLoopsCount(&tfw, filterLoopsCount)
if tf.isNegative {
metricIDs.Subtract(m)
} else {
@ -2952,6 +3026,11 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
return metricIDs, nil
}
const (
intMax = int((^uint(0)) >> 1)
int64Max = int64((1 << 63) - 1)
)
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
ii := getIndexItems()
defer putIndexItems(ii)
@ -3098,7 +3177,8 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
return true, nil
}
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, uint64, error) {
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte,
maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
// Augument tag filter prefix for per-date search instead of global search.
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
@ -3110,38 +3190,31 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
tfNew.prefix = kb.B
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics, maxLoopsCount)
kbPool.Put(kb)
if err != nil {
// Set high loopsCount for failing filter, so it is moved to the end of filter list.
loopsCount = 20e9
}
if filter == nil && metricIDs.Len() >= maxMetrics {
// Increase loopsCount for tag filter matching too many metrics,
// So next time it is moved to the end of filter list.
loopsCount *= 2
}
return metricIDs, loopsCount, err
}
func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) {
func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (int64, int64, uint64) {
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = is.db.loopsPerDateTagFilterCache.Get(kb.B[:0], is.kb.B)
if len(kb.B) != 16 {
return 0, 0
if len(kb.B) != 3*8 {
return 0, 0, 0
}
loopsCount := encoding.UnmarshalUint64(kb.B)
timestamp := encoding.UnmarshalUint64(kb.B[8:])
return loopsCount, timestamp
loopsCount := encoding.UnmarshalInt64(kb.B)
filterLoopsCount := encoding.UnmarshalInt64(kb.B[8:])
timestamp := encoding.UnmarshalUint64(kb.B[16:])
return loopsCount, filterLoopsCount, timestamp
}
func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount uint64) {
func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, filterLoopsCount int64) {
currentTimestamp := fasttime.UnixTimestamp()
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
kb := kbPool.Get()
kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount)
kb.B = encoding.MarshalInt64(kb.B[:0], loopsCount)
kb.B = encoding.MarshalInt64(kb.B, filterLoopsCount)
kb.B = encoding.MarshalUint64(kb.B, currentTimestamp)
is.db.loopsPerDateTagFilterCache.Set(is.kb.B, kb.B)
kbPool.Put(kb)
@ -3226,7 +3299,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
_, err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter)
_, err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter, int64Max)
if err != nil {
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
@ -3241,7 +3314,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
} else {
metricIDs.Add(metricID)
}
})
}, int64Max)
if err != nil {
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}

View file

@ -260,10 +260,6 @@ type tagFilter struct {
graphiteReverseSuffix []byte
}
func (tf *tagFilter) looksLikeHeavy() bool {
return tf.isRegexp && len(tf.orSuffixes) == 0
}
func (tf *tagFilter) isComposite() bool {
k := tf.key
return len(k) > 0 && k[0] == compositeTagKeyPrefix