diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index e290db6a0..c6cd7e995 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -126,10 +126,14 @@ type indexDB struct {
 	// Cache for fast MetricID -> MetricName lookup.
 	metricNameCache *workingsetcache.Cache

-	// Cache holding useless TagFilters entries, which have no tag filters
+	// Cache for useless TagFilters entries, which have no tag filters
 	// matching low number of metrics.
 	uselessTagFiltersCache *workingsetcache.Cache

+	// Cache for (date, tagFilter) -> metricIDsLen, which is used for reducing
+	// the amount of work when matching a set of filters.
+	metricIDsPerDateTagFilterCache *workingsetcache.Cache
+
 	indexSearchPool sync.Pool

 	// An inmemory set of deleted metricIDs.
@@ -178,10 +182,11 @@ func openIndexDB(path string, metricIDCache, metricNameCache *workingsetcache.Ca
 		tb:   tb,
 		name: name,

-		tagCache:               workingsetcache.New(mem/32, time.Hour),
-		metricIDCache:          metricIDCache,
-		metricNameCache:        metricNameCache,
-		uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
+		tagCache:                       workingsetcache.New(mem/32, time.Hour),
+		metricIDCache:                  metricIDCache,
+		metricNameCache:                metricNameCache,
+		uselessTagFiltersCache:         workingsetcache.New(mem/128, time.Hour),
+		metricIDsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),

 		currHourMetricIDs: currHourMetricIDs,
 		prevHourMetricIDs: prevHourMetricIDs,
@@ -348,11 +353,13 @@ func (db *indexDB) decRef() {
 	// Free space occupied by caches owned by db.
 	db.tagCache.Stop()
 	db.uselessTagFiltersCache.Stop()
+	db.metricIDsPerDateTagFilterCache.Stop()

 	db.tagCache = nil
 	db.metricIDCache = nil
 	db.metricNameCache = nil
 	db.uselessTagFiltersCache = nil
+	db.metricIDsPerDateTagFilterCache = nil

 	if atomic.LoadUint64(&db.mustDrop) == 0 {
 		return
@@ -1053,8 +1060,7 @@ func (is *indexSearch) getStartDateForPerDayInvertedIndex() (uint64, error) {
 	if err := ts.Error(); err != nil {
 		return 0, err
 	}
-	// The minDate can contain incomplete inverted index, so increment it.
-	minDate++
+	// There are no (date,tag)->metricIDs entries in the database yet.
 	return minDate, nil
 }

@@ -1700,22 +1706,8 @@ func (is *indexSearch) searchMetricIDs(tfss []*TagFilters, tr TimeRange, maxMetr
 func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
 	// Sort tag filters for faster ts.Seek below.
 	sort.Slice(tfs.tfs, func(i, j int) bool {
-		// Move regexp and negative filters to the end, since they require scanning
-		// all the entries for the given label.
-		a := &tfs.tfs[i]
-		b := &tfs.tfs[j]
-		if a.isRegexp != b.isRegexp {
-			return !a.isRegexp
-		}
-		if a.isNegative != b.isNegative {
-			return !a.isNegative
-		}
-		if len(a.orSuffixes) != len(b.orSuffixes) {
-			return len(a.orSuffixes) < len(b.orSuffixes)
-		}
-		return bytes.Compare(a.prefix, b.prefix) < 0
+		return tfs.tfs[i].Less(&tfs.tfs[j])
 	})
-
 	err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics)
 	if err == nil {
 		// Fast path: found metricIDs by date range.
@@ -2177,10 +2169,40 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set
 }

 func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
-	// Populate metricIDs with the first non-negative filter.
-	var tfFirst *tagFilter
+	// Sort tfs by the number of matching metricIDs from previous queries.
+	// This way we limit the amount of work below by applying more specific filters at first.
+	type tagFilterWithCount struct {
+		tf    *tagFilter
+		count uint64
+	}
+	tfsWithCount := make([]tagFilterWithCount, len(tfs.tfs))
+	kb := &is.kb
+	var buf []byte
 	for i := range tfs.tfs {
 		tf := &tfs.tfs[i]
+		kb.B = appendDateTagFilterCacheKey(kb.B[:0], date, tf, tfs.accountID, tfs.projectID)
+		buf = is.db.metricIDsPerDateTagFilterCache.Get(buf[:0], kb.B)
+		count := uint64(0)
+		if len(buf) == 8 {
+			count = encoding.UnmarshalUint64(buf)
+		}
+		tfsWithCount[i] = tagFilterWithCount{
+			tf:    tf,
+			count: count,
+		}
+	}
+	sort.Slice(tfsWithCount, func(i, j int) bool {
+		a, b := &tfsWithCount[i], &tfsWithCount[j]
+		if a.count != b.count {
+			return a.count < b.count
+		}
+		return a.tf.Less(b.tf)
+	})
+
+	// Populate metricIDs with the first non-negative filter.
+	var tfFirst *tagFilter
+	for i := range tfsWithCount {
+		tf := tfsWithCount[i].tf
 		if tf.isNegative {
 			continue
 		}
@@ -2217,11 +2239,30 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
 	}
 	// Intersect metricIDs with the rest of filters.
-	for i := range tfs.tfs {
-		tf := &tfs.tfs[i]
+	for i := range tfsWithCount {
+		tfWithCount := &tfsWithCount[i]
+		tf := tfWithCount.tf
 		if tf == tfFirst {
 			continue
 		}
+		if n := uint64(metricIDs.Len()); n < 1000 || n < tfWithCount.count/maxIndexScanLoopsPerMetric {
+			// It should be faster performing metricName match on the remaining filters
+			// instead of scanning a big number of entries in the inverted index for these filters.
+			tfsRemaining := tfsWithCount[i:]
+			tfsPostponed := make([]*tagFilter, 0, len(tfsRemaining))
+			for j := range tfsRemaining {
+				tf := tfsRemaining[j].tf
+				if tf == tfFirst {
+					continue
+				}
+				tfsPostponed = append(tfsPostponed, tf)
+			}
+			var m uint64set.Set
+			if err := is.updateMetricIDsByMetricNameMatch(&m, metricIDs, tfsPostponed, tfs.accountID, tfs.projectID); err != nil {
+				return nil, err
+			}
+			return &m, nil
+		}
 		m, err := is.getMetricIDsForDateTagFilter(tf, date, tfs.commonPrefix, tfs.accountID, tfs.projectID, maxDateMetrics)
 		if err != nil {
 			return nil, err
 		}
@@ -2376,7 +2417,26 @@ func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64,
 	tfNew := *tf
 	tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
 	tfNew.prefix = kb.B
-	return is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
+	metricIDs, err := is.getMetricIDsForTagFilter(&tfNew, maxMetrics)
+
+	// Store the number of matching metricIDs in the cache in order to sort tag filters
+	// in ascending number of matching metricIDs on the next search.
+	is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0], date, tf, accountID, projectID)
+	metricIDsLen := uint64(metricIDs.Len())
+	if err != nil {
+		// Set metricIDsLen to maxMetrics, so the given entry will be moved to the end
+		// of tag filters on the next search.
+		metricIDsLen = uint64(maxMetrics)
+	}
+	kb.B = encoding.MarshalUint64(kb.B[:0], metricIDsLen)
+	is.db.metricIDsPerDateTagFilterCache.Set(is.kb.B, kb.B)
+	return metricIDs, err
+}
+
+func appendDateTagFilterCacheKey(dst []byte, date uint64, tf *tagFilter, accountID, projectID uint32) []byte {
+	dst = encoding.MarshalUint64(dst, date)
+	dst = tf.Marshal(dst, accountID, projectID)
+	return dst
 }

 func (is *indexSearch) getMetricIDsForDate(date uint64, accountID, projectID uint32, maxMetrics int) (*uint64set.Set, error) {
@@ -2444,7 +2504,7 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
 // over the found metrics.
 const maxIndexScanLoopsPerMetric = 100

-// The maximum number of slow index scan loops per.
+// The maximum number of slow index scan loops.
 // Bigger number of loops is slower than updateMetricIDsByMetricNameMatch
 // over the found metrics.
 const maxIndexScanSlowLoopsPerMetric = 20
diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go
index 3c6528ee5..88bb7953e 100644
--- a/lib/storage/tag_filters.go
+++ b/lib/storage/tag_filters.go
@@ -160,6 +160,21 @@ type tagFilter struct {
 	matchesEmptyValue bool
 }

+func (tf *tagFilter) Less(other *tagFilter) bool {
+	// Move regexp and negative filters to the end, since they require scanning
+	// all the entries for the given label.
+	if tf.isRegexp != other.isRegexp {
+		return !tf.isRegexp
+	}
+	if tf.isNegative != other.isNegative {
+		return !tf.isNegative
+	}
+	if len(tf.orSuffixes) != len(other.orSuffixes) {
+		return len(tf.orSuffixes) < len(other.orSuffixes)
+	}
+	return bytes.Compare(tf.prefix, other.prefix) < 0
+}
+
 // String returns human-readable tf value.
 func (tf *tagFilter) String() string {
 	op := "="
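
Reviewer note, not part of the patch: the standalone Go sketch below illustrates the heuristic this change introduces, namely caching how many metricIDs each (date, tag filter) pair matched and sorting filters by that count so the most selective ones are intersected first. It deliberately simplifies: a plain map stands in for metricIDsPerDateTagFilterCache, string keys replace the marshaled (date, tagFilter) byte keys built by appendDateTagFilterCacheKey, and tagFilterStub replaces the real tagFilter type; all names and numbers here are hypothetical.

// Simplified sketch of the per-(date, tag filter) match-count ordering; see the
// reviewer note above for the assumptions. Not the actual VictoriaMetrics code.
package main

import (
	"fmt"
	"sort"
)

// tagFilterStub stands in for storage.tagFilter; only the fields needed for
// ordering are kept.
type tagFilterStub struct {
	expr       string
	isRegexp   bool
	isNegative bool
}

// matchCounts plays the role of metricIDsPerDateTagFilterCache: the number of
// metricIDs each (date, filter) pair matched during previous queries.
var matchCounts = map[string]uint64{}

func cacheKey(date uint64, tf *tagFilterStub) string {
	return fmt.Sprintf("%d/%s", date, tf.expr)
}

// orderFilters sorts filters in ascending order of previously observed match
// counts; uncached filters get count 0 and therefore go first.
func orderFilters(date uint64, tfs []tagFilterStub) []*tagFilterStub {
	type withCount struct {
		tf    *tagFilterStub
		count uint64
	}
	wcs := make([]withCount, len(tfs))
	for i := range tfs {
		wcs[i] = withCount{tf: &tfs[i], count: matchCounts[cacheKey(date, &tfs[i])]}
	}
	sort.Slice(wcs, func(i, j int) bool {
		a, b := &wcs[i], &wcs[j]
		if a.count != b.count {
			return a.count < b.count
		}
		// Tie-break with the static ordering: exact filters before regexp and
		// negative ones, mirroring tagFilter.Less.
		if a.tf.isRegexp != b.tf.isRegexp {
			return !a.tf.isRegexp
		}
		if a.tf.isNegative != b.tf.isNegative {
			return !a.tf.isNegative
		}
		return a.tf.expr < b.tf.expr
	})
	ordered := make([]*tagFilterStub, len(wcs))
	for i := range wcs {
		ordered[i] = wcs[i].tf
	}
	return ordered
}

func main() {
	date := uint64(18200) // hypothetical day number
	tfs := []tagFilterStub{
		{expr: `__name__="http_requests_total"`},
		{expr: `job=~"api.*"`, isRegexp: true},
		{expr: `instance="host-1"`},
	}
	// Counts recorded for the same date by an earlier query.
	matchCounts[cacheKey(date, &tfs[0])] = 1000000
	matchCounts[cacheKey(date, &tfs[1])] = 50000
	matchCounts[cacheKey(date, &tfs[2])] = 120

	for _, tf := range orderFilters(date, tfs) {
		fmt.Println(tf.expr)
	}
	// Output order: instance="host-1", job=~"api.*", __name__="http_requests_total".
	// Starting from the 120-metric set keeps the later intersections cheap.
}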