li/storage: re-use the per-day inverted index search code for searching in global index

This allows removing a big pile of outdated code for global index search.

This may help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1486
This commit is contained in:
Aliaksandr Valialkin 2021-07-30 08:37:10 +03:00
parent 74ffaa45d9
commit d05cac6c98
4 changed files with 38 additions and 511 deletions

View file

@ -448,12 +448,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_missing_tsids_for_metric_id_total`, func() float64 {
return float64(idbm().MissingTSIDsForMetricID)
})
metrics.NewGauge(`vm_date_metric_ids_search_calls_total`, func() float64 {
return float64(idbm().DateMetricIDsSearchCalls)
})
metrics.NewGauge(`vm_date_metric_ids_search_hits_total`, func() float64 {
return float64(idbm().DateMetricIDsSearchHits)
})
metrics.NewGauge(`vm_index_blocks_with_metric_ids_processed_total`, func() float64 {
return float64(idbm().IndexBlocksWithMetricIDsProcessed)
})
@ -609,6 +603,9 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_date_range_hits_total`, func() float64 {
return float64(idbm().DateRangeSearchHits)
})
metrics.NewGauge(`vm_global_search_calls_total`, func() float64 {
return float64(idbm().GlobalSearchCalls)
})
metrics.NewGauge(`vm_missing_metric_names_for_metric_id_total`, func() float64 {
return float64(idbm().MissingMetricNamesForMetricID)
@ -654,9 +651,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSize())
})
@ -697,9 +691,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/prefetchedMetricIDs"}`, func() float64 {
return float64(m().PrefetchedMetricIDsSizeBytes)
})
@ -728,9 +719,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheRequests())
})
@ -759,9 +747,6 @@ func registerStorageMetrics() {
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/uselessTagFilters"}`, func() float64 {
return float64(idbm().UselessTagFiltersCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheMisses())
})

View file

