mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/storage: further tuning for time series search
This commit is contained in:
parent
f0a4157f89
commit
e36fbfae5b
2 changed files with 165 additions and 96 deletions
|
@ -2058,7 +2058,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
|
|||
continue
|
||||
}
|
||||
|
||||
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics)
|
||||
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics, int64Max)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %w", tf, err)
|
||||
}
|
||||
|
@ -2309,7 +2309,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
|
|||
// Fast path: found metricIDs by date range.
|
||||
return nil
|
||||
}
|
||||
if err != errFallbackToGlobalSearch {
|
||||
if !errors.Is(err, errFallbackToGlobalSearch) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -2361,19 +2361,19 @@ const (
|
|||
|
||||
var uselessTagFilterCacheValue = []byte("1")
|
||||
|
||||
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, uint64, error) {
|
||||
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
|
||||
if tf.isNegative {
|
||||
logger.Panicf("BUG: isNegative must be false")
|
||||
}
|
||||
metricIDs := &uint64set.Set{}
|
||||
if len(tf.orSuffixes) > 0 {
|
||||
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
|
||||
var loopsCount uint64
|
||||
var loopsCount int64
|
||||
var err error
|
||||
if filter != nil {
|
||||
loopsCount, err = is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter)
|
||||
loopsCount, err = is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter, maxLoopsCount)
|
||||
} else {
|
||||
loopsCount, err = is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs)
|
||||
loopsCount, err = is.updateMetricIDsForOrSuffixesNoFilter(tf, metricIDs, maxMetrics, maxLoopsCount)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
|
||||
|
@ -2382,14 +2382,16 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set
|
|||
}
|
||||
|
||||
// Slow path - scan for all the rows with the given prefix.
|
||||
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, metricIDs.Add)
|
||||
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, metricIDs.Add, maxLoopsCount)
|
||||
if err != nil {
|
||||
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
|
||||
}
|
||||
return metricIDs, loopsCount, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, f func(metricID uint64)) (uint64, error) {
|
||||
var errTooManyLoops = fmt.Errorf("too many loops is needed for applying this filter")
|
||||
|
||||
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, f func(metricID uint64), maxLoopsCount int64) (int64, error) {
|
||||
if len(tf.orSuffixes) > 0 {
|
||||
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
|
||||
}
|
||||
|
@ -2401,7 +2403,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
|
|||
mp.Reset()
|
||||
var prevMatchingSuffix []byte
|
||||
var prevMatch bool
|
||||
var loopsCount uint64
|
||||
var loopsCount int64
|
||||
loopsPaceLimiter := 0
|
||||
prefix := tf.prefix
|
||||
ts.Seek(prefix)
|
||||
|
@ -2427,7 +2429,10 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
|
|||
return loopsCount, err
|
||||
}
|
||||
mp.ParseMetricIDs()
|
||||
loopsCount += uint64(mp.MetricIDsLen())
|
||||
loopsCount += int64(mp.MetricIDsLen())
|
||||
if loopsCount > maxLoopsCount {
|
||||
return loopsCount, errTooManyLoops
|
||||
}
|
||||
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
|
||||
// Fast path: the same tag value found.
|
||||
// There is no need in checking it again with potentially
|
||||
|
@ -2448,7 +2453,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
|
|||
// Slow path: need tf.matchSuffix call.
|
||||
ok, err := tf.matchSuffix(suffix)
|
||||
// Assume that tf.matchSuffix call needs 10x more time than a single metric scan iteration.
|
||||
loopsCount += 10 * tf.matchCost
|
||||
loopsCount += 10 * int64(tf.matchCost)
|
||||
if err != nil {
|
||||
return loopsCount, fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err)
|
||||
}
|
||||
|
@ -2488,18 +2493,18 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
|
|||
return loopsCount, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
|
||||
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
|
||||
if tf.isNegative {
|
||||
logger.Panicf("BUG: isNegative must be false")
|
||||
}
|
||||
kb := kbPool.Get()
|
||||
defer kbPool.Put(kb)
|
||||
var loopsCount uint64
|
||||
var loopsCount int64
|
||||
for _, orSuffix := range tf.orSuffixes {
|
||||
kb.B = append(kb.B[:0], tf.prefix...)
|
||||
kb.B = append(kb.B, orSuffix...)
|
||||
kb.B = append(kb.B, tagSeparatorChar)
|
||||
lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs)
|
||||
lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, metricIDs, maxMetrics, maxLoopsCount-loopsCount)
|
||||
if err != nil {
|
||||
return loopsCount, err
|
||||
}
|
||||
|
@ -2511,16 +2516,16 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMe
|
|||
return loopsCount, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) (uint64, error) {
|
||||
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set, maxLoopsCount int64) (int64, error) {
|
||||
sortedFilter := filter.AppendTo(nil)
|
||||
kb := kbPool.Get()
|
||||
defer kbPool.Put(kb)
|
||||
var loopsCount uint64
|
||||
var loopsCount int64
|
||||
for _, orSuffix := range tf.orSuffixes {
|
||||
kb.B = append(kb.B[:0], tf.prefix...)
|
||||
kb.B = append(kb.B, orSuffix...)
|
||||
kb.B = append(kb.B, tagSeparatorChar)
|
||||
lc, err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative)
|
||||
lc, err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative, maxLoopsCount-loopsCount)
|
||||
if err != nil {
|
||||
return loopsCount, err
|
||||
}
|
||||
|
@ -2529,11 +2534,11 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met
|
|||
return loopsCount, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
|
||||
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
|
||||
ts := &is.ts
|
||||
mp := &is.mp
|
||||
mp.Reset()
|
||||
var loopsCount uint64
|
||||
var loopsCount int64
|
||||
loopsPaceLimiter := 0
|
||||
ts.Seek(prefix)
|
||||
for metricIDs.Len() < maxMetrics && ts.NextItem() {
|
||||
|
@ -2550,7 +2555,10 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
|
|||
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
|
||||
return loopsCount, err
|
||||
}
|
||||
loopsCount += uint64(mp.MetricIDsLen())
|
||||
loopsCount += int64(mp.MetricIDsLen())
|
||||
if loopsCount > maxLoopsCount {
|
||||
return loopsCount, errTooManyLoops
|
||||
}
|
||||
mp.ParseMetricIDs()
|
||||
metricIDs.AddMulti(mp.MetricIDs)
|
||||
}
|
||||
|
@ -2560,7 +2568,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetr
|
|||
return loopsCount, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) (uint64, error) {
|
||||
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool, maxLoopsCount int64) (int64, error) {
|
||||
if len(sortedFilter) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
@ -2569,7 +2577,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
|
|||
ts := &is.ts
|
||||
mp := &is.mp
|
||||
mp.Reset()
|
||||
var loopsCount uint64
|
||||
var loopsCount int64
|
||||
loopsPaceLimiter := 0
|
||||
ts.Seek(prefix)
|
||||
var sf []uint64
|
||||
|
@ -2588,7 +2596,10 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
|
|||
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
|
||||
return loopsCount, err
|
||||
}
|
||||
loopsCount += uint64(mp.MetricIDsLen())
|
||||
loopsCount += int64(mp.MetricIDsLen())
|
||||
if loopsCount > maxLoopsCount {
|
||||
return loopsCount, errTooManyLoops
|
||||
}
|
||||
firstMetricID, lastMetricID := mp.FirstAndLastMetricIDs()
|
||||
if lastMetricID < firstFilterMetricID {
|
||||
// Skip the item, since it contains metricIDs lower
|
||||
|
@ -2749,12 +2760,6 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
|||
return
|
||||
}
|
||||
if err != nil {
|
||||
if err == errFallbackToGlobalSearch {
|
||||
// The per-date search is too expensive. Probably it is faster to perform global search
|
||||
// using metric name match.
|
||||
errGlobal = err
|
||||
return
|
||||
}
|
||||
dateStr := time.Unix(int64(date*24*3600), 0)
|
||||
errGlobal = fmt.Errorf("cannot search for metricIDs at %s: %w", dateStr, err)
|
||||
return
|
||||
|
@ -2778,26 +2783,26 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||
// This stats is usually collected from the previous queries.
|
||||
// This way we limit the amount of work below by applying fast filters at first.
|
||||
type tagFilterWithWeight struct {
|
||||
tf *tagFilter
|
||||
loopsCount uint64
|
||||
tf *tagFilter
|
||||
loopsCount int64
|
||||
filterLoopsCount int64
|
||||
}
|
||||
tfws := make([]tagFilterWithWeight, len(tfs.tfs))
|
||||
currentTime := fasttime.UnixTimestamp()
|
||||
for i := range tfs.tfs {
|
||||
tf := &tfs.tfs[i]
|
||||
loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
|
||||
loopsCount, filterLoopsCount, timestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
|
||||
origLoopsCount := loopsCount
|
||||
if loopsCount == 0 && tf.looksLikeHeavy() {
|
||||
// Set high loopsCount for heavy tag filters instead of spending CPU time on their execution.
|
||||
loopsCount = 11e6
|
||||
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
|
||||
}
|
||||
if currentTime > lastQueryTimestamp+3600 {
|
||||
origFilterLoopsCount := filterLoopsCount
|
||||
if currentTime > timestamp+3600 {
|
||||
// Update stats once per hour for relatively fast tag filters.
|
||||
// There is no need in spending CPU resources on updating stats for heavy tag filters.
|
||||
if loopsCount <= 10e6 {
|
||||
loopsCount = 0
|
||||
}
|
||||
if filterLoopsCount <= 10e6 {
|
||||
filterLoopsCount = 0
|
||||
}
|
||||
}
|
||||
if loopsCount == 0 {
|
||||
// Prevent from possible thundering herd issue when potentially heavy tf is executed from multiple concurrent queries
|
||||
|
@ -2805,11 +2810,15 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||
if origLoopsCount == 0 {
|
||||
origLoopsCount = 9e6
|
||||
}
|
||||
is.storeLoopsCountForDateFilter(date, tf, origLoopsCount)
|
||||
if origFilterLoopsCount == 0 {
|
||||
origFilterLoopsCount = 9e6
|
||||
}
|
||||
is.storeLoopsCountForDateFilter(date, tf, origLoopsCount, origFilterLoopsCount)
|
||||
}
|
||||
tfws[i] = tagFilterWithWeight{
|
||||
tf: tf,
|
||||
loopsCount: loopsCount,
|
||||
tf: tf,
|
||||
loopsCount: loopsCount,
|
||||
filterLoopsCount: filterLoopsCount,
|
||||
}
|
||||
}
|
||||
sort.Slice(tfws, func(i, j int) bool {
|
||||
|
@ -2819,45 +2828,84 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||
}
|
||||
return a.tf.Less(b.tf)
|
||||
})
|
||||
getFirstPositiveLoopsCount := func(tfws []tagFilterWithWeight) int64 {
|
||||
for i := range tfws {
|
||||
if n := tfws[i].loopsCount; n > 0 {
|
||||
return n
|
||||
}
|
||||
}
|
||||
return int64Max
|
||||
}
|
||||
storeLoopsCount := func(tfw *tagFilterWithWeight, loopsCount int64) {
|
||||
needStore := false
|
||||
if loopsCount != tfw.loopsCount {
|
||||
tfw.loopsCount = loopsCount
|
||||
needStore = true
|
||||
}
|
||||
if loopsCount > tfw.filterLoopsCount {
|
||||
tfw.filterLoopsCount = loopsCount
|
||||
needStore = true
|
||||
}
|
||||
if needStore {
|
||||
is.storeLoopsCountForDateFilter(date, tfw.tf, tfw.loopsCount, tfw.filterLoopsCount)
|
||||
}
|
||||
}
|
||||
storeZeroLoopsCounts := func(tfws []tagFilterWithWeight) {
|
||||
for _, tfw := range tfws {
|
||||
if tfw.loopsCount == 0 || tfw.filterLoopsCount == 0 {
|
||||
is.storeLoopsCountForDateFilter(date, tfw.tf, tfw.loopsCount, tfw.filterLoopsCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Populate metricIDs for the first non-negative filter.
|
||||
// Populate metricIDs for the first non-negative filter with the cost smaller than maxLoopsCount.
|
||||
var metricIDs *uint64set.Set
|
||||
tfwsRemaining := tfws[:0]
|
||||
maxDateMetrics := maxMetrics * 50
|
||||
for i := range tfws {
|
||||
tfw := tfws[i]
|
||||
maxDateMetrics := intMax
|
||||
if maxMetrics < intMax/50 {
|
||||
maxDateMetrics = maxMetrics * 50
|
||||
}
|
||||
for i, tfw := range tfws {
|
||||
tf := tfw.tf
|
||||
if tf.isNegative {
|
||||
tfwsRemaining = append(tfwsRemaining, tfw)
|
||||
continue
|
||||
}
|
||||
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics)
|
||||
if loopsCount > tfw.loopsCount {
|
||||
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
|
||||
}
|
||||
maxLoopsCount := getFirstPositiveLoopsCount(tfws[i+1:])
|
||||
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, nil, tfs.commonPrefix, maxDateMetrics, maxLoopsCount)
|
||||
if err != nil {
|
||||
if errors.Is(err, errTooManyLoops) {
|
||||
// The tf took too many loops compared to the next filter. Postpone applying this filter.
|
||||
storeLoopsCount(&tfw, loopsCount+1)
|
||||
tfwsRemaining = append(tfwsRemaining, tfw)
|
||||
continue
|
||||
}
|
||||
// Move failing filter to the end of filter list.
|
||||
storeLoopsCount(&tfw, int64Max)
|
||||
storeZeroLoopsCounts(tfws[i+1:])
|
||||
storeZeroLoopsCounts(tfwsRemaining)
|
||||
return nil, err
|
||||
}
|
||||
if m.Len() >= maxDateMetrics {
|
||||
// Too many time series found by a single tag filter. Postpone applying this filter.
|
||||
// Too many time series found by a single tag filter. Move the filter to the end of list.
|
||||
storeLoopsCount(&tfw, int64Max-1)
|
||||
tfwsRemaining = append(tfwsRemaining, tfw)
|
||||
tfw.loopsCount = loopsCount
|
||||
continue
|
||||
}
|
||||
storeLoopsCount(&tfw, loopsCount)
|
||||
metricIDs = m
|
||||
i++
|
||||
for i < len(tfws) {
|
||||
tfwsRemaining = append(tfwsRemaining, tfws[i])
|
||||
i++
|
||||
}
|
||||
tfwsRemaining = append(tfwsRemaining, tfws[i+1:]...)
|
||||
break
|
||||
}
|
||||
tfws = tfwsRemaining
|
||||
|
||||
if metricIDs == nil {
|
||||
// All the filters in tfs are negative or match too many time series.
|
||||
// Populate all the metricIDs for the given (date),
|
||||
// so later they can be filtered out with negative filters.
|
||||
m, err := is.getMetricIDsForDate(date, maxDateMetrics)
|
||||
if err != nil {
|
||||
storeZeroLoopsCounts(tfws)
|
||||
if err == errMissingMetricIDsForDate {
|
||||
// Zero time series were written on the given date.
|
||||
return nil, nil
|
||||
|
@ -2866,11 +2914,33 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||
}
|
||||
if m.Len() >= maxDateMetrics {
|
||||
// Too many time series found for the given (date). Fall back to global search.
|
||||
storeZeroLoopsCounts(tfws)
|
||||
return nil, errFallbackToGlobalSearch
|
||||
}
|
||||
metricIDs = m
|
||||
}
|
||||
|
||||
sort.Slice(tfws, func(i, j int) bool {
|
||||
a, b := &tfws[i], &tfws[j]
|
||||
if a.filterLoopsCount != b.filterLoopsCount {
|
||||
return a.filterLoopsCount < b.filterLoopsCount
|
||||
}
|
||||
return a.tf.Less(b.tf)
|
||||
})
|
||||
getFirstPositiveFilterLoopsCount := func(tfws []tagFilterWithWeight) int64 {
|
||||
for i := range tfws {
|
||||
if n := tfws[i].filterLoopsCount; n > 0 {
|
||||
return n
|
||||
}
|
||||
}
|
||||
return int64Max
|
||||
}
|
||||
storeFilterLoopsCount := func(tfw *tagFilterWithWeight, filterLoopsCount int64) {
|
||||
if filterLoopsCount != tfw.filterLoopsCount {
|
||||
is.storeLoopsCountForDateFilter(date, tfw.tf, tfw.loopsCount, filterLoopsCount)
|
||||
}
|
||||
}
|
||||
|
||||
// Intersect metricIDs with the rest of filters.
|
||||
//
|
||||
// Do not run these tag filters in parallel, since this may result in CPU and RAM waste
|
||||
|
@ -2878,34 +2948,38 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||
// so the remaining filters could be performed via much faster metricName matching instead
|
||||
// of slow selecting of matching metricIDs.
|
||||
var tfsPostponed []*tagFilter
|
||||
for i := range tfwsRemaining {
|
||||
tfw := tfwsRemaining[i]
|
||||
for i, tfw := range tfws {
|
||||
tf := tfw.tf
|
||||
metricIDsLen := metricIDs.Len()
|
||||
if metricIDsLen == 0 {
|
||||
// Short circuit - there is no need in applying the remaining filters to an empty set.
|
||||
// There is no need in applying the remaining filters to an empty set.
|
||||
storeZeroLoopsCounts(tfws[i:])
|
||||
break
|
||||
}
|
||||
if tfw.loopsCount > uint64(metricIDsLen)*loopsCountPerMetricNameMatch {
|
||||
if tfw.filterLoopsCount > int64(metricIDsLen)*loopsCountPerMetricNameMatch {
|
||||
// It should be faster performing metricName match on the remaining filters
|
||||
// instead of scanning big number of entries in the inverted index for these filters.
|
||||
for i < len(tfwsRemaining) {
|
||||
tfw := tfwsRemaining[i]
|
||||
tf := tfw.tf
|
||||
tfsPostponed = append(tfsPostponed, tf)
|
||||
// Store stats for non-executed tf, since it could be updated during protection from thundered herd.
|
||||
is.storeLoopsCountForDateFilter(date, tf, tfw.loopsCount)
|
||||
i++
|
||||
for _, tfw := range tfws[i:] {
|
||||
tfsPostponed = append(tfsPostponed, tfw.tf)
|
||||
}
|
||||
storeZeroLoopsCounts(tfws[i:])
|
||||
break
|
||||
}
|
||||
m, loopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, 0)
|
||||
if loopsCount > tfw.loopsCount {
|
||||
is.storeLoopsCountForDateFilter(date, tf, loopsCount)
|
||||
}
|
||||
maxLoopsCount := getFirstPositiveFilterLoopsCount(tfws[i+1:])
|
||||
m, filterLoopsCount, err := is.getMetricIDsForDateTagFilter(tf, date, metricIDs, tfs.commonPrefix, intMax, maxLoopsCount)
|
||||
if err != nil {
|
||||
if errors.Is(err, errTooManyLoops) {
|
||||
// Postpone tf, since it took more loops than the next filter may need.
|
||||
storeFilterLoopsCount(&tfw, filterLoopsCount+1)
|
||||
tfsPostponed = append(tfsPostponed, tf)
|
||||
continue
|
||||
}
|
||||
// Move failing tf to the end of filter list
|
||||
storeFilterLoopsCount(&tfw, int64Max)
|
||||
storeZeroLoopsCounts(tfws[i:])
|
||||
return nil, err
|
||||
}
|
||||
storeFilterLoopsCount(&tfw, filterLoopsCount)
|
||||
if tf.isNegative {
|
||||
metricIDs.Subtract(m)
|
||||
} else {
|
||||
|
@ -2927,6 +3001,11 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
|||
return metricIDs, nil
|
||||
}
|
||||
|
||||
const (
|
||||
intMax = int((^uint(0)) >> 1)
|
||||
int64Max = int64((1 << 63) - 1)
|
||||
)
|
||||
|
||||
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
|
||||
ii := getIndexItems()
|
||||
defer putIndexItems(ii)
|
||||
|
@ -3073,7 +3152,8 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte, maxMetrics int) (*uint64set.Set, uint64, error) {
|
||||
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, filter *uint64set.Set, commonPrefix []byte,
|
||||
maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
|
||||
// Augument tag filter prefix for per-date search instead of global search.
|
||||
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
|
||||
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
|
||||
|
@ -3085,38 +3165,31 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
|
|||
tfNew := *tf
|
||||
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
|
||||
tfNew.prefix = kb.B
|
||||
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
|
||||
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics, maxLoopsCount)
|
||||
kbPool.Put(kb)
|
||||
if err != nil {
|
||||
// Set high loopsCount for failing filter, so it is moved to the end of filter list.
|
||||
loopsCount = 20e9
|
||||
}
|
||||
if filter == nil && metricIDs.Len() >= maxMetrics {
|
||||
// Increase loopsCount for tag filter matching too many metrics,
|
||||
// So next time it is moved to the end of filter list.
|
||||
loopsCount *= 2
|
||||
}
|
||||
return metricIDs, loopsCount, err
|
||||
}
|
||||
|
||||
func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) {
|
||||
func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (int64, int64, uint64) {
|
||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
|
||||
kb := kbPool.Get()
|
||||
defer kbPool.Put(kb)
|
||||
kb.B = is.db.loopsPerDateTagFilterCache.Get(kb.B[:0], is.kb.B)
|
||||
if len(kb.B) != 16 {
|
||||
return 0, 0
|
||||
if len(kb.B) != 3*8 {
|
||||
return 0, 0, 0
|
||||
}
|
||||
loopsCount := encoding.UnmarshalUint64(kb.B)
|
||||
timestamp := encoding.UnmarshalUint64(kb.B[8:])
|
||||
return loopsCount, timestamp
|
||||
loopsCount := encoding.UnmarshalInt64(kb.B)
|
||||
filterLoopsCount := encoding.UnmarshalInt64(kb.B[8:])
|
||||
timestamp := encoding.UnmarshalUint64(kb.B[16:])
|
||||
return loopsCount, filterLoopsCount, timestamp
|
||||
}
|
||||
|
||||
func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount uint64) {
|
||||
func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, filterLoopsCount int64) {
|
||||
currentTimestamp := fasttime.UnixTimestamp()
|
||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf)
|
||||
kb := kbPool.Get()
|
||||
kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount)
|
||||
kb.B = encoding.MarshalInt64(kb.B[:0], loopsCount)
|
||||
kb.B = encoding.MarshalInt64(kb.B, filterLoopsCount)
|
||||
kb.B = encoding.MarshalUint64(kb.B, currentTimestamp)
|
||||
is.db.loopsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
||||
kbPool.Put(kb)
|
||||
|
@ -3201,7 +3274,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
|
|||
}
|
||||
if len(tf.orSuffixes) > 0 {
|
||||
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
|
||||
_, err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter)
|
||||
_, err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter, int64Max)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
|
||||
}
|
||||
|
@ -3216,7 +3289,7 @@ func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *ui
|
|||
} else {
|
||||
metricIDs.Add(metricID)
|
||||
}
|
||||
})
|
||||
}, int64Max)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
|
||||
}
|
||||
|
|
|
@ -248,10 +248,6 @@ type tagFilter struct {
|
|||
graphiteReverseSuffix []byte
|
||||
}
|
||||
|
||||
func (tf *tagFilter) looksLikeHeavy() bool {
|
||||
return tf.isRegexp && len(tf.orSuffixes) == 0
|
||||
}
|
||||
|
||||
func (tf *tagFilter) isComposite() bool {
|
||||
k := tf.key
|
||||
return len(k) > 0 && k[0] == compositeTagKeyPrefix
|
||||
|
|
Loading…
Reference in a new issue