From 7b6623558f920b2a27d58fbfa808e20237455a13 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 10 Jun 2019 14:02:44 +0300 Subject: [PATCH] lib/storage: skip adaptive searching for tag filter matching the minimum number of metrics if the identical previous search didn't found such filter This should improve speed for searching metrics among high number of time series with high churn rate like in big Kubernetes clusters with frequent deployments. --- app/vmstorage/main.go | 12 ++++++++++++ lib/storage/index_db.go | 37 ++++++++++++++++++++++++++++++++++--- lib/storage/tag_filters.go | 7 +++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 96c3641229..09ab08eea9 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -369,6 +369,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagCacheSize) }) + metrics.NewGauge(`vm_cache_entries{type="indexdb/uselessTagFilters"}`, func() float64 { + return float64(idbm().UselessTagFiltersCacheSize) + }) metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheSize()) }) @@ -388,6 +391,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagCacheBytesSize) }) + metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/uselessTagFilters"}`, func() float64 { + return float64(idbm().UselessTagFiltersCacheBytesSize) + }) metrics.NewGauge(`vm_cache_requests_total{type="storage/tsid"}`, func() float64 { return float64(m().TSIDCacheRequests) @@ -416,6 +422,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagCacheRequests) }) + metrics.NewGauge(`vm_cache_requests_total{type="indexdb/uselessTagFilters"}`, func() float64 { + return float64(idbm().UselessTagFiltersCacheRequests) + }) metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheRequests()) }) @@ -447,6 +456,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagCacheMisses) }) + metrics.NewGauge(`vm_cache_misses_total{type="indexdb/uselessTagFilters"}`, func() float64 { + return float64(idbm().UselessTagFiltersCacheMisses) + }) metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 { return float64(storage.RegexpCacheMisses()) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index f37158e7da..511c98eca1 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -60,6 +60,10 @@ type indexDB struct { // Cache for fast MetricID -> MetricName lookup. metricNameCache *fastcache.Cache + // Cache holding useless TagFilters entries, which have no tag filters + // matching low number of metrics. + uselessTagFiltersCache *fastcache.Cache + indexSearchPool sync.Pool // An inmemory map[uint64]struct{} of deleted metricIDs. @@ -118,6 +122,8 @@ func openIndexDB(path string, metricIDCache, metricNameCache *fastcache.Cache, c metricIDCache: metricIDCache, metricNameCache: metricNameCache, + uselessTagFiltersCache: fastcache.New(mem / 128), + currHourMetricIDs: currHourMetricIDs, prevHourMetricIDs: prevHourMetricIDs, } @@ -140,6 +146,11 @@ type IndexDBMetrics struct { TagCacheRequests uint64 TagCacheMisses uint64 + UselessTagFiltersCacheSize uint64 + UselessTagFiltersCacheBytesSize uint64 + UselessTagFiltersCacheRequests uint64 + UselessTagFiltersCacheMisses uint64 + DeletedMetricsCount uint64 IndexDBRefCount uint64 @@ -161,12 +172,21 @@ func (db *indexDB) scheduleToDrop() { // UpdateMetrics updates m with metrics from the db. func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { var cs fastcache.Stats + + cs.Reset() db.tagCache.UpdateStats(&cs) m.TagCacheSize += cs.EntriesCount m.TagCacheBytesSize += cs.BytesSize m.TagCacheRequests += cs.GetBigCalls m.TagCacheMisses += cs.Misses + cs.Reset() + db.uselessTagFiltersCache.UpdateStats(&cs) + m.UselessTagFiltersCacheSize += cs.EntriesCount + m.UselessTagFiltersCacheBytesSize += cs.BytesSize + m.UselessTagFiltersCacheRequests += cs.GetBigCalls + m.UselessTagFiltersCacheMisses += cs.Misses + m.DeletedMetricsCount += uint64(len(db.getDeletedMetricIDs())) m.IndexDBRefCount += atomic.LoadUint64(&db.refCount) @@ -303,7 +323,7 @@ func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) { db.metricNameCache.Set(key[:], metricName) } -func marshalTagFiltersKey(dst []byte, tfss []*TagFilters) []byte { +func marshalTagFiltersKeyVersioned(dst []byte, tfss []*TagFilters) []byte { prefix := atomic.LoadUint64(&tagFiltersKeyGen) dst = encoding.MarshalUint64(dst, prefix) for _, tfs := range tfss { @@ -902,7 +922,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) tfKeyBuf := tagFiltersKeyBufPool.Get() defer tagFiltersKeyBufPool.Put(tfKeyBuf) - tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss) + tfKeyBuf.B = marshalTagFiltersKeyVersioned(tfKeyBuf.B[:0], tfss) tsids, ok := db.getFromTagCache(tfKeyBuf.B) if ok { // Fast path - tsids found in the cache. @@ -1165,6 +1185,14 @@ func (is *indexSearch) updateMetricIDsByMetricNameMatch(metricIDs, srcMetricIDs } func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters, maxMetrics int) (*tagFilter, map[uint64]struct{}, error) { + kb := &is.kb + kb.B = tfs.marshal(kb.B[:0]) + kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics)) + if len(is.db.uselessTagFiltersCache.Get(nil, kb.B)) > 0 { + // Skip useless work below, since the tfs doesn't contain tag filters matching less than maxMetrics metrics. + return nil, nil, errTooManyMetrics + } + // Iteratively increase maxAllowedMetrics up to maxMetrics in order to limit // the time required for founding the tag filter with minimum matching metrics. maxAllowedMetrics := 16 @@ -1183,7 +1211,10 @@ func (is *indexSearch) getTagFilterWithMinMetricIDsCountAdaptive(tfs *TagFilters // Too many metrics matched. if maxAllowedMetrics >= maxMetrics { - // The tag filter with minimum matching metrics matches at least maxMetrics. + // The tag filter with minimum matching metrics matches at least maxMetrics metrics. + kb.B = tfs.marshal(kb.B[:0]) + kb.B = encoding.MarshalUint64(kb.B, uint64(maxMetrics)) + is.db.uselessTagFiltersCache.Set(kb.B, []byte("1")) return nil, nil, errTooManyMetrics } diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index 66c1c5bd5d..186d002afd 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -81,6 +81,13 @@ func (tfs *TagFilters) Reset() { tfs.commonPrefix = marshalCommonPrefix(tfs.commonPrefix[:0], nsPrefixTagToMetricID) } +func (tfs *TagFilters) marshal(dst []byte) []byte { + for i := range tfs.tfs { + dst = tfs.tfs[i].Marshal(dst) + } + return dst +} + // tagFilter represents a filter used for filtering tags. type tagFilter struct { key []byte