@ -63,18 +63,15 @@ type indexDB struct {
// High rate for this value means corrupted indexDB.
missingTSIDsForMetricID uint64
// The number of searches for metric ids by days.
dateMetricIDsSearchCalls uint64
// The number of successful searches for metric ids by days.
dateMetricIDsSearchHits uint64
// The number of calls for date range searches.
dateRangeSearchCalls uint64
// The number of hits for date range searches.
dateRangeSearchHits uint64
// The number of calls for global search.
globalSearchCalls uint64
// missingMetricNamesForMetricID is a counter of missing MetricID -> MetricName entries.
// High rate may mean corrupted indexDB due to unclean shutdown.
// The db must be automatically recovered after that.
@ -94,10 +91,6 @@ type indexDB struct {
// The parent storage.
s *Storage
// Cache for useless TagFilters entries, which have no tag filters
// matching low number of metrics.
uselessTagFiltersCache *workingsetcache.Cache
// Cache for (date, tagFilter) -> loopsCount, which is used for reducing
// the amount of work when matching a set of filters.
loopsPerDateTagFilterCache *workingsetcache.Cache
@ -128,7 +121,6 @@ func openIndexDB(path string, s *Storage) (*indexDB, error) {
tagFiltersCache: workingsetcache.New(mem/32, time.Hour),
s: s,
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
}
return db, nil
@ -143,11 +135,6 @@ type IndexDBMetrics struct {
TagFiltersCacheRequests uint64
TagFiltersCacheMisses uint64
UselessTagFiltersCacheSize uint64
UselessTagFiltersCacheSizeBytes uint64
UselessTagFiltersCacheRequests uint64
UselessTagFiltersCacheMisses uint64
DeletedMetricsCount uint64
IndexDBRefCount uint64
@ -157,11 +144,10 @@ type IndexDBMetrics struct {
RecentHourMetricIDsSearchCalls uint64
RecentHourMetricIDsSearchHits uint64
DateMetricIDsSearchCalls uint64
DateMetricIDsSearchHits uint64
DateRangeSearchCalls uint64
DateRangeSearchHits uint64
GlobalSearchCalls uint64
MissingMetricNamesForMetricID uint64
@ -190,23 +176,15 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.TagFiltersCacheRequests += cs.GetCalls
m.TagFiltersCacheMisses += cs.Misses
cs.Reset()
db.uselessTagFiltersCache.UpdateStats(&cs)
m.UselessTagFiltersCacheSize += cs.EntriesCount
m.UselessTagFiltersCacheSizeBytes += cs.BytesSize
m.UselessTagFiltersCacheRequests += cs.GetCalls
m.UselessTagFiltersCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits)
m.GlobalSearchCalls += atomic.LoadUint64(&db.globalSearchCalls)
m.MissingMetricNamesForMetricID += atomic.LoadUint64(&db.missingMetricNamesForMetricID)
@ -277,12 +255,10 @@ func (db *indexDB) decRef() {
// Free space occupied by caches owned by db.
db.tagFiltersCache.Stop()
db.uselessTagFiltersCache.Stop()
db.loopsPerDateTagFilterCache.Stop()
db.tagFiltersCache = nil
db.s = nil
db.uselessTagFiltersCache = nil
db.loopsPerDateTagFilterCache = nil
if atomic.LoadUint64(&db.mustDrop) == 0 {
@ -1600,9 +1576,6 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
db.s.resetAndSaveTSIDCache()
// Do not reset uselessTagFiltersCache, since the found metricIDs
// on cache miss are filtered out later with deletedMetricIDs.
// Store the metricIDs as deleted.
// Make this after updating the deletedMetricIDs and resetting caches
// in order to exclude the possibility of the inconsistent state when the deleted metricIDs
@ -1966,167 +1939,6 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs
return nil
}
func (is *indexSearch) getTagFilterWithMinMetricIDsCountOptimized(tfs *TagFilters, tr TimeRange, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountAdaptive(tfs, maxMetrics)
if err == nil {
return minTf, minMetricIDs, nil
}
if err != errTooManyMetrics {
return nil, nil, err
}
// All the tag filters match too many metrics.
// Slow path: try filtering the matching metrics by time range.
// This should work well for cases when old metrics are constantly substituted
// by big number of new metrics. For example, prometheus-operator creates many new
// metrics for each new deployment.
//
// Allow fetching up to 20*maxMetrics metrics for the given time range
// in the hope these metricIDs will be filtered out by other filters later.
maxTimeRangeMetrics := 20 * maxMetrics
metricIDsForTimeRange, err := is.getMetricIDsForTimeRange(tr, maxTimeRangeMetrics+1)
if err == errMissingMetricIDsForDate {
return nil, nil, fmt.Errorf("cannot find tag filter matching less than %d time series; "+
"either increase -search.maxUniqueTimeseries or use more specific tag filters", maxMetrics)
}
if err != nil {
return nil, nil, err
}
if metricIDsForTimeRange.Len() <= maxTimeRangeMetrics {
return nil, metricIDsForTimeRange, nil
}
return nil, nil, fmt.Errorf("more than %d time series found on the time range %s; either increase -search.maxUniqueTimeseries or shrink the time range",
maxMetrics, tr.String())
}
func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
appendCacheKeyPrefix := func(dst []byte, prefix byte) []byte {
dst = append(dst, prefix)
dst = append(dst, is.db.name...)
dst = encoding.MarshalUint64(dst, uint64(maxMetrics))
return dst
}
kb := &is.kb
kb.B = appendCacheKeyPrefix(kb.B[:0], uselessMultiTagFiltersKeyPrefix)
kb.B = tfs.marshal(kb.B)
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
// Skip useless work below, since the tfs doesn't contain tag filters matching less than maxMetrics metrics.
return nil, nil, errTooManyMetrics
}
// Iteratively increase maxAllowedMetrics up to maxMetrics in order to limit
// the time required for founding the tag filter with minimum matching metrics.
maxAllowedMetrics := 16
if maxAllowedMetrics > maxMetrics {
maxAllowedMetrics = maxMetrics
}
for {
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCount(tfs, maxAllowedMetrics)
if err != errTooManyMetrics {
if err != nil {
return nil, nil, err
}
if minMetricIDs.Len() < maxAllowedMetrics {
// Found the tag filter with the minimum number of metrics.
return minTf, minMetricIDs, nil
}
}
// Too many metrics matched.
if maxAllowedMetrics >= maxMetrics {
// The tag filter with minimum matching metrics matches at least maxMetrics metrics.
kb.B = appendCacheKeyPrefix(kb.B[:0], uselessMultiTagFiltersKeyPrefix)
kb.B = tfs.marshal(kb.B)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
return nil, nil, errTooManyMetrics
}
// Increase maxAllowedMetrics and try again.
maxAllowedMetrics *= 4
if maxAllowedMetrics > maxMetrics {
maxAllowedMetrics = maxMetrics
}
}
}
var errTooManyMetrics = errors.New("all the tag filters match too many metrics")
func (is *indexSearch) getTagFilterWithMinMetricIDsCount(tfs *TagFilters, maxMetrics int) (*tagFilter, *uint64set.Set, error) {
appendCacheKeyPrefix := func(dst []byte, prefix byte) []byte {
dst = append(dst, prefix)
dst = append(dst, is.db.name...)
dst = encoding.MarshalUint64(dst, uint64(maxMetrics))
return dst
}
var minMetricIDs *uint64set.Set
var minTf *tagFilter
kb := &is.kb
uselessTagFilters := 0
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf.isNegative {
// Skip negative filters.
continue
}
kb.B = appendCacheKeyPrefix(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = tf.Marshal(kb.B)
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
// Skip useless work below, since the tf matches at least maxMetrics metrics.
uselessTagFilters++
continue
}
metricIDs, _, err := is.getMetricIDsForTagFilter(tf, maxMetrics, int64Max)
if err != nil {
return nil, nil, fmt.Errorf("cannot find MetricIDs for tagFilter %s: %w", tf, err)
}
if metricIDs.Len() >= maxMetrics {
// The tf matches at least maxMetrics. Skip it
kb.B = appendCacheKeyPrefix(kb.B[:0], uselessSingleTagFilterKeyPrefix)
kb.B = tf.Marshal(kb.B)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
uselessTagFilters++
continue
}
minMetricIDs = metricIDs
minTf = tf
maxMetrics = minMetricIDs.Len()
if maxMetrics <= 1 {
// There is no need in inspecting other filters, since minTf
// already matches 0 or 1 metric.
break
}
}
if minTf != nil {
return minTf, minMetricIDs, nil
}
if uselessTagFilters == len(tfs.tfs) {
// All the tag filters return at least maxMetrics entries.
return nil, nil, errTooManyMetrics
}
// There is no positive filter with small number of matching metrics.
// Create it, so it matches all the MetricIDs.
kb.B = appendCacheKeyPrefix(kb.B[:0], uselessNegativeTagFilterKeyPrefix)
kb.B = tfs.marshal(kb.B)
if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 {
return nil, nil, errTooManyMetrics
}
metricIDs := &uint64set.Set{}
if err := is.updateMetricIDsAll(metricIDs, maxMetrics); err != nil {
return nil, nil, err
}
if metricIDs.Len() >= maxMetrics {
kb.B = appendCacheKeyPrefix(kb.B[:0], uselessNegativeTagFilterKeyPrefix)
kb.B = tfs.marshal(kb.B)
is.db.uselessTagFiltersCache.Set(kb.B, uselessTagFilterCacheValue)
}
return nil, metricIDs, nil
}
func removeCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter {
if !hasCompositeTagFilters(tfs, prefix) {
return tfs
@ -2334,41 +2146,16 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
return err
}
// Slow path - try searching over the whole inverted index.
// Sort tag filters for faster ts.Seek below.
sort.Slice(tfs.tfs, func(i, j int) bool {
return tfs.tfs[i].Less(&tfs.tfs[j])
})
minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
// Slow path - fall back to search in the global inverted index.
atomic.AddUint64(&is.db.globalSearchCalls, 1)
m, err := is.getMetricIDsForDateAndFilters(0, tfs, maxMetrics)
if err != nil {
return err
}
// Find intersection of minTf with other tfs.
for i := range tfs.tfs {
tf := &tfs.tfs[i]
if tf == minTf {
continue
}
mIDs, err := is.intersectMetricIDsWithTagFilter(tf, minMetricIDs)
if err != nil {
return err
}
minMetricIDs = mIDs
}
metricIDs.UnionMayOwn(minMetricIDs)
metricIDs.UnionMayOwn(m)
return nil
}
const (
uselessSingleTagFilterKeyPrefix = 0
uselessMultiTagFiltersKeyPrefix = 1
uselessNegativeTagFilterKeyPrefix = 2
)
var uselessTagFilterCacheValue = []byte("1")
func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
@ -2376,7 +2163,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, m
metricIDs := &uint64set.Set{}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
loopsCount, err := is.updateMetricIDsForOrSuffixesNoFilter(tf, metricIDs, maxMetrics, maxLoopsCount)
loopsCount, err := is.updateMetricIDsForOrSuffixes(tf, metricIDs, maxMetrics, maxLoopsCount)
if err != nil {
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
@ -2384,9 +2171,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, m
}
// Slow path - scan for all the rows with the given prefix.
// Pass nil filter to getMetricIDsForTagFilterSlow, since it works faster on production workloads
// than non-nil filter with many entries.
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, nil, metricIDs.Add, maxLoopsCount)
loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, metricIDs.Add, maxLoopsCount)
if err != nil {
return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
@ -2395,7 +2180,7 @@ func (is *indexSearch) getMetricIDsForTagFilter(tf *tagFilter, maxMetrics int, m
var errTooManyLoops = fmt.Errorf("too many loops is needed for applying this filter")
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint64set.Set, f func(metricID uint64), maxLoopsCount int64) (int64, error) {
func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, f func(metricID uint64), maxLoopsCount int64) (int64, error) {
if len(tf.orSuffixes) > 0 {
logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)
}
@ -2442,18 +2227,10 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
// There is no need in checking it again with potentially
// slow tf.matchSuffix, which may call regexp.
for _, metricID := range mp.MetricIDs {
if filter != nil && !filter.Has(metricID) {
continue
}
f(metricID)
}
continue
}
if filter != nil && !mp.HasCommonMetricIDs(filter) {
// Faster path: there is no need in calling tf.matchSuffix,
// since the current row has no matching metricIDs.
continue
}
// Slow path: need tf.matchSuffix call.
ok, err := tf.matchSuffix(suffix)
// Assume that tf.matchSuffix call needs 10x more time than a single metric scan iteration.
@ -2485,9 +2262,6 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
prevMatch = true
prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...)
for _, metricID := range mp.MetricIDs {
if filter != nil && !filter.Has(metricID) {
continue
}
f(metricID)
}
}
@ -2497,7 +2271,7 @@ func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, filter *uint6
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
func (is *indexSearch) updateMetricIDsForOrSuffixes(tf *tagFilter, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
if tf.isNegative {
logger.Panicf("BUG: isNegative must be false")
}
@ -2508,7 +2282,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, metri
kb.B = append(kb.B[:0], tf.prefix...)
kb.B = append(kb.B, orSuffix...)
kb.B = append(kb.B, tagSeparatorChar)
lc, err := is.updateMetricIDsForOrSuffixNoFilter(kb.B, metricIDs, maxMetrics, maxLoopsCount-loopsCount)
lc, err := is.updateMetricIDsForOrSuffix(kb.B, metricIDs, maxMetrics, maxLoopsCount-loopsCount)
loopsCount += lc
if err != nil {
return loopsCount, err
@ -2520,25 +2294,7 @@ func (is *indexSearch) updateMetricIDsForOrSuffixesNoFilter(tf *tagFilter, metri
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixesWithFilter(tf *tagFilter, metricIDs, filter *uint64set.Set, maxLoopsCount int64) (int64, error) {
sortedFilter := filter.AppendTo(nil)
kb := kbPool.Get()
defer kbPool.Put(kb)
var loopsCount int64
for _, orSuffix := range tf.orSuffixes {
kb.B = append(kb.B[:0], tf.prefix...)
kb.B = append(kb.B, orSuffix...)
kb.B = append(kb.B, tagSeparatorChar)
lc, err := is.updateMetricIDsForOrSuffixWithFilter(kb.B, metricIDs, sortedFilter, tf.isNegative, maxLoopsCount-loopsCount)
loopsCount += lc
if err != nil {
return loopsCount, err
}
}
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
func (is *indexSearch) updateMetricIDsForOrSuffix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
ts := &is.ts
mp := &is.mp
mp.Reset()
@ -2572,156 +2328,8 @@ func (is *indexSearch) updateMetricIDsForOrSuffixNoFilter(prefix []byte, metricI
return loopsCount, nil
}
func (is *indexSearch) updateMetricIDsForOrSuffixWithFilter(prefix []byte, metricIDs *uint64set.Set, sortedFilter []uint64, isNegative bool, maxLoopsCount int64) (int64, error) {
if len(sortedFilter) == 0 {
return 0, nil
}
firstFilterMetricID := sortedFilter[0]
lastFilterMetricID := sortedFilter[len(sortedFilter)-1]
ts := &is.ts
mp := &is.mp
mp.Reset()
var loopsCount int64
loopsPaceLimiter := 0
ts.Seek(prefix)
var sf []uint64
var metricID uint64
for ts.NextItem() {
if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return loopsCount, err
}
}
loopsPaceLimiter++
item := ts.Item
if !bytes.HasPrefix(item, prefix) {
return loopsCount, nil
}
if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
return loopsCount, err
}
loopsCount += int64(mp.MetricIDsLen())
if loopsCount > maxLoopsCount {
return loopsCount, errTooManyLoops
}
firstMetricID, lastMetricID := mp.FirstAndLastMetricIDs()
if lastMetricID < firstFilterMetricID {
// Skip the item, since it contains metricIDs lower
// than metricIDs in sortedFilter.
continue
}
if firstMetricID > lastFilterMetricID {
// Stop searching, since the current item and all the subsequent items
// contain metricIDs higher than metricIDs in sortedFilter.
return loopsCount, nil
}
sf = sortedFilter
mp.ParseMetricIDs()
matchingMetricIDs := mp.MetricIDs[:0]
for _, metricID = range mp.MetricIDs {
if len(sf) == 0 {
break
}
if metricID > sf[0] {
n := binarySearchUint64(sf, metricID)
sf = sf[n:]
if len(sf) == 0 {
break
}
}
if metricID < sf[0] {
continue
}
matchingMetricIDs = append(matchingMetricIDs, metricID)
sf = sf[1:]
}
if len(matchingMetricIDs) > 0 {
if isNegative {
for _, metricID := range matchingMetricIDs {
metricIDs.Del(metricID)
}
} else {
metricIDs.AddMulti(matchingMetricIDs)
}
}
}
if err := ts.Error(); err != nil {
return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
}
return loopsCount, nil
}
func binarySearchUint64(a []uint64, v uint64) uint {
// Copy-pasted sort.Search from https://golang.org/src/sort/search.go?s=2246:2286#L49
i, j := uint(0), uint(len(a))
for i < j {
h := (i + j) >> 1
if h < uint(len(a)) && a[h] < v {
i = h + 1
} else {
j = h
}
}
return i
}
var errFallbackToGlobalSearch = errors.New("fall back from per-day index search to global index search")
var errMissingMetricIDsForDate = errors.New("missing metricIDs for date")
func (is *indexSearch) getMetricIDsForTimeRange(tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
atomic.AddUint64(&is.db.dateMetricIDsSearchCalls, 1)
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp) / msecPerDay
if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
// Too much dates must be covered. Give up.
return nil, errMissingMetricIDsForDate
}
if minDate == maxDate {
// Fast path - query on a single day.
metricIDs, err := is.getMetricIDsForDate(minDate, maxMetrics)
if err != nil {
return nil, err
}
atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1)
return metricIDs, nil
}
// Slower path - query over multiple days in parallel.
metricIDs := &uint64set.Set{}
var wg sync.WaitGroup
var errGlobal error
var mu sync.Mutex // protects metricIDs + errGlobal from concurrent access below.
for minDate <= maxDate {
wg.Add(1)
go func(date uint64) {
defer wg.Done()
isLocal := is.db.getIndexSearch(is.deadline)
m, err := isLocal.getMetricIDsForDate(date, maxMetrics)
is.db.putIndexSearch(isLocal)
mu.Lock()
defer mu.Unlock()
if errGlobal != nil {
return
}
if err != nil {
errGlobal = err
return
}
if metricIDs.Len() < maxMetrics {
metricIDs.UnionMayOwn(m)
}
}(minDate)
minDate++
}
wg.Wait()
if errGlobal != nil {
return nil, errGlobal
}
atomic.AddUint64(&is.db.dateMetricIDsSearchHits, 1)
return metricIDs, nil
}
const maxDaysForPerDaySearch = 40
func (is *indexSearch) tryUpdatingMetricIDsForDateRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
@ -2875,10 +2483,6 @@ func (is *indexSearch) getMetricIDsForDateAndFilters(date uint64, tfs *TagFilter
// so later they can be filtered out with negative filters.
m, err := is.getMetricIDsForDate(date, maxDateMetrics)
if err != nil {
if err == errMissingMetricIDsForDate {
// Zero time series were written on the given date.
return nil, nil
}
return nil, fmt.Errorf("cannot obtain all the metricIDs: %w", err)
}
if m.Len() >= maxDateMetrics {
@ -3100,13 +2704,18 @@ func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
}
func (is *indexSearch) getMetricIDsForDateTagFilter(tf *tagFilter, date uint64, commonPrefix []byte, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
// Augument tag filter prefix for per-date search instead of global search.
if !bytes.HasPrefix(tf.prefix, commonPrefix) {
logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
}
kb := kbPool.Get()
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
if date != 0 {
// Use per-date search.
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
} else {
// Use global search if date isn't set.
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
}
kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...)
tfNew := *tf
tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
@ -3152,8 +2761,14 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64
// Extract all the metricIDs from (date, __name__=value)->metricIDs entries.
kb := kbPool.Get()
defer kbPool.Put(kb)
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
if date != 0 {
// Use per-date search
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
kb.B = encoding.MarshalUint64(kb.B, date)
} else {
// Use global search
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
}
kb.B = marshalTagValue(kb.B, nil)
var metricIDs uint64set.Set
if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil {
@ -3162,15 +2777,6 @@ func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64
return &metricIDs, nil
}
func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {
kb := kbPool.Get()
defer kbPool.Put(kb)
// Extract all the metricIDs from (__name__=value)->metricIDs entries.
kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, nil)
return is.updateMetricIDsForPrefix(kb.B, metricIDs, maxMetrics)
}
func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error {
ts := &is.ts
mp := &is.mp
@ -3211,38 +2817,6 @@ func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64
// The estimated number of index scan loops a single loop in updateMetricIDsByMetricNameMatch takes.
const loopsCountPerMetricNameMatch = 150
func (is *indexSearch) intersectMetricIDsWithTagFilter(tf *tagFilter, filter *uint64set.Set) (*uint64set.Set, error) {
if filter.Len() == 0 {
return nil, nil
}
metricIDs := filter
if !tf.isNegative {
metricIDs = &uint64set.Set{}
}
if len(tf.orSuffixes) > 0 {
// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
_, err := is.updateMetricIDsForOrSuffixesWithFilter(tf, metricIDs, filter, int64Max)
if err != nil {
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil
}
// Slow path - scan for all the rows with the given prefix.
_, err := is.getMetricIDsForTagFilterSlow(tf, filter, func(metricID uint64) {
if tf.isNegative {
// filter must be equal to metricIDs
metricIDs.Del(metricID)
} else {
metricIDs.Add(metricID)
}
}, int64Max)
if err != nil {
return nil, fmt.Errorf("error when intersecting metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
}
return metricIDs, nil
}
var kbPool bytesutil.ByteBufferPool
// Returns local unique MetricID.
@ -3367,21 +2941,6 @@ func (mp *tagToMetricIDsRowParser) EqualPrefix(x *tagToMetricIDsRowParser) bool
return mp.Date == x.Date && mp.NSPrefix == x.NSPrefix
}
// FirstAndLastMetricIDs returns the first and the last metricIDs in the mp.tail.
func (mp *tagToMetricIDsRowParser) FirstAndLastMetricIDs() (uint64, uint64) {
tail := mp.tail
if len(tail) < 8 {
logger.Panicf("BUG: cannot unmarshal metricID from %d bytes; need 8 bytes", len(tail))
return 0, 0
}
firstMetricID := encoding.UnmarshalUint64(tail)
lastMetricID := firstMetricID
if len(tail) > 8 {
lastMetricID = encoding.UnmarshalUint64(tail[len(tail)-8:])
}
return firstMetricID, lastMetricID
}
// MetricIDsLen returns the number of MetricIDs in the mp.tail
func (mp *tagToMetricIDsRowParser) MetricIDsLen() int {
return len(mp.tail) / 8
@ -3410,16 +2969,6 @@ func (mp *tagToMetricIDsRowParser) ParseMetricIDs() {
}
}
// HasCommonMetricIDs returns true if mp has at least one common metric id with filter.
func (mp *tagToMetricIDsRowParser) HasCommonMetricIDs(filter *uint64set.Set) bool {
for _, metricID := range mp.MetricIDs {
if filter.Has(metricID) {
return true
}
}
return false
}
// IsDeletedTag verifies whether the tag from mp is deleted according to dmis.
//
// dmis must contain deleted MetricIDs.

View file

@ -1546,12 +1546,12 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
}
}
// Check that all the metrics are found in updateMetricIDsAll
var metricIDs uint64set.Set
if err := is2.updateMetricIDsAll(&metricIDs, metricsPerDay*days); err != nil {
// Check that all the metrics are found in global index
metricIDs, err := is2.getMetricIDsForDate(0, metricsPerDay*days)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !allMetricIDs.Equal(&metricIDs) {
if !allMetricIDs.Equal(metricIDs) {
t.Fatalf("unexpected metricIDs found;\ngot\n%d\nwant\n%d", metricIDs.AppendTo(nil), allMetricIDs.AppendTo(nil))
}

View file

@ -209,13 +209,6 @@ func (tfs *TagFilters) Reset() {
tfs.commonPrefix = marshalCommonPrefix(tfs.commonPrefix[:0], nsPrefixTagToMetricIDs)
}
func (tfs *TagFilters) marshal(dst []byte) []byte {
for i := range tfs.tfs {
dst = tfs.tfs[i].Marshal(dst)
}
return dst
}
// tagFilter represents a filter used for filtering tags.
type tagFilter struct {
key []byte