mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-19 15:30:17 +00:00
lib/storage: sort tag filters by the number of loops they need for the execution
This metric should work better than the filter execution duration, since it cannot be distorted by concurrently running queries.
This commit is contained in:
parent
a5bfd831fb
commit
fd41f070db
3 changed files with 85 additions and 89 deletions
|
@ -6,7 +6,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -2072,7 +2071,7 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMet
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
metricIDs, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics)
|
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, nil, maxMetrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == errFallbackToMetricNameMatch {
|
if err == errFallbackToMetricNameMatch {
|
||||||
// Skip tag filters requiring to scan for too many metrics.
|
// Skip tag filters requiring to scan for too many metrics.
|
||||||
|
@ -2394,34 +2393,36 @@ const (
|
||||||
|
|
||||||
var uselessTagFilterCacheValue = []byte("1")
|
var uselessTagFilterCacheValue = []byte("1")
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, error) {
|
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, filter *uint64set.Set, maxMetrics int) (*uint64set.Set, uint64, error) {
|
||||||
if tf.isNegative {
|
if tf.isNegative {
|
||||||
logger.Panicf("BUG: isNegative must be false")
|
logger.Panicf("BUG: isNegative must be false")
|
||||||
}
|
}
|
||||||
metricIDs := &uint64set.Set{}
|
metricIDs := &uint64set.Set{}
|
||||||
if len(tf.orSuffixes) > 0 {
|
if len(tf.orSuffixes) > 0 {
|
||||||
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
|
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
|
||||||
if err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs); err != nil {
|
loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, maxMetrics, metricIDs)
|
||||||
|
if err != nil {
|
||||||
if err == errFallbackToMetricNameMatch {
|
if err == errFallbackToMetricNameMatch {
|
||||||
return nil, err
|
return nil, loopsCount, err
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
|
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
|
||||||
}
|
}
|
||||||
return metricIDs, nil
|
return metricIDs, loopsCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path - scan for all the rows with the given prefix.
|
// Slow path - scan for all the rows with the given prefix.
|
||||||
maxLoops := maxMetrics * maxIndexScanSlowLoopsPerMetric
|
maxLoopsCount := uint64(maxMetrics) * maxIndexScanSlowLoopsPerMetric
|
||||||
if err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoops, metricIDs.Add); err != nil {
|
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, metricIDs.Add)
|
||||||
|
if err != nil {
|
||||||
if err == errFallbackToMetricNameMatch {
|
if err == errFallbackToMetricNameMatch {
|
||||||
return nil, err
|
return nil, loopsCount, err
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
|
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
|
||||||
}
|
}
|
||||||
return metricIDs, nil
|
return metricIDs, loopsCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoops int, f func(metricID uint64)) error {
|
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, maxLoopsCount uint64, f func(metricID uint64)) (uint64, error) {
|
||||||
if len(tf.orSuffixes) > 0 {
|
if len(tf.orSuffixes) > 0 {
|
||||||
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
|
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
|
||||||
}
|
}
|
||||||
|
@ -2433,40 +2434,40 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
|
||||||
mp.Reset()
|
mp.Reset()
|
||||||
var prevMatchingSuffix []byte
|
var prevMatchingSuffix []byte
|
||||||
var prevMatch bool
|
var prevMatch bool
|
||||||
loops := 0
|
var loopsCount uint64
|
||||||
loopsPaceLimiter := 0
|
loopsPaceLimiter := 0
|
||||||
prefix := tf.prefix
|
prefix := tf.prefix
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for ts.NextItem() {
|
for ts.NextItem() {
|
||||||
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
|
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
|
||||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
return err
|
return loopsCount, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
return nil
|
return loopsCount, nil
|
||||||
}
|
}
|
||||||
tail := item[len(prefix):]
|
tail := item[len(prefix):]
|
||||||
n := bytes.IndexByte(tail, tagSeparatorChar)
|
n := bytes.IndexByte(tail, tagSeparatorChar)
|
||||||
if n < 0 {
|
if n < 0 {
|
||||||
return fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar)
|
return loopsCount, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar)
|
||||||
}
|
}
|
||||||
suffix := tail[:n+1]
|
suffix := tail[:n+1]
|
||||||
tail = tail[n+1:]
|
tail = tail[n+1:]
|
||||||
if err := mp.InitOnlyTail(item, tail); err != nil {
|
if err := mp.InitOnlyTail(item, tail); err != nil {
|
||||||
return err
|
return loopsCount, err
|
||||||
}
|
}
|
||||||
mp.ParseMetricIDs()
|
mp.ParseMetricIDs()
|
||||||
|
loopsCount += uint64(mp.MetricIDsLen())
|
||||||
|
if loopsCount > maxLoopsCount {
|
||||||
|
return loopsCount, errFallbackToMetricNameMatch
|
||||||
|
}
|
||||||
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
|
if prevMatch && string(suffix) == string(prevMatchingSuffix) {
|
||||||
// Fast path: the same tag value found.
|
// Fast path: the same tag value found.
|
||||||
// There is no need in checking it again with potentially
|
// There is no need in checking it again with potentially
|
||||||
// slow tf.matchSuffix, which may call regexp.
|
// slow tf.matchSuffix, which may call regexp.
|
||||||
loops += mp.MetricIDsLen()
|
|
||||||
if loops > maxLoops {
|
|
||||||
return errFallbackToMetricNameMatch
|
|
||||||
}
|
|
||||||
for _, metricID := range mp.MetricIDs {
|
for _, metricID := range mp.MetricIDs {
|
||||||
if filter != nil && !filter.Has(metricID) {
|
if filter != nil && !filter.Has(metricID) {
|
||||||
continue
|
continue
|
||||||
|
@ -2480,11 +2481,11 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
|
||||||
// since the current row has no matching metricIDs.
|
// since the current row has no matching metricIDs.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path: need tf.matchSuffix call.
|
// Slow path: need tf.matchSuffix call.
|
||||||
ok, err := tf.matchSuffix(suffix)
|
ok, err := tf.matchSuffix(suffix)
|
||||||
|
loopsCount += reMatchCost
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err)
|
return loopsCount, fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err)
|
||||||
}
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
prevMatch = false
|
prevMatch = false
|
||||||
|
@ -2499,18 +2500,16 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
|
||||||
// The last char in kb.B must be tagSeparatorChar. Just increment it
|
// The last char in kb.B must be tagSeparatorChar. Just increment it
|
||||||
// in order to jump to the next tag value.
|
// in order to jump to the next tag value.
|
||||||
if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff {
|
if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff {
|
||||||
return fmt.Errorf("data corruption: the last char in k=%X must be %X", kb.B, tagSeparatorChar)
|
return loopsCount, fmt.Errorf("data corruption: the last char in k=%X must be %X", kb.B, tagSeparatorChar)
|
||||||
}
|
}
|
||||||
kb.B[len(kb.B)-1]++
|
kb.B[len(kb.B)-1]++
|
||||||
ts.Seek(kb.B)
|
ts.Seek(kb.B)
|
||||||
|
// Assume that a seek cost is equivalent to 100 ordinary loops.
|
||||||
|
loopsCount += 100
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
prevMatch = true
|
prevMatch = true
|
||||||
prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...)
|
prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...)
|
||||||
loops += mp.MetricIDsLen()
|
|
||||||
if loops > maxLoops {
|
|
||||||
return errFallbackToMetricNameMatch
|
|
||||||
}
|
|
||||||
for _, metricID := range mp.MetricIDs {
|
for _, metricID := range mp.MetricIDs {
|
||||||
if filter != nil && !filter.Has(metricID) {
|
if filter != nil && !filter.Has(metricID) {
|
||||||
continue
|
continue
|
||||||
|
@ -2519,29 +2518,32 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := ts.Error(); err != nil {
|
if err := ts.Error(); err != nil {
|
||||||
return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
|
return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
|
||||||
}
|
}
|
||||||
return nil
|
return loopsCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) error {
|
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
|
||||||
if tf.isNegative {
|
if tf.isNegative {
|
||||||
logger.Panicf("BUG: isNegative must be false")
|
logger.Panicf("BUG: isNegative must be false")
|
||||||
}
|
}
|
||||||
kb := kbPool.Get()
|
kb := kbPool.Get()
|
||||||
defer kbPool.Put(kb)
|
defer kbPool.Put(kb)
|
||||||
|
var loopsCount uint64
|
||||||
for _, orSuffix := range tf.orSuffixes {
|
for _, orSuffix := range tf.orSuffixes {
|
||||||
kb.B = append(kb.B[:0], tf.prefix...)
|
kb.B = append(kb.B[:0], tf.prefix...)
|
||||||
kb.B = append(kb.B, orSuffix...)
|
kb.B = append(kb.B, orSuffix...)
|
||||||
kb.B = append(kb.B, tagSeparatorChar)
|
kb.B = append(kb.B, tagSeparatorChar)
|
||||||
if err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs); err != nil {
|
lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, maxMetrics, metricIDs)
|
||||||
return err
|
if err != nil {
|
||||||
|
return loopsCount, err
|
||||||
}
|
}
|
||||||
|
loopsCount += lc
|
||||||
if metricIDs.Len() >= maxMetrics {
|
if metricIDs.Len() >= maxMetrics {
|
||||||
return nil
|
return loopsCount, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return loopsCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error {
|
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set) error {
|
||||||
|
@ -2559,39 +2561,39 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, met
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) error {
|
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, maxMetrics int, metricIDs *uint64set.Set) (uint64, error) {
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
mp := &is.mp
|
mp := &is.mp
|
||||||
mp.Reset()
|
mp.Reset()
|
||||||
maxLoops := maxMetrics * maxIndexScanLoopsPerMetric
|
maxLoopsCount := uint64(maxMetrics) * maxIndexScanLoopsPerMetric
|
||||||
loops := 0
|
var loopsCount uint64
|
||||||
loopsPaceLimiter := 0
|
loopsPaceLimiter := 0
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
for metricIDs.Len() < maxMetrics && ts.NextItem() {
|
for metricIDs.Len() < maxMetrics && ts.NextItem() {
|
||||||
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
|
||||||
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
|
||||||
return err
|
return loopsCount, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
loopsPaceLimiter++
|
loopsPaceLimiter++
|
||||||
item := ts.Item
|
item := ts.Item
|
||||||
if !bytes.HasPrefix(item, prefix) {
|
if !bytes.HasPrefix(item, prefix) {
|
||||||
return nil
|
return loopsCount, nil
|
||||||
}
|
}
|
||||||
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
|
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
|
||||||
return err
|
return loopsCount, err
|
||||||
}
|
}
|
||||||
loops += mp.MetricIDsLen()
|
loopsCount += uint64(mp.MetricIDsLen())
|
||||||
if loops > maxLoops {
|
if loopsCount > maxLoopsCount {
|
||||||
return errFallbackToMetricNameMatch
|
return loopsCount, errFallbackToMetricNameMatch
|
||||||
}
|
}
|
||||||
mp.ParseMetricIDs()
|
mp.ParseMetricIDs()
|
||||||
metricIDs.AddMulti(mp.MetricIDs)
|
metricIDs.AddMulti(mp.MetricIDs)
|
||||||
}
|
}
|
||||||
if err := ts.Error(); err != nil {
|
if err := ts.Error(); err != nil {
|
||||||
return fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
|
return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
|
||||||
}
|
}
|
||||||
return nil
|
return loopsCount, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error {
|
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool) error {
|
||||||
|
@ -2603,8 +2605,8 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
|
||||||
ts := &is.ts
|
ts := &is.ts
|
||||||
mp := &is.mp
|
mp := &is.mp
|
||||||
mp.Reset()
|
mp.Reset()
|
||||||
maxLoops := len(sortedFilter) * maxIndexScanLoopsPerMetric
|
maxLoopsCount := uint64(len(sortedFilter)) * maxIndexScanLoopsPerMetric
|
||||||
loops := 0
|
var loopsCount uint64
|
||||||
loopsPaceLimiter := 0
|
loopsPaceLimiter := 0
|
||||||
ts.Seek(prefix)
|
ts.Seek(prefix)
|
||||||
var sf []uint64
|
var sf []uint64
|
||||||
|
@ -2635,8 +2637,8 @@ func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metri
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
sf = sortedFilter
|
sf = sortedFilter
|
||||||
loops += mp.MetricIDsLen()
|
loopsCount += uint64(mp.MetricIDsLen())
|
||||||
if loops > maxLoops {
|
if loopsCount > maxLoopsCount {
|
||||||
return errFallbackToMetricNameMatch
|
return errFallbackToMetricNameMatch
|
||||||
}
|
}
|
||||||
mp.ParseMetricIDs()
|
mp.ParseMetricIDs()
|
||||||
|
@ -2806,31 +2808,33 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
|
func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
|
||||||
// Sort tfs by the duration from previous queries.
|
// Sort tfs by loopsCount needed for performing each filter.
|
||||||
|
// These stats are usually collected from the previous queries.
|
||||||
// This way we limit the amount of work below by applying fast filters at first.
|
// This way we limit the amount of work below by applying fast filters at first.
|
||||||
type tagFilterWithWeight struct {
|
type tagFilterWithWeight struct {
|
||||||
tf *tagFilter
|
tf *tagFilter
|
||||||
durationSeconds float64
|
loopsCount uint64
|
||||||
lastQueryTimestamp uint64
|
lastQueryTimestamp uint64
|
||||||
}
|
}
|
||||||
tfws := make([]tagFilterWithWeight, len(tfs.tfs))
|
tfws := make([]tagFilterWithWeight, len(tfs.tfs))
|
||||||
|
currentTime := fasttime.UnixTimestamp()
|
||||||
for i := range tfs.tfs {
|
for i := range tfs.tfs {
|
||||||
tf := &tfs.tfs[i]
|
tf := &tfs.tfs[i]
|
||||||
durationSeconds, lastQueryTimestamp := is.getDurationAndTimestampForDateFilter(date, tf)
|
loopsCount, lastQueryTimestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
|
||||||
if durationSeconds == 0 {
|
if currentTime > lastQueryTimestamp+60*60 {
|
||||||
// Assume that unknown tag filters can take quite big amounts of time.
|
// Reset loopsCount to 0 every hour for collecting updated stats for the tf.
|
||||||
durationSeconds = 1.0
|
loopsCount = 0
|
||||||
}
|
}
|
||||||
tfws[i] = tagFilterWithWeight{
|
tfws[i] = tagFilterWithWeight{
|
||||||
tf: tf,
|
tf: tf,
|
||||||
durationSeconds: durationSeconds,
|
loopsCount: loopsCount,
|
||||||
lastQueryTimestamp: lastQueryTimestamp,
|
lastQueryTimestamp: lastQueryTimestamp,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sort.Slice(tfws, func(i, j int) bool {
|
sort.Slice(tfws, func(i, j int) bool {
|
||||||
a, b := &tfws[i], &tfws[j]
|
a, b := &tfws[i], &tfws[j]
|
||||||
if a.durationSeconds != b.durationSeconds {
|
if a.loopsCount != b.loopsCount {
|
||||||
return a.durationSeconds < b.durationSeconds
|
return a.loopsCount < b.loopsCount
|
||||||
}
|
}
|
||||||
return a.tf.Less(b.tf)
|
return a.tf.Less(b.tf)
|
||||||
})
|
})
|
||||||
|
@ -2897,7 +2901,7 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||||
// Short circuit - there is no need in applying the remaining filters to an empty set.
|
// Short circuit - there is no need in applying the remaining filters to an empty set.
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if float64(metricIDsLen)/metricNameMatchesPerSecond < tfw.durationSeconds {
|
if uint64(metricIDsLen)*maxIndexScanLoopsPerMetric < tfw.loopsCount {
|
||||||
// It should be faster performing metricName match on the remaining filters
|
// It should be faster performing metricName match on the remaining filters
|
||||||
// instead of scanning big number of entries in the inverted index for these filters.
|
// instead of scanning big number of entries in the inverted index for these filters.
|
||||||
tfsPostponed = append(tfsPostponed, tf)
|
tfsPostponed = append(tfsPostponed, tf)
|
||||||
|
@ -2933,12 +2937,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
|
||||||
return metricIDs, nil
|
return metricIDs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// The estimated number of per-second loops inside updateMetricIDsByMetricNameMatch
|
|
||||||
//
|
|
||||||
// This value is used for determining when matching by metric name must be performed instead of matching
|
|
||||||
// by the remaining tag filters.
|
|
||||||
const metricNameMatchesPerSecond = 50000
|
|
||||||
|
|
||||||
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
|
func (is *indexSearch) storeDateMetricID(date, metricID uint64) error {
|
||||||
ii := getIndexItems()
|
ii := getIndexItems()
|
||||||
defer putIndexItems(ii)
|
defer putIndexItems(ii)
|
||||||
|
@ -3098,8 +3096,7 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime
|
||||||
tfNew := *tf
|
tfNew := *tf
|
||||||
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
|
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
|
||||||
tfNew.prefix = kb.B
|
tfNew.prefix = kb.B
|
||||||
startTime := time.Now()
|
metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
|
||||||
metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, filter, maxMetrics)
|
|
||||||
kbPool.Put(kb)
|
kbPool.Put(kb)
|
||||||
currentTimestamp := fasttime.UnixTimestamp()
|
currentTimestamp := fasttime.UnixTimestamp()
|
||||||
if currentTimestamp > lastQueryTimestamp+5 {
|
if currentTimestamp > lastQueryTimestamp+5 {
|
||||||
|
@ -3107,23 +3104,22 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, lastQueryTime
|
||||||
// Do not update it too frequently.
|
// Do not update it too frequently.
|
||||||
return metricIDs, err
|
return metricIDs, err
|
||||||
}
|
}
|
||||||
// Store the duration for tag filter execution in the cache in order to sort tag filters
|
// Store the loopsCount for tag filter in the cache in order to sort tag filters
|
||||||
// in ascending durations on the next search.
|
// in ascending durations on the next search.
|
||||||
durationSeconds := time.Since(startTime).Seconds()
|
if err != nil {
|
||||||
if err != nil && durationSeconds < 10 {
|
// Set high loopsCount for failing filter, so it is moved to the end of filter list.
|
||||||
// Set high duration for failing filter, so it is moved to the end of filter list.
|
loopsCount = 1 << 30
|
||||||
durationSeconds = 10
|
|
||||||
}
|
}
|
||||||
if metricIDs.Len() >= maxMetrics {
|
if metricIDs.Len() >= maxMetrics {
|
||||||
// Increase the duration for tag filter matching too many metrics,
|
// Increase loopsCount for tag filter matching too many metrics,
|
||||||
// So next time it will be applied after filters matching lower number of metrics.
|
// So next time it is moved to the end of filter list.
|
||||||
durationSeconds *= 2
|
loopsCount *= 2
|
||||||
}
|
}
|
||||||
is.storeDurationAndTimestampForDateFilter(date, tf, durationSeconds, currentTimestamp)
|
is.storeLoopsCountAndTimestampForDateFilter(date, tf, loopsCount, currentTimestamp)
|
||||||
return metricIDs, err
|
return metricIDs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) getDurationAndTimestampForDateFilter(date uint64, tf *tagFilter) (float64, uint64) {
|
func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (uint64, uint64) {
|
||||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
|
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
|
||||||
kb := kbPool.Get()
|
kb := kbPool.Get()
|
||||||
defer kbPool.Put(kb)
|
defer kbPool.Put(kb)
|
||||||
|
@ -3131,17 +3127,15 @@ func (is *indexSearch) getDurationAndTimestampForDateFilter(date uint64, tf *tag
|
||||||
if len(kb.B) != 16 {
|
if len(kb.B) != 16 {
|
||||||
return 0, 0
|
return 0, 0
|
||||||
}
|
}
|
||||||
n := encoding.UnmarshalUint64(kb.B)
|
loopsCount := encoding.UnmarshalUint64(kb.B)
|
||||||
durationSeconds := math.Float64frombits(n)
|
|
||||||
timestamp := encoding.UnmarshalUint64(kb.B[8:])
|
timestamp := encoding.UnmarshalUint64(kb.B[8:])
|
||||||
return durationSeconds, timestamp
|
return loopsCount, timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (is *indexSearch) storeDurationAndTimestampForDateFilter(date uint64, tf *tagFilter, durationSeconds float64, timestamp uint64) {
|
func (is *indexSearch) storeLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter, loopsCount, timestamp uint64) {
|
||||||
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
|
is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, is.accountID, is.projectID)
|
||||||
n := math.Float64bits(durationSeconds)
|
|
||||||
kb := kbPool.Get()
|
kb := kbPool.Get()
|
||||||
kb.B = encoding.MarshalUint64(kb.B[:0], n)
|
kb.B = encoding.MarshalUint64(kb.B[:0], loopsCount)
|
||||||
kb.B = encoding.MarshalUint64(kb.B, timestamp)
|
kb.B = encoding.MarshalUint64(kb.B, timestamp)
|
||||||
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
is.db.durationsPerDateTagFilterCache.Set(is.kb.B, kb.B)
|
||||||
kbPool.Put(kb)
|
kbPool.Put(kb)
|
||||||
|
@ -3268,8 +3262,8 @@ func (is *indexSearch) intersectMetricIDsWithTagFilterNocache(tf *tagFilter, fil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path - scan for all the rows with the given prefix.
|
// Slow path - scan for all the rows with the given prefix.
|
||||||
maxLoops := filter.Len() * maxIndexScanSlowLoopsPerMetric
|
maxLoopsCount := uint64(filter.Len()) * maxIndexScanSlowLoopsPerMetric
|
||||||
err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoops, func(metricID uint64) {
|
_, err := is.getMetricIDsForTagFilterSlow(tf, filter, maxLoopsCount, func(metricID uint64) {
|
||||||
if tf.isNegative {
|
if tf.isNegative {
|
||||||
// filter must be equal to metricIDs
|
// filter must be equal to metricIDs
|
||||||
metricIDs.Del(metricID)
|
metricIDs.Del(metricID)
|
||||||
|
|
|
@ -203,7 +203,7 @@ func (tb *table) MustClose() {
|
||||||
if n := atomic.LoadUint64(&ptw.refCount); n != 1 {
|
if n := atomic.LoadUint64(&ptw.refCount); n != 1 {
|
||||||
logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n)
|
logger.Panicf("BUG: unexpected refCount=%d when closing the partition; probably there are pending searches", n)
|
||||||
}
|
}
|
||||||
ptw.pt.MustClose()
|
ptw.decRef()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release exclusive lock on the table.
|
// Release exclusive lock on the table.
|
||||||
|
|
|
@ -233,7 +233,9 @@ type tagFilter struct {
|
||||||
value []byte
|
value []byte
|
||||||
isNegative bool
|
isNegative bool
|
||||||
isRegexp bool
|
isRegexp bool
|
||||||
matchCost uint64
|
|
||||||
|
// matchCost is a cost for matching a filter against a single string.
|
||||||
|
matchCost uint64
|
||||||
|
|
||||||
// Prefix always contains {nsPrefixTagToMetricIDs, AccountID, ProjectID, key}.
|
// Prefix always contains {nsPrefixTagToMetricIDs, AccountID, ProjectID, key}.
|
||||||
// Additionally it contains:
|
// Additionally it contains:
|
||||||
|
|
Loading…
Reference in a new issue