From ca259864e28c5f9ed817cff15ba68c7828ef5b73 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 13 Nov 2019 13:11:02 +0200 Subject: [PATCH] lib/storage: return back inmemory inverted index for recent hour Issues fixed: - Slow startup times. Now the index is loaded from cache during start. - High memory usage related to superfluous index copies every 10 seconds. --- app/vmstorage/main.go | 19 ++ lib/storage/index_db.go | 44 +++ lib/storage/index_db_test.go | 4 + lib/storage/inmemory_inverted_index.go | 283 ++++++++++++++++++++ lib/storage/inmemory_inverted_index_test.go | 40 +++ lib/storage/search_test.go | 18 +- lib/storage/storage.go | 57 +++- lib/storage/storage_test.go | 4 + lib/uint64set/uint64set.go | 28 ++ lib/uint64set/uint64set_test.go | 9 + 10 files changed, 499 insertions(+), 7 deletions(-) create mode 100644 lib/storage/inmemory_inverted_index.go create mode 100644 lib/storage/inmemory_inverted_index_test.go diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 5109b23fe..10b0814a3 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -407,6 +407,25 @@ func registerStorageMetrics() { return float64(idbm().ItemsCount) }) + metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 { + return float64(m().RecentHourInvertedIndexSize) + }) + metrics.NewGauge(`vm_recent_hour_inverted_index_size_bytes`, func() float64 { + return float64(m().RecentHourInvertedIndexSizeBytes) + }) + metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 { + return float64(m().RecentHourInvertedIndexUniqueTagPairsSize) + }) + metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 { + return float64(m().RecentHourInvertedIndexPendingMetricIDsSize) + }) + metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 { + return float64(idbm().RecentHourInvertedIndexSearchCalls) + }) + metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`,
func() float64 { + return float64(idbm().RecentHourInvertedIndexSearchHits) + }) + metrics.NewGauge(`vm_date_range_search_calls_total`, func() float64 { return float64(idbm().DateRangeSearchCalls) }) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 8192dfef6..5a9dd3068 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -95,6 +95,12 @@ type indexDB struct { // The number of successful searches for metric ids by days. dateMetricIDsSearchHits uint64 + // The number of calls for recent hour searches over inverted index. + recentHourInvertedIndexSearchCalls uint64 + + // The number of hits for recent hour searches over inverted index. + recentHourInvertedIndexSearchHits uint64 + // The number of calls for date range searches. dateRangeSearchCalls uint64 @@ -225,6 +231,9 @@ type IndexDBMetrics struct { DateMetricIDsSearchCalls uint64 DateMetricIDsSearchHits uint64 + RecentHourInvertedIndexSearchCalls uint64 + RecentHourInvertedIndexSearchHits uint64 + DateRangeSearchCalls uint64 DateRangeSearchHits uint64 @@ -266,6 +275,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls) m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits) + m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls) + m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits) + m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) @@ -1655,6 +1667,10 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf return bytes.Compare(a.prefix, b.prefix) < 0 }) + if is.tryUpdatingMetricIDsForRecentHour(metricIDs, tfs, tr) { + // Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour. 
+ return nil + } ok, err := is.tryUpdatingMetricIDsForDateRange(metricIDs, tfs, tr, maxMetrics) if err != nil { return err @@ -2177,6 +2193,34 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) return nil, false } +func (is *indexSearch) tryUpdatingMetricIDsForRecentHour(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool { + minHour := uint64(tr.MinTimestamp) / msecPerHour + maxHour := uint64(tr.MaxTimestamp) / msecPerHour + + hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs) + if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull { + // The tr fits the current hour. + hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs) + atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) + return true + } + hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs) + if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull { + // The tr fits the previous hour. + hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs) + atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) + return true + } + if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull { + // The tr spans the previous and the current hours. 
+ hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs) + hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs) + atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1) + return true + } + return false +} + func (db *indexDB) storeDateMetricID(date, metricID uint64) error { is := db.getIndexSearch() ok, err := is.hasDateMetricID(date, metricID) diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index c76a117c2..0b81e4809 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1418,6 +1418,10 @@ func TestSearchTSIDWithTimeRange(t *testing.T) { prevMetricIDs.m.Add(tsids[i].MetricID) currMetricIDs.m.Add(tsids[i].MetricID) } + prevMetricIDs.iidx = newInmemoryInvertedIndex() + prevMetricIDs.iidx.MustUpdate(db, prevMetricIDs.m) + currMetricIDs.iidx = newInmemoryInvertedIndex() + currMetricIDs.iidx.MustUpdate(db, currMetricIDs.m) } } diff --git a/lib/storage/inmemory_inverted_index.go b/lib/storage/inmemory_inverted_index.go new file mode 100644 index 000000000..f503ad071 --- /dev/null +++ b/lib/storage/inmemory_inverted_index.go @@ -0,0 +1,283 @@ +package storage + +import ( + "bytes" + "fmt" + "io" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set" +) + +type inmemoryInvertedIndex struct { + mu sync.RWMutex + m map[string]*uint64set.Set + pendingMetricIDs []uint64 +} + +func (iidx *inmemoryInvertedIndex) Marshal(dst []byte) []byte { + iidx.mu.RLock() + defer iidx.mu.RUnlock() + + // Marshal iidx.m + var metricIDs []uint64 + dst = encoding.MarshalUint64(dst, uint64(len(iidx.m))) + for k, v := range iidx.m { + dst = encoding.MarshalBytes(dst, []byte(k)) + metricIDs = v.AppendTo(metricIDs[:0]) + dst = marshalMetricIDs(dst, metricIDs) + } + + // Marshal iidx.pendingMetricIDs + dst = marshalMetricIDs(dst, iidx.pendingMetricIDs) + + return dst +} + +func (iidx 
*inmemoryInvertedIndex) Unmarshal(src []byte) ([]byte, error) { + iidx.mu.Lock() + defer iidx.mu.Unlock() + + // Unmarshal iidx.m + if len(src) < 8 { + return src, fmt.Errorf("cannot read len(iidx.m) from %d bytes; want at least 8 bytes", len(src)) + } + mLen := int(encoding.UnmarshalUint64(src)) + src = src[8:] + m := make(map[string]*uint64set.Set, mLen) + var metricIDs []uint64 + for i := 0; i < mLen; i++ { + tail, k, err := encoding.UnmarshalBytes(src) + if err != nil { + return tail, fmt.Errorf("cannot unmarshal key #%d for iidx.m: %s", i, err) + } + src = tail + tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src) + if err != nil { + return tail, fmt.Errorf("cannot unmarshal value #%d for iidx.m: %s", i, err) + } + src = tail + var v uint64set.Set + for _, metricID := range metricIDs { + v.Add(metricID) + } + m[string(k)] = &v + } + iidx.m = m + + // Unmarshal iidx.pendingMetricIDs + var err error + var tail []byte + tail, metricIDs, err = unmarshalMetricIDs(metricIDs[:0], src) + if err != nil { + return tail, fmt.Errorf("cannot unmarshal iidx.pendingMetricIDs: %s", err) + } + src = tail + iidx.pendingMetricIDs = append(iidx.pendingMetricIDs[:0], metricIDs...) 
+ + return src, nil +} + +func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte { + dst = encoding.MarshalUint64(dst, uint64(len(metricIDs))) + for _, metricID := range metricIDs { + dst = encoding.MarshalUint64(dst, metricID) + } + return dst +} + +func unmarshalMetricIDs(dst []uint64, src []byte) ([]byte, []uint64, error) { + if len(src) < 8 { + return src, dst, fmt.Errorf("cannot unmarshal metricIDs len from %d bytes; want at least 8 bytes", len(src)) + } + metricIDsLen := int(encoding.UnmarshalUint64(src)) + src = src[8:] + if len(src) < 8*metricIDsLen { + return src, dst, fmt.Errorf("not enough bytes for unmarshaling %d metricIDs; want %d bytes; got %d bytes", metricIDsLen, 8*metricIDsLen, len(src)) + } + for i := 0; i < metricIDsLen; i++ { + metricID := encoding.UnmarshalUint64(src) + src = src[8:] + dst = append(dst, metricID) + } + return src, dst, nil +} + +func (iidx *inmemoryInvertedIndex) SizeBytes() uint64 { + n := uint64(0) + iidx.mu.RLock() + for k, v := range iidx.m { + n += uint64(len(k)) + n += v.SizeBytes() + } + n += uint64(len(iidx.pendingMetricIDs)) * 8 + iidx.mu.RUnlock() + return n +} + +func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int { + if iidx == nil { + return 0 + } + iidx.mu.RLock() + n := len(iidx.m) + iidx.mu.RUnlock() + return n +} + +func (iidx *inmemoryInvertedIndex) GetEntriesCount() int { + if iidx == nil { + return 0 + } + n := 0 + iidx.mu.RLock() + for _, v := range iidx.m { + n += v.Len() + } + iidx.mu.RUnlock() + return n +} + +func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int { + if iidx == nil { + return 0 + } + iidx.mu.RLock() + n := len(iidx.pendingMetricIDs) + iidx.mu.RUnlock() + return n +} + +func newInmemoryInvertedIndex() *inmemoryInvertedIndex { + return &inmemoryInvertedIndex{ + m: make(map[string]*uint64set.Set), + } +} + +func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, src *uint64set.Set) { + metricIDs := src.AppendTo(nil) + iidx.mu.Lock() + iidx.pendingMetricIDs = 
append(iidx.pendingMetricIDs, metricIDs...) + if err := iidx.addPendingEntriesLocked(idb); err != nil { + logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err) + } + iidx.mu.Unlock() +} + +func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, metricID uint64) { + iidx.mu.Lock() + iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID) + if err := iidx.addPendingEntriesLocked(idb); err != nil { + logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err) + } + iidx.mu.Unlock() +} + +func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) { + if iidx == nil { + return + } + var result *uint64set.Set + var tfFirst *tagFilter + for i := range tfs.tfs { + if tfs.tfs[i].isNegative { + continue + } + tfFirst = &tfs.tfs[i] + break + } + + iidx.mu.RLock() + defer iidx.mu.RUnlock() + + if tfFirst == nil { + result = allMetricIDs.Clone() + } else { + result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix) + } + for i := range tfs.tfs { + tf := &tfs.tfs[i] + if tf == tfFirst { + continue + } + m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix) + if tf.isNegative { + result.Subtract(m) + } else { + result.Intersect(m) + } + if result.Len() == 0 { + return + } + } + metricIDs.Union(result) +} + +func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set { + if !bytes.HasPrefix(tf.prefix, commonPrefix) { + logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix) + } + prefix := tf.prefix[len(commonPrefix):] + var m uint64set.Set + kb := kbPool.Get() + defer kbPool.Put(kb) + for k, v := range iidx.m { + if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) { + continue + } + kb.B = append(kb.B[:0], k[len(prefix):]...) 
+ ok, err := tf.matchSuffix(kb.B) + if err != nil { + logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err) + } + if !ok { + continue + } + m.Union(v) + } + return &m +} + +func (iidx *inmemoryInvertedIndex) addPendingEntriesLocked(idb *indexDB) error { + metricIDs := iidx.pendingMetricIDs + iidx.pendingMetricIDs = iidx.pendingMetricIDs[:0] + + kb := kbPool.Get() + defer kbPool.Put(kb) + + mn := GetMetricName() + defer PutMetricName(mn) + for _, metricID := range metricIDs { + var err error + kb.B, err = idb.searchMetricName(kb.B[:0], metricID) + if err != nil { + if err == io.EOF { + iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID) + continue + } + return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err) + } + if err = mn.Unmarshal(kb.B); err != nil { + return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err) + } + kb.B = marshalTagValue(kb.B[:0], nil) + kb.B = marshalTagValue(kb.B, mn.MetricGroup) + iidx.addMetricIDLocked(kb.B, metricID) + for i := range mn.Tags { + kb.B = mn.Tags[i].Marshal(kb.B[:0]) + iidx.addMetricIDLocked(kb.B, metricID) + } + } + return nil +} + +func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) { + v := iidx.m[string(key)] + if v == nil { + v = &uint64set.Set{} + iidx.m[string(key)] = v + } + v.Add(metricID) +} diff --git a/lib/storage/inmemory_inverted_index_test.go b/lib/storage/inmemory_inverted_index_test.go new file mode 100644 index 000000000..3110b4a1e --- /dev/null +++ b/lib/storage/inmemory_inverted_index_test.go @@ -0,0 +1,40 @@ +package storage + +import ( + "fmt" + "reflect" + "testing" +) + +func TestInmemoryInvertedIndexMarshalUnmarshal(t *testing.T) { + iidx := newInmemoryInvertedIndex() + const keysCount = 100 + const metricIDsCount = 10000 + for i := 0; i < metricIDsCount; i++ { + k := fmt.Sprintf("key %d", i%keysCount) + iidx.addMetricIDLocked([]byte(k), uint64(i)) + } + + data := iidx.Marshal(nil) + + iidx2 := 
newInmemoryInvertedIndex() + tail, err := iidx2.Unmarshal(data) + if err != nil { + t.Fatalf("cannot unmarshal iidx: %s", err) + } + if len(tail) != 0 { + t.Fatalf("unexpected tail left after iidx unmarshaling: %d bytes", len(tail)) + } + if len(iidx.m) != len(iidx2.m) { + t.Fatalf("unexpected len(iidx2.m); got %d; want %d", len(iidx2.m), len(iidx.m)) + } + if !reflect.DeepEqual(iidx.pendingMetricIDs, iidx2.pendingMetricIDs) { + t.Fatalf("unexpected pendingMetricIDs; got\n%d;\nwant\n%d", iidx2.pendingMetricIDs, iidx.pendingMetricIDs) + } + for k, v := range iidx.m { + v2 := iidx2.m[k] + if !v.Equal(v2) { + t.Fatalf("unexpected set for key %q", k) + } + } +} diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go index 6500907cc..a437b74d2 100644 --- a/lib/storage/search_test.go +++ b/lib/storage/search_test.go @@ -72,15 +72,21 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) { func TestSearch(t *testing.T) { t.Run("global_inverted_index", func(t *testing.T) { - testSearchGeneric(t, false) + testSearchGeneric(t, false, false) }) t.Run("perday_inverted_index", func(t *testing.T) { - testSearchGeneric(t, true) + testSearchGeneric(t, false, true) + }) + t.Run("recent_hour_global_inverted_index", func(t *testing.T) { + testSearchGeneric(t, true, false) + }) + t.Run("recent_hour_perday_inverted_index", func(t *testing.T) { + testSearchGeneric(t, true, true) }) } -func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) { - path := fmt.Sprintf("TestSearch_%v", forcePerDayInvertedIndex) +func testSearchGeneric(t *testing.T, forceRecentHourInvertedIndex, forcePerDayInvertedIndex bool) { + path := fmt.Sprintf("TestSearch_%v_%v", forceRecentHourInvertedIndex, forcePerDayInvertedIndex) st, err := OpenStorage(path, 0) if err != nil { t.Fatalf("cannot open storage %q: %s", path, err) @@ -141,6 +147,10 @@ func testSearchGeneric(t *testing.T, forcePerDayInvertedIndex bool) { extDB.startDateForPerDayInvertedIndex = 0 }) } + if 
forceRecentHourInvertedIndex { + hm := st.currHourMetricIDs.Load().(*hourMetricIDs) + hm.isFull = true + } // Run search. tr := TimeRange{ diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 8963c9884..c21f8ac69 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -311,6 +311,11 @@ type Metrics struct { HourMetricIDCacheSize uint64 + RecentHourInvertedIndexSize uint64 + RecentHourInvertedIndexSizeBytes uint64 + RecentHourInvertedIndexUniqueTagPairsSize uint64 + RecentHourInvertedIndexPendingMetricIDsSize uint64 + IndexDBMetrics IndexDBMetrics TableMetrics TableMetrics } @@ -367,6 +372,18 @@ func (s *Storage) UpdateMetrics(m *Metrics) { } m.HourMetricIDCacheSize += uint64(hourMetricIDsLen) + m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount()) + m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount()) + + m.RecentHourInvertedIndexSizeBytes += hmPrev.iidx.SizeBytes() + m.RecentHourInvertedIndexSizeBytes += hmCurr.iidx.SizeBytes() + + m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen()) + m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen()) + + m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen()) + m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen()) + s.idb().UpdateMetrics(&m.IndexDBMetrics) s.tb.UpdateMetrics(&m.TableMetrics) } @@ -481,6 +498,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if !fs.IsPathExist(path) { logger.Infof("nothing to load from %q", path) return &hourMetricIDs{ + iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -492,6 +510,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if len(src) < 24 { logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24) return &hourMetricIDs{ + iidx: 
newInmemoryInvertedIndex(), hour: hour, } } @@ -504,6 +523,7 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs if hourLoaded != hour { logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour) return &hourMetricIDs{ + iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -511,9 +531,10 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs // Unmarshal hm.m hmLen := encoding.UnmarshalUint64(src) src = src[8:] - if uint64(len(src)) != 8*hmLen { - logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want %d bytes", path, len(src), 8*hmLen) + if uint64(len(src)) < 8*hmLen { + logger.Errorf("discarding %s, since it has broken hm.m data; got %d bytes; want at least %d bytes", path, len(src), 8*hmLen) return &hourMetricIDs{ + iidx: newInmemoryInvertedIndex(), hour: hour, } } @@ -523,9 +544,29 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs src = src[8:] m.Add(metricID) } + + // Unmarshal hm.iidx + iidx := newInmemoryInvertedIndex() + tail, err := iidx.Unmarshal(src) + if err != nil { + logger.Errorf("discarding %s, since it has broken hm.iidx data: %s", path, err) + return &hourMetricIDs{ + iidx: newInmemoryInvertedIndex(), + hour: hour, + } + } + if len(tail) > 0 { + logger.Errorf("discarding %s, since it contains superflouos %d bytes of data", path, len(tail)) + return &hourMetricIDs{ + iidx: newInmemoryInvertedIndex(), + hour: hour, + } + } + logger.Infof("loaded %s from %q in %s; entriesCount: %d; sizeBytes: %d", name, path, time.Since(startTime), hmLen, srcOrigLen) return &hourMetricIDs{ m: m, + iidx: iidx, hour: hourLoaded, isFull: isFull != 0, } @@ -550,6 +591,10 @@ func (s *Storage) mustSaveHourMetricIDs(hm *hourMetricIDs, name string) { for _, metricID := range hm.m.AppendTo(nil) { dst = encoding.MarshalUint64(dst, metricID) } + + // Marshal hm.iidx + dst = hm.iidx.Marshal(dst) + if err := 
ioutil.WriteFile(path, dst, 0644); err != nil { logger.Panicf("FATAL: cannot write %d bytes to %q: %s", len(dst), path, err) } @@ -900,6 +945,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { s.pendingHourEntriesLock.Lock() s.pendingHourEntries.Add(metricID) s.pendingHourEntriesLock.Unlock() + hm.iidx.AddMetricID(idb, metricID) } // Slower path: check global cache for (date, metricID) entry. @@ -908,7 +954,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, lastError error) error { continue } - // Slow path: store the entry (date, metricID) entry in the indexDB. + // Slow path: store the (date, metricID) entry in the indexDB. // It is OK if the (date, metricID) entry is added multiple times to db // by concurrent goroutines. if err := idb.storeDateMetricID(date, metricID); err != nil { @@ -1077,16 +1123,20 @@ func (s *Storage) updateCurrHourMetricIDs() { // Slow path: hm.m must be updated with non-empty s.pendingHourEntries. var m *uint64set.Set + var iidx *inmemoryInvertedIndex isFull := hm.isFull if hm.hour == hour { m = hm.m.Clone() + iidx = hm.iidx } else { m = &uint64set.Set{} + iidx = newInmemoryInvertedIndex() isFull = true } m.Union(newMetricIDs) hmNew := &hourMetricIDs{ m: m, + iidx: iidx, hour: hour, isFull: isFull, } @@ -1098,6 +1148,7 @@ func (s *Storage) updateCurrHourMetricIDs() { type hourMetricIDs struct { m *uint64set.Set + iidx *inmemoryInvertedIndex hour uint64 isFull bool } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index cf408170e..93c1d5e0a 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -108,6 +108,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, + iidx: newInmemoryInvertedIndex(), hour: 123, } hmOrig.m.Add(12) @@ -143,6 +144,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour 
hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, + iidx: newInmemoryInvertedIndex(), hour: hour, } hmOrig.m.Add(12) @@ -187,6 +189,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, + iidx: newInmemoryInvertedIndex(), hour: 123, } hmOrig.m.Add(12) @@ -228,6 +231,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) { hour := uint64(timestampFromTime(time.Now())) / msecPerHour hmOrig := &hourMetricIDs{ m: &uint64set.Set{}, + iidx: newInmemoryInvertedIndex(), hour: hour, } hmOrig.m.Add(12) diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go index 0bbea5f45..7b939b92e 100644 --- a/lib/uint64set/uint64set.go +++ b/lib/uint64set/uint64set.go @@ -3,6 +3,7 @@ package uint64set import ( "math/bits" "sort" + "unsafe" ) // Set is a fast set for uint64. @@ -47,6 +48,19 @@ func (s *Set) Clone() *Set { return &dst } +// SizeBytes returns an estimate size of s in RAM. +func (s *Set) SizeBytes() uint64 { + if s == nil { + return 0 + } + n := uint64(unsafe.Sizeof(*s)) + for _, b := range s.buckets { + n += uint64(unsafe.Sizeof(b)) + n += b.sizeBytes() + } + return n +} + // Len returns the number of distinct uint64 values in s. 
func (s *Set) Len() int { if s == nil { @@ -259,6 +273,16 @@ type bucket32 struct { smallPool [14]uint32 } +func (b *bucket32) sizeBytes() uint64 { + n := uint64(unsafe.Sizeof(*b)) + n += 2 * uint64(len(b.b16his)) + for _, b := range b.buckets { + n += uint64(unsafe.Sizeof(b)) + n += b.sizeBytes() + } + return n +} + func (b *bucket32) clone() *bucket32 { var dst bucket32 dst.skipSmallPool = b.skipSmallPool @@ -463,6 +487,10 @@ type bucket16 struct { bits [wordsPerBucket]uint64 } +func (b *bucket16) sizeBytes() uint64 { + return uint64(unsafe.Sizeof(*b)) +} + func (b *bucket16) clone() *bucket16 { var dst bucket16 copy(dst.bits[:], b.bits[:]) diff --git a/lib/uint64set/uint64set_test.go b/lib/uint64set/uint64set_test.go index 4ec7d46df..b0487653d 100644 --- a/lib/uint64set/uint64set_test.go +++ b/lib/uint64set/uint64set_test.go @@ -25,6 +25,9 @@ func testSetBasicOps(t *testing.T, itemsCount int) { // Verify operations on nil set { var sNil *Set + if n := sNil.SizeBytes(); n != 0 { + t.Fatalf("sNil.SizeBytes must return 0; got %d", n) + } if sNil.Has(123) { t.Fatalf("sNil shouldn't contain any item; found 123") } @@ -73,6 +76,9 @@ func testSetBasicOps(t *testing.T, itemsCount int) { if n := s.Len(); n != itemsCount/2 { t.Fatalf("unexpected s.Len() after forward Add; got %d; want %d", n, itemsCount/2) } + if n := s.SizeBytes(); n == 0 { + t.Fatalf("s.SizeBytes() must be greater than 0") + } // Verify backward Add for i := 0; i < itemsCount/2; i++ { @@ -307,6 +313,9 @@ func testSetSparseItems(t *testing.T, itemsCount int) { if n := s.Len(); n != len(m) { t.Fatalf("unexpected Len(); got %d; want %d", n, len(m)) } + if n := s.SizeBytes(); n == 0 { + t.Fatalf("SizeBytes() must return value greater than 0") + } // Check Has for x := range m {