From 494ad0fdb3acce1c1e3ae7c84d73227c3a1a2ad2 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 13 Nov 2019 17:58:05 +0200 Subject: [PATCH] lib/storage: remove inmemory index for recent hour, since it uses too much memory Production workload shows that the index requires ~4Kb of RAM per active time series. This is too much for high number of active time series, so let's delete this index. Now the queries should fall back to the index for the current day instead of the index for the recent hour. The query performance for the current day index should be good enough given the 100M rows/sec scan speed per CPU core. --- app/vmstorage/main.go | 27 +- lib/storage/index_db.go | 53 ---- lib/storage/index_db_test.go | 10 - lib/storage/inmemory_inverted_index.go | 313 -------------------- lib/storage/inmemory_inverted_index_test.go | 47 --- lib/storage/search_test.go | 18 +- lib/storage/storage.go | 77 +---- lib/storage/storage_test.go | 4 - 8 files changed, 18 insertions(+), 531 deletions(-) delete mode 100644 lib/storage/inmemory_inverted_index.go delete mode 100644 lib/storage/inmemory_inverted_index_test.go diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 878e10a39..c1e7855dd 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -26,8 +26,6 @@ var ( vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") - disableRecentHourIndex = flag.Bool("disableRecentHourIndex", false, "Whether to disable inmemory inverted index for recent hour. "+ - "This may be useful in order to reduce memory usage when working with high number of time series") bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") ) @@ -37,9 +35,6 @@ func main() { buildinfo.Init() logger.Init() - if *disableRecentHourIndex { - storage.DisableRecentHourIndex() - } storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) @@ -357,25 +352,6 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(idbm().ItemsCount) }) - metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 { - return float64(m().RecentHourInvertedIndexSize) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_size_bytes`, func() float64 { - return float64(m().RecentHourInvertedIndexSizeBytes) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 { - return float64(m().RecentHourInvertedIndexUniqueTagPairsSize) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 { - return float64(m().RecentHourInvertedIndexPendingMetricIDsSize) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 { - return float64(idbm().RecentHourInvertedIndexSearchCalls) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`, func() float64 { - return float64(idbm().RecentHourInvertedIndexSearchHits) - }) - metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 { return float64(idbm().DateRangeSearchCalls) }) @@ -436,6 +412,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheSizeBytes) }) + metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 { + return float64(m().DateMetricIDCacheSizeBytes) + }) metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagCacheSizeBytes) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index ac3bb9d80..1a3363a79 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -95,12 +95,6 @@ type indexDB struct { // The number of successful searches for metric ids by days. dateMetricIDsSearchHits uint64 - // The number of calls for recent hour searches over inverted index. - recentHourInvertedIndexSearchCalls uint64 - - // The number of hits for recent hour searches over inverted index. - recentHourInvertedIndexSearchHits uint64 - // The number of calls for date range searches. dateRangeSearchCalls uint64 @@ -231,9 +225,6 @@ type IndexDBMetrics struct { DateMetricIDsSearchCalls uint64 DateMetricIDsSearchHits uint64 - RecentHourInvertedIndexSearchCalls uint64 - RecentHourInvertedIndexSearchHits uint64 - DateRangeSearchCalls uint64 DateRangeSearchHits uint64 @@ -275,9 +266,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls) m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits) - m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls) - m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits) - m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) @@ -1693,10 +1681,6 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf return bytes.Compare(a.prefix, b.prefix) < 0 }) - if is.tryUpdatingMetricIDsForRecentHour(metricIDs, tfs, tr) { - // Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour. - return nil - } ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) if err != nil { return err @@ -2230,43 +2214,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, return nil, false } -func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool { - if disableRecentHourIndex { - return false - } - - atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1) - k := accountProjectKey{ - AccountID: tfs.accountID, - ProjectID: tfs.projectID, - } - minHour := uint64(tr.MinTimestamp) / msecPerHour - maxHour := uint64(tr.MaxTimestamp) / msecPerHour - - hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs) - if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull { - // The tr fits the current hour. - hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.byTenant[k], tfs) - atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) - return true - } - hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) - if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { - // The tr fits the previous hour. - hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.byTenant[k], tfs) - atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) - return true - } - if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull { - // The tr spans the previous and the current hours. - hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.byTenant[k], tfs) - hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.byTenant[k], tfs) - atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) - return true - } - return false -} - func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error { is := db.getIndexSearch() ok, err := is.hasDateMetricID(date, metricID, accountID, projectID) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index c41936060..fd11a5f31 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1500,17 +1500,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { ProjectID: projectID, } prevMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: prevMetricIDs.m} - prevMetricIDs.iidx = newInmemoryInvertedIndex() - prevMetricIDs.iidx.MustUpdate(db, prevMetricIDs.byTenant) - if len(prevMetricIDs.iidx.pendingEntries) > 0 { - t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the previous hour", len(prevMetricIDs.iidx.pendingEntries)) - } currMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: currMetricIDs.m} - currMetricIDs.iidx = newInmemoryInvertedIndex() - currMetricIDs.iidx.MustUpdate(db, currMetricIDs.byTenant) - if len(currMetricIDs.iidx.pendingEntries) > 0 { - t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the current hour", len(currMetricIDs.iidx.pendingEntries)) - } } } diff --git a/lib/storage/inmemory_inverted_index.go b/lib/storage/inmemory_inverted_index.go deleted file mode 100644 index fdbc60744..000000000 --- a/lib/storage/inmemory_inverted_index.go +++ /dev/null @@ -1,313 +0,0 @@ -package storage - -import ( - "bytes" - "fmt" - "io" - "sync" - "unsafe" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" -) - -type inmemoryInvertedIndex struct { - mu sync.RWMutex - m map[string]*uint64set.Set - pendingEntries []pendingHourMetricIDEntry -} - -func (iidx *inmemoryInvertedIndex) Marshal(dst []byte) []byte { - iidx.mu.RLock() - defer iidx.mu.RUnlock() - - // Marshal iidx.m - var metricIDs []uint64 - dst = encoding.MarshalUint64(dst, uint64(len(iidx.m))) - for k, v := range iidx.m { - dst = encoding.MarshalBytes(dst, []byte(k)) - metricIDs = v.AppendTo(metricIDs[:0]) - dst = marshalMetricIDs(dst, metricIDs) - } - - // Marshal iidx.pendingEntries - dst = encoding.MarshalUint64(dst, uint64(len(iidx.pendingEntries))) - for _, e := range iidx.pendingEntries { - dst = encoding.MarshalUint32(dst, e.AccountID) - dst = encoding.MarshalUint32(dst, e.ProjectID) - dst = encoding.MarshalUint64(dst, e.MetricID) - } - - return dst -} - -func (iidx *inmemoryInvertedIndex) Unmarshal(src []byte) ([]byte, error) { - iidx.mu.Lock() - defer iidx.mu.Unlock() - - // Unmarshal iidx.m - if len(src) < 8 { - return src, fmt.Errorf("cannot read len(iidx.m) from %d bytes; want at least 8 bytes", len(src)) - } - mLen := int(encoding.UnmarshalUint64(src)) - src = src[8:] - m := make(map[string]*uint64set.Set, mLen) - var metricIDs []uint64 - for i := 0; i < mLen; i++ { - tail, k, err := encoding.UnmarshalBytes(src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal key #%d for iidx.m: %s", i, err) - } - src = tail - tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal value #%d for iidx.m: %s", i, err) - } - src = tail - var v uint64set.Set - for _, metricID := range metricIDs { - v.Add(metricID) - } - m[string(k)] = &v - } - iidx.m = m - - // Unmarshal iidx.pendingEntries - if len(src) < 8 { - return src, fmt.Errorf("cannot unmarshal pendingEntriesLen from %d bytes; want at least %d bytes", len(src), 8) - } - pendingEntriesLen := int(encoding.UnmarshalUint64(src)) - src = src[8:] - if len(src) < pendingEntriesLen*16 { - return src, fmt.Errorf("cannot unmarshal %d pending entries from %d bytes; want at least %d bytes", pendingEntriesLen, len(src), pendingEntriesLen*16) - } - for i := 0; i < pendingEntriesLen; i++ { - var e pendingHourMetricIDEntry - e.AccountID = encoding.UnmarshalUint32(src) - src = src[4:] - e.ProjectID = encoding.UnmarshalUint32(src) - src = src[4:] - e.MetricID = encoding.UnmarshalUint64(src) - src = src[8:] - iidx.pendingEntries = append(iidx.pendingEntries, e) - } - - return src, nil -} - -func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte { - dst = encoding.MarshalUint64(dst, uint64(len(metricIDs))) - for _, metricID := range metricIDs { - dst = encoding.MarshalUint64(dst, metricID) - } - return dst -} - -func unmarshalMetricIDs(dst []uint64, src []byte) ([]byte, []uint64, error) { - if len(src) < 8 { - return src, dst, fmt.Errorf("cannot unmarshal metricIDs len from %d bytes; want at least 8 bytes", len(src)) - } - metricIDsLen := int(encoding.UnmarshalUint64(src)) - src = src[8:] - if len(src) < 8*metricIDsLen { - return src, dst, fmt.Errorf("not enough bytes for unmarshaling %d metricIDs; want %d bytes; got %d bytes", metricIDsLen, 8*metricIDsLen, len(src)) - } - for i := 0; i < metricIDsLen; i++ { - metricID := encoding.UnmarshalUint64(src) - src = src[8:] - dst = append(dst, metricID) - } - return src, dst, nil -} - -func (iidx *inmemoryInvertedIndex) SizeBytes() uint64 { - n := uint64(0) - iidx.mu.RLock() - for k, v := range iidx.m { - n += uint64(len(k)) - n += v.SizeBytes() - } - n += uint64(len(iidx.pendingEntries)) * uint64(unsafe.Sizeof(pendingHourMetricIDEntry{})) - iidx.mu.RUnlock() - return n -} - -func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int { - if iidx == nil { - return 0 - } - iidx.mu.RLock() - n := len(iidx.m) - iidx.mu.RUnlock() - return n -} - -func (iidx *inmemoryInvertedIndex) GetEntriesCount() int { - if iidx == nil { - return 0 - } - n := 0 - iidx.mu.RLock() - for _, v := range iidx.m { - n += v.Len() - } - iidx.mu.RUnlock() - return n -} - -func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int { - if iidx == nil { - return 0 - } - iidx.mu.RLock() - n := len(iidx.pendingEntries) - iidx.mu.RUnlock() - return n -} - -func newInmemoryInvertedIndex() *inmemoryInvertedIndex { - return &inmemoryInvertedIndex{ - m: make(map[string]*uint64set.Set), - } -} - -func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, byTenant map[accountProjectKey]*uint64set.Set) { - var entries []pendingHourMetricIDEntry - var metricIDs []uint64 - for k, v := range byTenant { - var e pendingHourMetricIDEntry - e.AccountID = k.AccountID - e.ProjectID = k.ProjectID - metricIDs = v.AppendTo(metricIDs[:0]) - for _, metricID := range metricIDs { - e.MetricID = metricID - entries = append(entries, e) - } - } - - iidx.mu.Lock() - iidx.pendingEntries = append(iidx.pendingEntries, entries...) - if err := iidx.addPendingEntriesLocked(idb); err != nil { - logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingEntries: %s", err) - } - iidx.mu.Unlock() -} - -func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, e pendingHourMetricIDEntry) { - iidx.mu.Lock() - iidx.pendingEntries = append(iidx.pendingEntries, e) - if err := iidx.addPendingEntriesLocked(idb); err != nil { - logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingEntries: %s", err) - } - iidx.mu.Unlock() -} - -func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) { - if iidx == nil { - return - } - var result *uint64set.Set - var tfFirst *tagFilter - for i := range tfs.tfs { - if tfs.tfs[i].isNegative { - continue - } - tfFirst = &tfs.tfs[i] - break - } - - iidx.mu.RLock() - defer iidx.mu.RUnlock() - - if tfFirst == nil { - result = allMetricIDs.Clone() - } else { - result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix) - result.Intersect(allMetricIDs) // This line is required for filtering metrics by (accountID, projectID) - } - for i := range tfs.tfs { - tf := &tfs.tfs[i] - if tf == tfFirst { - continue - } - m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix) - if tf.isNegative { - result.Subtract(m) - } else { - result.Intersect(m) - } - if result.Len() == 0 { - return - } - } - metricIDs.Union(result) -} - -func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set { - if !bytes.HasPrefix(tf.prefix, commonPrefix) { - logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix) - } - prefix := tf.prefix[len(commonPrefix):] - var m uint64set.Set - kb := kbPool.Get() - defer kbPool.Put(kb) - for k, v := range iidx.m { - if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) { - continue - } - kb.B = append(kb.B[:0], k[len(prefix):]...) - ok, err := tf.matchSuffix(kb.B) - if err != nil { - logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err) - } - if !ok { - continue - } - m.Union(v) - } - return &m -} - -func (iidx *inmemoryInvertedIndex) addPendingEntriesLocked(idb *indexDB) error { - entries := iidx.pendingEntries - iidx.pendingEntries = iidx.pendingEntries[:0] - - kb := kbPool.Get() - defer kbPool.Put(kb) - - mn := GetMetricName() - defer PutMetricName(mn) - for _, e := range entries { - var err error - metricID := e.MetricID - kb.B, err = idb.searchMetricName(kb.B[:0], metricID, e.AccountID, e.ProjectID) - if err != nil { - if err == io.EOF { - iidx.pendingEntries = append(iidx.pendingEntries, e) - continue - } - return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err) - } - if err = mn.Unmarshal(kb.B); err != nil { - return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err) - } - kb.B = marshalTagValue(kb.B[:0], nil) - kb.B = marshalTagValue(kb.B, mn.MetricGroup) - iidx.addMetricIDLocked(kb.B, metricID) - for i := range mn.Tags { - kb.B = mn.Tags[i].Marshal(kb.B[:0]) - iidx.addMetricIDLocked(kb.B, metricID) - } - } - return nil -} - -func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) { - v := iidx.m[string(key)] - if v == nil { - v = &uint64set.Set{} - iidx.m[string(key)] = v - } - v.Add(metricID) -} diff --git a/lib/storage/inmemory_inverted_index_test.go b/lib/storage/inmemory_inverted_index_test.go deleted file mode 100644 index bbbd8190c..000000000 --- a/lib/storage/inmemory_inverted_index_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package storage - -import ( - "fmt" - "reflect" - "testing" -) - -func TestInmemoryInvertedIndexMarshalUnmarshal(t *testing.T) { - iidx := newInmemoryInvertedIndex() - const keysCount = 100 - const metricIDsCount = 10000 - for i := 0; i < metricIDsCount; i++ { - k := fmt.Sprintf("key %d", i%keysCount) - iidx.addMetricIDLocked([]byte(k), uint64(i)) - } - for i := 0; i < 10; i++ { - var e pendingHourMetricIDEntry - e.AccountID = uint32(i) - e.ProjectID = uint32(i + 324) - e.MetricID = uint64(i * 43) - iidx.pendingEntries = append(iidx.pendingEntries, e) - } - - data := iidx.Marshal(nil) - - iidx2 := newInmemoryInvertedIndex() - tail, err := iidx2.Unmarshal(data) - if err != nil { - t.Fatalf("cannot unmarshal iidx: %s", err) - } - if len(tail) != 0 { - t.Fatalf("unexpected tail left after iidx unmarshaling: %d bytes", len(tail)) - } - if len(iidx.m) != len(iidx2.m) { - t.Fatalf("unexpected len(iidx2.m); got %d; want %d", len(iidx2.m), len(iidx.m)) - } - if !reflect.DeepEqual(iidx.pendingEntries, iidx2.pendingEntries) { - t.Fatalf("unexpected pendingMetricIDs; got\n%v;\nwant\n%v", iidx2.pendingEntries, iidx.pendingEntries) - } - for k, v := range iidx.m { - v2 := iidx2.m[k] - if !v.Equal(v2) { - t.Fatalf("unexpected set for key %q", k) - } - } -} diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index d72612137..a828b6157 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -78,21 +78,15 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) { func TestSearch(t *testing.T) { t.Run("global_inverted_index", func(t *testing.T) { - testSearchGeneric(t, false, false) + testSearchGeneric(t, false) }) t.Run("perday_inverted_index", func(t *testing.T) { - testSearchGeneric(t, false, true) - }) - t.Run("recent_hour_global_inverted_index", func(t *testing.T) { - testSearchGeneric(t, true, false) - }) - t.Run("recent_hour_perday_inverted_index", func(t *testing.T) { - testSearchGeneric(t, true, true) + testSearchGeneric(t, true) }) } -func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayInvertedIndex bool) { - path := fmt.Sprintf("TestSearch_%v_%v", forceRecentHourInvertedIndex, forcePerDayInvertedIndex) +func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) { + path := fmt.Sprintf("TestSearch_%v", forcePerDayInvertedIndex) st, err := OpenStorage(path, 0) if err != nil { t.Fatalf("cannot open storage %q: %s", path, err) @@ -154,10 +148,6 @@ func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayIn extDB.startDateForPerDayInvertedIndex = 0 }) } - if forceRecentHourInvertedIndex { - hm := st.currHourMetricIDs.Load().(*hourMetricIDs) - hm.isFull = true - } // Run search. tr := TimeRange{ diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 856fff88e..b44a5c2e0 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -27,17 +27,6 @@ import ( const maxRetentionMonths = 12 * 100 -var disableRecentHourIndex = false - -// DisableRecentHourIndex disables in-memory inverted index for recent hour. -// -// This may be useful in order to save RAM for high cardinality data. -// -// This function must be called before OpenStorage. -func DisableRecentHourIndex() { - disableRecentHourIndex = true -} - // Storage represents TSDB storage. type Storage struct { // Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs. @@ -327,16 +316,12 @@ type Metrics struct { MetricNameCacheCollisions uint64 DateMetricIDCacheSize uint64 + DateMetricIDCacheSizeBytes uint64 DateMetricIDCacheSyncsCount uint64 DateMetricIDCacheResetsCount uint64 HourMetricIDCacheSize uint64 - RecentHourInvertedIndexSize uint64 - RecentHourInvertedIndexSizeBytes uint64 - RecentHourInvertedIndexUniqueTagPairsSize uint64 - RecentHourInvertedIndexPendingMetricIDsSize uint64 - IndexDBMetrics IndexDBMetrics TableMetrics TableMetrics } @@ -382,6 +367,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.MetricNameCacheCollisions += cs.Collisions m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount()) + m.DateMetricIDCacheSizeBytes += uint64(s.dateMetricIDCache.SizeBytes()) m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount) m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount) @@ -393,18 +379,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) { } m.HourMetricIDCacheSize += uint64(hourMetricIDsLen) - m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount()) - m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount()) - - m.RecentHourInvertedIndexSizeBytes += hmPrev.iidx.SizeBytes() - m.RecentHourInvertedIndexSizeBytes += hmCurr.iidx.SizeBytes() - - m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen()) - m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen()) - - m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen()) - m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen()) - s.idb().UpdateMetrics(&m.IndexDBMetrics) s.tb.UpdateMetrics(&m.TableMetrics) } @@ -519,7 +493,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if !fs.IsPathExist(path) { logger.Infof("nothing to load from %q", path) return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -531,7 +504,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if len(src) < 24 { logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -544,7 +516,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if hourLoaded != hour { logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour) return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -555,7 +526,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if uint64(len(src)) < 8*hmLen { logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen) return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -602,30 +572,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs byTenant[k] = m } - // Unmarshal hm.iidx - iidx := newInmemoryInvertedIndex() - if !disableRecentHourIndex { - tail, err := iidx.Unmarshal(src) - if err != nil { - logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err) - return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), - hour: hour, - } - } - if len(tail) > 0 { - logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail)) - return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), - hour: hour, - } - } - } - logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen) return &hourMetricIDs{ m: m, - iidx: iidx, byTenant: byTenant, hour: hourLoaded, isFull: isFull != 0, @@ -665,11 +614,6 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { } } - if !disableRecentHourIndex { - // Marshal hm.iidx - dst = hm.iidx.Marshal(dst) - } - if err := ioutil.WriteFile(path, dst, 0644); err != nil { logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) } @@ -1030,9 +974,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { } s.pendingHourEntries = append(s.pendingHourEntries, e) s.pendingHourEntriesLock.Unlock() - if !disableRecentHourIndex { - hm.iidx.AddMetricID(idb, e) - } } // Slower path: check global cache for (date, metricID) entry. @@ -1098,6 +1039,15 @@ func (dmc *dateMetricIDCache) EntriesCount() int { return n } +func (dmc *dateMetricIDCache) SizeBytes() uint64 { + byDate := dmc.byDate.Load().(*byDateMetricIDMap) + n := uint64(0) + for _, e := range byDate.m { + n += e.v.SizeBytes() + } + return n +} + func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { byDate := dmc.byDate.Load().(*byDateMetricIDMap) v := byDate.get(date) @@ -1210,19 +1160,16 @@ func (s *Storage) updateCurrHourMetricIDs() { // Slow path: hm.m must be updated with non-empty s.pendingHourEntries. var m *uint64set.Set - var iidx *inmemoryInvertedIndex var byTenant map[accountProjectKey]*uint64set.Set isFull := hm.isFull if hm.hour == hour { m = hm.m.Clone() - iidx = hm.iidx byTenant = make(map[accountProjectKey]*uint64set.Set, len(hm.byTenant)) for k, e := range hm.byTenant { byTenant[k] = e.Clone() } } else { m = &uint64set.Set{} - iidx = newInmemoryInvertedIndex() byTenant = make(map[accountProjectKey]*uint64set.Set) isFull = true } @@ -1243,7 +1190,6 @@ func (s *Storage) updateCurrHourMetricIDs() { hmNew := &hourMetricIDs{ m: m, - iidx: iidx, byTenant: byTenant, hour: hour, isFull: isFull, @@ -1256,7 +1202,6 @@ func (s *Storage) updateCurrHourMetricIDs() { type hourMetricIDs struct { m *uint64set.Set - iidx *inmemoryInvertedIndex byTenant map[accountProjectKey]*uint64set.Set hour uint64 isFull bool diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index a1af4fa28..3d36482f0 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -107,7 +107,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - iidx: newInmemoryInvertedIndex(), hour: 123, } hmOrig.m.Add(12) @@ -143,7 +142,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - iidx: newInmemoryInvertedIndex(), hour: hour, } hmOrig.m.Add(12) @@ -204,7 +202,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - iidx: newInmemoryInvertedIndex(), hour: 123, } hmOrig.m.Add(12) @@ -265,7 +262,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - iidx: newInmemoryInvertedIndex(), hour: hour, } hmOrig.m.Add(12)