diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 878e10a395..c1e7855dd4 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -26,8 +26,6 @@ var ( vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") - disableRecentHourIndex = flag.Bool("disableRecentHourIndex", false, "Whether to disable inmemory inverted index for recent hour. "+ - "This may be useful in order to reduce memory usage when working with high number of time series") bigMergeConcurrency = flag.Int("bigMergeConcurrency", 0, "The maximum number of CPU cores to use for big merges. Default value is used if set to 0") smallMergeConcurrency = flag.Int("smallMergeConcurrency", 0, "The maximum number of CPU cores to use for small merges. Default value is used if set to 0") ) @@ -37,9 +35,6 @@ func main() { buildinfo.Init() logger.Init() - if *disableRecentHourIndex { - storage.DisableRecentHourIndex() - } storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) @@ -357,25 +352,6 @@ func registerStorageMetrics(strg *storage.Storage) { return float64(idbm().ItemsCount) }) - metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 { - return float64(m().RecentHourInvertedIndexSize) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_size_bytes`, func() float64 { - return float64(m().RecentHourInvertedIndexSizeBytes) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 { - return float64(m().RecentHourInvertedIndexUniqueTagPairsSize) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 { - return float64(m().RecentHourInvertedIndexPendingMetricIDsSize) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 { - return float64(idbm().RecentHourInvertedIndexSearchCalls) - }) - metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`, func() float64 { - return float64(idbm().RecentHourInvertedIndexSearchHits) - }) - metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 { return float64(idbm().DateRangeSearchCalls) }) @@ -436,6 +412,9 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_cache_size_bytes{type="storage/metricName"}`, func() float64 { return float64(m().MetricNameCacheSizeBytes) }) + metrics.NewGauge(`vm_cache_size_bytes{type="storage/date_metricID"}`, func() float64 { + return float64(m().DateMetricIDCacheSizeBytes) + }) metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { return float64(idbm().TagCacheSizeBytes) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index ac3bb9d803..1a3363a795 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -95,12 +95,6 @@ type indexDB struct { // The number of successful searches for metric ids by days. dateMetricIDsSearchHits uint64 - // The number of calls for recent hour searches over inverted index. - recentHourInvertedIndexSearchCalls uint64 - - // The number of hits for recent hour searches over inverted index. - recentHourInvertedIndexSearchHits uint64 - // The number of calls for date range searches. dateRangeSearchCalls uint64 @@ -231,9 +225,6 @@ type IndexDBMetrics struct { DateMetricIDsSearchCalls uint64 DateMetricIDsSearchHits uint64 - RecentHourInvertedIndexSearchCalls uint64 - RecentHourInvertedIndexSearchHits uint64 - DateRangeSearchCalls uint64 DateRangeSearchHits uint64 @@ -275,9 +266,6 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls) m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits) - m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls) - m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits) - m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) @@ -1693,10 +1681,6 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf return bytes.Compare(a.prefix, b.prefix) < 0 }) - if is.tryUpdatingMetricIDsForRecentHour(metricIDs, tfs, tr) { - // Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour. - return nil - } ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) if err != nil { return err @@ -2230,43 +2214,6 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int, return nil, false } -func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool { - if disableRecentHourIndex { - return false - } - - atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1) - k := accountProjectKey{ - AccountID: tfs.accountID, - ProjectID: tfs.projectID, - } - minHour := uint64(tr.MinTimestamp) / msecPerHour - maxHour := uint64(tr.MaxTimestamp) / msecPerHour - - hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs) - if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull { - // The tr fits the current hour. - hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.byTenant[k], tfs) - atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) - return true - } - hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) - if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { - // The tr fits the previous hour. - hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.byTenant[k], tfs) - atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) - return true - } - if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull { - // The tr spans the previous and the current hours. - hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.byTenant[k], tfs) - hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.byTenant[k], tfs) - atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) - return true - } - return false -} - func (db *indexDB) storeDateMetricID(date, metricID uint64, accountID, projectID uint32) error { is := db.getIndexSearch() ok, err := is.hasDateMetricID(date, metricID, accountID, projectID) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index c41936060e..fd11a5f31e 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1500,17 +1500,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { ProjectID: projectID, } prevMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: prevMetricIDs.m} - prevMetricIDs.iidx = newInmemoryInvertedIndex() - prevMetricIDs.iidx.MustUpdate(db, prevMetricIDs.byTenant) - if len(prevMetricIDs.iidx.pendingEntries) > 0 { - t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the previous hour", len(prevMetricIDs.iidx.pendingEntries)) - } currMetricIDs.byTenant = map[accountProjectKey]*uint64set.Set{k: currMetricIDs.m} - currMetricIDs.iidx = newInmemoryInvertedIndex() - currMetricIDs.iidx.MustUpdate(db, currMetricIDs.byTenant) - if len(currMetricIDs.iidx.pendingEntries) > 0 { - t.Fatalf("couldn't add %d metricIDs to inmemory inverted index for the current hour", len(currMetricIDs.iidx.pendingEntries)) - } } } diff --git a/lib/storage/inmemory_inverted_index.go b/lib/storage/inmemory_inverted_index.go deleted file mode 100644 index fdbc607449..0000000000 --- a/lib/storage/inmemory_inverted_index.go +++ /dev/null @@ -1,313 +0,0 @@ -package storage - -import ( - "bytes" - "fmt" - "io" - "sync" - "unsafe" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" -) - -type inmemoryInvertedIndex struct { - mu sync.RWMutex - m map[string]*uint64set.Set - pendingEntries []pendingHourMetricIDEntry -} - -func (iidx *inmemoryInvertedIndex) Marshal(dst []byte) []byte { - iidx.mu.RLock() - defer iidx.mu.RUnlock() - - // Marshal iidx.m - var metricIDs []uint64 - dst = encoding.MarshalUint64(dst, uint64(len(iidx.m))) - for k, v := range iidx.m { - dst = encoding.MarshalBytes(dst, []byte(k)) - metricIDs = v.AppendTo(metricIDs[:0]) - dst = marshalMetricIDs(dst, metricIDs) - } - - // Marshal iidx.pendingEntries - dst = encoding.MarshalUint64(dst, uint64(len(iidx.pendingEntries))) - for _, e := range iidx.pendingEntries { - dst = encoding.MarshalUint32(dst, e.AccountID) - dst = encoding.MarshalUint32(dst, e.ProjectID) - dst = encoding.MarshalUint64(dst, e.MetricID) - } - - return dst -} - -func (iidx *inmemoryInvertedIndex) Unmarshal(src []byte) ([]byte, error) { - iidx.mu.Lock() - defer iidx.mu.Unlock() - - // Unmarshal iidx.m - if len(src) < 8 { - return src, fmt.Errorf("cannot read len(iidx.m) from %d bytes; want at least 8 bytes", len(src)) - } - mLen := int(encoding.UnmarshalUint64(src)) - src = src[8:] - m := make(map[string]*uint64set.Set, mLen) - var metricIDs []uint64 - for i := 0; i < mLen; i++ { - tail, k, err := encoding.UnmarshalBytes(src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal key #%d for iidx.m: %s", i, err) - } - src = tail - tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal value #%d for iidx.m: %s", i, err) - } - src = tail - var v uint64set.Set - for _, metricID := range metricIDs { - v.Add(metricID) - } - m[string(k)] = &v - } - iidx.m = m - - // Unmarshal iidx.pendingEntries - if len(src) < 8 { - return src, fmt.Errorf("cannot unmarshal pendingEntriesLen from %d bytes; want at least %d bytes", len(src), 8) - } - pendingEntriesLen := int(encoding.UnmarshalUint64(src)) - src = src[8:] - if len(src) < pendingEntriesLen*16 { - return src, fmt.Errorf("cannot unmarshal %d pending entries from %d bytes; want at least %d bytes", pendingEntriesLen, len(src), pendingEntriesLen*16) - } - for i := 0; i < pendingEntriesLen; i++ { - var e pendingHourMetricIDEntry - e.AccountID = encoding.UnmarshalUint32(src) - src = src[4:] - e.ProjectID = encoding.UnmarshalUint32(src) - src = src[4:] - e.MetricID = encoding.UnmarshalUint64(src) - src = src[8:] - iidx.pendingEntries = append(iidx.pendingEntries, e) - } - - return src, nil -} - -func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte { - dst = encoding.MarshalUint64(dst, uint64(len(metricIDs))) - for _, metricID := range metricIDs { - dst = encoding.MarshalUint64(dst, metricID) - } - return dst -} - -func unmarshalMetricIDs(dst []uint64, src []byte) ([]byte, []uint64, error) { - if len(src) < 8 { - return src, dst, fmt.Errorf("cannot unmarshal metricIDs len from %d bytes; want at least 8 bytes", len(src)) - } - metricIDsLen := int(encoding.UnmarshalUint64(src)) - src = src[8:] - if len(src) < 8*metricIDsLen { - return src, dst, fmt.Errorf("not enough bytes for unmarshaling %d metricIDs; want %d bytes; got %d bytes", metricIDsLen, 8*metricIDsLen, len(src)) - } - for i := 0; i < metricIDsLen; i++ { - metricID := encoding.UnmarshalUint64(src) - src = src[8:] - dst = append(dst, metricID) - } - return src, dst, nil -} - -func (iidx *inmemoryInvertedIndex) SizeBytes() uint64 { - n := uint64(0) - iidx.mu.RLock() - for k, v := range iidx.m { - n += uint64(len(k)) - n += v.SizeBytes() - } - n += uint64(len(iidx.pendingEntries)) * uint64(unsafe.Sizeof(pendingHourMetricIDEntry{})) - iidx.mu.RUnlock() - return n -} - -func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int { - if iidx == nil { - return 0 - } - iidx.mu.RLock() - n := len(iidx.m) - iidx.mu.RUnlock() - return n -} - -func (iidx *inmemoryInvertedIndex) GetEntriesCount() int { - if iidx == nil { - return 0 - } - n := 0 - iidx.mu.RLock() - for _, v := range iidx.m { - n += v.Len() - } - iidx.mu.RUnlock() - return n -} - -func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int { - if iidx == nil { - return 0 - } - iidx.mu.RLock() - n := len(iidx.pendingEntries) - iidx.mu.RUnlock() - return n -} - -func newInmemoryInvertedIndex() *inmemoryInvertedIndex { - return &inmemoryInvertedIndex{ - m: make(map[string]*uint64set.Set), - } -} - -func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, byTenant map[accountProjectKey]*uint64set.Set) { - var entries []pendingHourMetricIDEntry - var metricIDs []uint64 - for k, v := range byTenant { - var e pendingHourMetricIDEntry - e.AccountID = k.AccountID - e.ProjectID = k.ProjectID - metricIDs = v.AppendTo(metricIDs[:0]) - for _, metricID := range metricIDs { - e.MetricID = metricID - entries = append(entries, e) - } - } - - iidx.mu.Lock() - iidx.pendingEntries = append(iidx.pendingEntries, entries...) - if err := iidx.addPendingEntriesLocked(idb); err != nil { - logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingEntries: %s", err) - } - iidx.mu.Unlock() -} - -func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, e pendingHourMetricIDEntry) { - iidx.mu.Lock() - iidx.pendingEntries = append(iidx.pendingEntries, e) - if err := iidx.addPendingEntriesLocked(idb); err != nil { - logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingEntries: %s", err) - } - iidx.mu.Unlock() -} - -func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) { - if iidx == nil { - return - } - var result *uint64set.Set - var tfFirst *tagFilter - for i := range tfs.tfs { - if tfs.tfs[i].isNegative { - continue - } - tfFirst = &tfs.tfs[i] - break - } - - iidx.mu.RLock() - defer iidx.mu.RUnlock() - - if tfFirst == nil { - result = allMetricIDs.Clone() - } else { - result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix) - result.Intersect(allMetricIDs) // This line is required for filtering metrics by (accountID, projectID) - } - for i := range tfs.tfs { - tf := &tfs.tfs[i] - if tf == tfFirst { - continue - } - m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix) - if tf.isNegative { - result.Subtract(m) - } else { - result.Intersect(m) - } - if result.Len() == 0 { - return - } - } - metricIDs.Union(result) -} - -func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set { - if !bytes.HasPrefix(tf.prefix, commonPrefix) { - logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix) - } - prefix := tf.prefix[len(commonPrefix):] - var m uint64set.Set - kb := kbPool.Get() - defer kbPool.Put(kb) - for k, v := range iidx.m { - if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) { - continue - } - kb.B = append(kb.B[:0], k[len(prefix):]...) - ok, err := tf.matchSuffix(kb.B) - if err != nil { - logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err) - } - if !ok { - continue - } - m.Union(v) - } - return &m -} - -func (iidx *inmemoryInvertedIndex) addPendingEntriesLocked(idb *indexDB) error { - entries := iidx.pendingEntries - iidx.pendingEntries = iidx.pendingEntries[:0] - - kb := kbPool.Get() - defer kbPool.Put(kb) - - mn := GetMetricName() - defer PutMetricName(mn) - for _, e := range entries { - var err error - metricID := e.MetricID - kb.B, err = idb.searchMetricName(kb.B[:0], metricID, e.AccountID, e.ProjectID) - if err != nil { - if err == io.EOF { - iidx.pendingEntries = append(iidx.pendingEntries, e) - continue - } - return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err) - } - if err = mn.Unmarshal(kb.B); err != nil { - return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err) - } - kb.B = marshalTagValue(kb.B[:0], nil) - kb.B = marshalTagValue(kb.B, mn.MetricGroup) - iidx.addMetricIDLocked(kb.B, metricID) - for i := range mn.Tags { - kb.B = mn.Tags[i].Marshal(kb.B[:0]) - iidx.addMetricIDLocked(kb.B, metricID) - } - } - return nil -} - -func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) { - v := iidx.m[string(key)] - if v == nil { - v = &uint64set.Set{} - iidx.m[string(key)] = v - } - v.Add(metricID) -} diff --git a/lib/storage/inmemory_inverted_index_test.go b/lib/storage/inmemory_inverted_index_test.go deleted file mode 100644 index bbbd8190c8..0000000000 --- a/lib/storage/inmemory_inverted_index_test.go +++ /dev/null @@ -1,47 +0,0 @@ -package storage - -import ( - "fmt" - "reflect" - "testing" -) - -func TestInmemoryInvertedIndexMarshalUnmarshal(t *testing.T) { - iidx := newInmemoryInvertedIndex() - const keysCount = 100 - const metricIDsCount = 10000 - for i := 0; i < metricIDsCount; i++ { - k := fmt.Sprintf("key %d", i%keysCount) - iidx.addMetricIDLocked([]byte(k), uint64(i)) - } - for i := 0; i < 10; i++ { - var e pendingHourMetricIDEntry - e.AccountID = uint32(i) - e.ProjectID = uint32(i + 324) - e.MetricID = uint64(i * 43) - iidx.pendingEntries = append(iidx.pendingEntries, e) - } - - data := iidx.Marshal(nil) - - iidx2 := newInmemoryInvertedIndex() - tail, err := iidx2.Unmarshal(data) - if err != nil { - t.Fatalf("cannot unmarshal iidx: %s", err) - } - if len(tail) != 0 { - t.Fatalf("unexpected tail left after iidx unmarshaling: %d bytes", len(tail)) - } - if len(iidx.m) != len(iidx2.m) { - t.Fatalf("unexpected len(iidx2.m); got %d; want %d", len(iidx2.m), len(iidx.m)) - } - if !reflect.DeepEqual(iidx.pendingEntries, iidx2.pendingEntries) { - t.Fatalf("unexpected pendingMetricIDs; got\n%v;\nwant\n%v", iidx2.pendingEntries, iidx.pendingEntries) - } - for k, v := range iidx.m { - v2 := iidx2.m[k] - if !v.Equal(v2) { - t.Fatalf("unexpected set for key %q", k) - } - } -} diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index d72612137f..a828b6157f 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -78,21 +78,15 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) { func TestSearch(t *testing.T) { t.Run("global_inverted_index", func(t *testing.T) { - testSearchGeneric(t, false, false) + testSearchGeneric(t, false) }) t.Run("perday_inverted_index", func(t *testing.T) { - testSearchGeneric(t, false, true) - }) - t.Run("recent_hour_global_inverted_index", func(t *testing.T) { - testSearchGeneric(t, true, false) - }) - t.Run("recent_hour_perday_inverted_index", func(t *testing.T) { - testSearchGeneric(t, true, true) + testSearchGeneric(t, true) }) } -func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayInvertedIndex bool) { - path := fmt.Sprintf("TestSearch_%v_%v", forceRecentHourInvertedIndex, forcePerDayInvertedIndex) +func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) { + path := fmt.Sprintf("TestSearch_%v", forcePerDayInvertedIndex) st, err := OpenStorage(path, 0) if err != nil { t.Fatalf("cannot open storage %q: %s", path, err) @@ -154,10 +148,6 @@ func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayIn extDB.startDateForPerDayInvertedIndex = 0 }) } - if forceRecentHourInvertedIndex { - hm := st.currHourMetricIDs.Load().(*hourMetricIDs) - hm.isFull = true - } // Run search. tr := TimeRange{ diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 856fff88e3..b44a5c2e0d 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -27,17 +27,6 @@ import ( const maxRetentionMonths = 12 * 100 -var disableRecentHourIndex = false - -// DisableRecentHourIndex disables in-memory inverted index for recent hour. -// -// This may be useful in order to save RAM for high cardinality data. -// -// This function must be called before OpenStorage. -func DisableRecentHourIndex() { - disableRecentHourIndex = true -} - // Storage represents TSDB storage. type Storage struct { // Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs. @@ -327,16 +316,12 @@ type Metrics struct { MetricNameCacheCollisions uint64 DateMetricIDCacheSize uint64 + DateMetricIDCacheSizeBytes uint64 DateMetricIDCacheSyncsCount uint64 DateMetricIDCacheResetsCount uint64 HourMetricIDCacheSize uint64 - RecentHourInvertedIndexSize uint64 - RecentHourInvertedIndexSizeBytes uint64 - RecentHourInvertedIndexUniqueTagPairsSize uint64 - RecentHourInvertedIndexPendingMetricIDsSize uint64 - IndexDBMetrics IndexDBMetrics TableMetrics TableMetrics } @@ -382,6 +367,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.MetricNameCacheCollisions += cs.Collisions m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount()) + m.DateMetricIDCacheSizeBytes += uint64(s.dateMetricIDCache.SizeBytes()) m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount) m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount) @@ -393,18 +379,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) { } m.HourMetricIDCacheSize += uint64(hourMetricIDsLen) - m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount()) - m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount()) - - m.RecentHourInvertedIndexSizeBytes += hmPrev.iidx.SizeBytes() - m.RecentHourInvertedIndexSizeBytes += hmCurr.iidx.SizeBytes() - - m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen()) - m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen()) - - m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen()) - m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen()) - s.idb().UpdateMetrics(&m.IndexDBMetrics) s.tb.UpdateMetrics(&m.TableMetrics) } @@ -519,7 +493,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if !fs.IsPathExist(path) { logger.Infof("nothing to load from %q", path) return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -531,7 +504,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if len(src) < 24 { logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -544,7 +516,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if hourLoaded != hour { logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour) return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -555,7 +526,6 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if uint64(len(src)) < 8*hmLen { logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen) return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -602,30 +572,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs byTenant[k] = m } - // Unmarshal hm.iidx - iidx := newInmemoryInvertedIndex() - if !disableRecentHourIndex { - tail, err := iidx.Unmarshal(src) - if err != nil { - logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err) - return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), - hour: hour, - } - } - if len(tail) > 0 { - logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail)) - return &hourMetricIDs{ - iidx: newInmemoryInvertedIndex(), - hour: hour, - } - } - } - logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen) return &hourMetricIDs{ m: m, - iidx: iidx, byTenant: byTenant, hour: hourLoaded, isFull: isFull != 0, @@ -665,11 +614,6 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { } } - if !disableRecentHourIndex { - // Marshal hm.iidx - dst = hm.iidx.Marshal(dst) - } - if err := ioutil.WriteFile(path, dst, 0644); err != nil { logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) } @@ -1030,9 +974,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { } s.pendingHourEntries = append(s.pendingHourEntries, e) s.pendingHourEntriesLock.Unlock() - if !disableRecentHourIndex { - hm.iidx.AddMetricID(idb, e) - } } // Slower path: check global cache for (date, metricID) entry. @@ -1098,6 +1039,15 @@ func (dmc *dateMetricIDCache) EntriesCount() int { return n } +func (dmc *dateMetricIDCache) SizeBytes() uint64 { + byDate := dmc.byDate.Load().(*byDateMetricIDMap) + n := uint64(0) + for _, e := range byDate.m { + n += e.v.SizeBytes() + } + return n +} + func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { byDate := dmc.byDate.Load().(*byDateMetricIDMap) v := byDate.get(date) @@ -1210,19 +1160,16 @@ func (s *Storage) updateCurrHourMetricIDs() { // Slow path: hm.m must be updated with non-empty s.pendingHourEntries. var m *uint64set.Set - var iidx *inmemoryInvertedIndex var byTenant map[accountProjectKey]*uint64set.Set isFull := hm.isFull if hm.hour == hour { m = hm.m.Clone() - iidx = hm.iidx byTenant = make(map[accountProjectKey]*uint64set.Set, len(hm.byTenant)) for k, e := range hm.byTenant { byTenant[k] = e.Clone() } } else { m = &uint64set.Set{} - iidx = newInmemoryInvertedIndex() byTenant = make(map[accountProjectKey]*uint64set.Set) isFull = true } @@ -1243,7 +1190,6 @@ func (s *Storage) updateCurrHourMetricIDs() { hmNew := &hourMetricIDs{ m: m, - iidx: iidx, byTenant: byTenant, hour: hour, isFull: isFull, @@ -1256,7 +1202,6 @@ func (s *Storage) updateCurrHourMetricIDs() { type hourMetricIDs struct { m *uint64set.Set - iidx *inmemoryInvertedIndex byTenant map[accountProjectKey]*uint64set.Set hour uint64 isFull bool diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index a1af4fa288..3d36482f03 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -107,7 +107,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - iidx: newInmemoryInvertedIndex(), hour: 123, } hmOrig.m.Add(12) @@ -143,7 +142,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - iidx: newInmemoryInvertedIndex(), hour: hour, } hmOrig.m.Add(12) @@ -204,7 +202,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - iidx: newInmemoryInvertedIndex(), hour: 123, } hmOrig.m.Add(12) @@ -265,7 +262,6 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, - iidx: newInmemoryInvertedIndex(), hour: hour, } hmOrig.m.Add(12)