From d888b2165790823cfae34e52e398ae5bc814d4ee Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Fri, 8 Nov 2019 13:16:40 +0200
Subject: [PATCH] lib/storage: add inmemory inverted index for the last hour

It should improve performance for `last N hours` dashboards with update
intervals smaller than 1 hour.
---
 app/vmstorage/main.go                  |  16 ++
 lib/storage/index_db.go                |  49 +++++-
 lib/storage/inmemory_inverted_index.go | 197 ++++++++++++++++++++++++
 lib/storage/search_test.go             |   2 +-
 lib/storage/storage.go                 |  60 ++++++--
 lib/storage/storage_test.go            |  10 +-
 lib/uint64set/uint64set.go             | 203 ++++++++++++++++++++++++-
 lib/uint64set/uint64set_test.go        | 174 ++++++++++++++++++---
 8 files changed, 662 insertions(+), 49 deletions(-)
 create mode 100644 lib/storage/inmemory_inverted_index.go

diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go
index e462f3cd3f..b6c8fab39c 100644
--- a/app/vmstorage/main.go
+++ b/app/vmstorage/main.go
@@ -404,6 +404,22 @@ func registerStorageMetrics() {
 		return float64(idbm().ItemsCount)
 	})
 
+	metrics.NewGauge(`vm_recent_hour_inverted_index_entries`, func() float64 {
+		return float64(m().RecentHourInvertedIndexSize)
+	})
+	metrics.NewGauge(`vm_recent_hour_inverted_index_unique_tag_pairs`, func() float64 {
+		return float64(m().RecentHourInvertedIndexUniqueTagPairsSize)
+	})
+	metrics.NewGauge(`vm_recent_hour_inverted_index_pending_metric_ids`, func() float64 {
+		return float64(m().RecentHourInvertedIndexPendingMetricIDsSize)
+	})
+	metrics.NewGauge(`vm_recent_hour_inverted_index_search_calls_total`, func() float64 {
+		return float64(idbm().RecentHourInvertedIndexSearchCalls)
+	})
+	metrics.NewGauge(`vm_recent_hour_inverted_index_search_hits_total`, func() float64 {
+		return float64(idbm().RecentHourInvertedIndexSearchHits)
+	})
+
 	metrics.NewGauge(`vm_cache_entries{type="storage/tsid"}`, func() float64 {
 		return float64(m().TSIDCacheSize)
 	})
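// The gauges registered above follow the VictoriaMetrics/metrics pattern:
// each gauge takes a callback that is evaluated lazily at scrape time, so
// the storage only pays a few atomic loads per /metrics request instead of
// updating counters on every operation. A minimal self-contained sketch of
// that pattern (the metric name and counter below are illustrative, not
// part of this patch):
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/VictoriaMetrics/metrics"
)

var searchHits uint64

func main() {
	metrics.NewGauge(`example_recent_hour_search_hits_total`, func() float64 {
		// The callback runs on every scrape; read the counter atomically.
		return float64(atomic.LoadUint64(&searchHits))
	})
	atomic.AddUint64(&searchHits, 1)
	fmt.Println("gauge registered; its value is computed at scrape time")
}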
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 40f9d9b633..ba76026d27 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -89,6 +89,12 @@ type indexDB struct {
 	// The number of successful searches for metric ids by days.
 	dateMetricIDsSearchHits uint64
 
+	// The number of calls for recent hour searches over inverted index.
+	recentHourInvertedIndexSearchCalls uint64
+
+	// The number of hits for recent hour searches over inverted index.
+	recentHourInvertedIndexSearchHits uint64
+
 	mustDrop uint64
 
 	name string
@@ -201,6 +207,9 @@ type IndexDBMetrics struct {
 	DateMetricIDsSearchCalls uint64
 	DateMetricIDsSearchHits  uint64
 
+	RecentHourInvertedIndexSearchCalls uint64
+	RecentHourInvertedIndexSearchHits  uint64
+
 	IndexBlocksWithMetricIDsProcessed      uint64
 	IndexBlocksWithMetricIDsIncorrectOrder uint64
 
@@ -226,7 +235,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
 	db.uselessTagFiltersCache.UpdateStats(&cs)
 	m.UselessTagFiltersCacheSize += cs.EntriesCount
 	m.UselessTagFiltersCacheSizeBytes += cs.BytesSize
-	m.UselessTagFiltersCacheRequests += cs.GetBigCalls
+	m.UselessTagFiltersCacheRequests += cs.GetCalls
 	m.UselessTagFiltersCacheMisses += cs.Misses
 
 	m.DeletedMetricsCount += uint64(db.getDeletedMetricIDs().Len())
@@ -238,6 +247,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
 	m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
 	m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
 
+	m.RecentHourInvertedIndexSearchCalls += atomic.LoadUint64(&db.recentHourInvertedIndexSearchCalls)
+	m.RecentHourInvertedIndexSearchHits += atomic.LoadUint64(&db.recentHourInvertedIndexSearchHits)
+
 	m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
 	m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
 
@@ -395,7 +407,7 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
 	if versioned {
 		prefix = atomic.LoadUint64(&tagFiltersKeyGen)
 	}
-	const cacheGranularityMs = 1000 * 60 * 5
+	const cacheGranularityMs = 1000 * 10
 	startTime := (uint64(tr.MinTimestamp) / cacheGranularityMs) * cacheGranularityMs
 	endTime := (uint64(tr.MaxTimestamp) / cacheGranularityMs) * cacheGranularityMs
 	dst = encoding.MarshalUint64(dst, prefix)
@@ -548,6 +560,7 @@ func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {
 	if err := db.generateTSID(dst, metricName, mn); err != nil {
 		return fmt.Errorf("cannot generate TSID: %s", err)
 	}
+	db.putMetricNameToCache(dst.MetricID, metricName)
 	if err := db.createIndexes(dst, mn); err != nil {
 		return fmt.Errorf("cannot create indexes: %s", err)
 	}
@@ -1572,6 +1585,13 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
 		return bytes.Compare(a.prefix, b.prefix) < 0
 	})
 
+	atomic.AddUint64(&is.db.recentHourInvertedIndexSearchCalls, 1)
+	if is.tryUpdatingMetricIDsForLastHourTimeRange(metricIDs, tfs, tr) {
+		// Fast path: found metricIDs in the inmemoryInvertedIndex for the last hour.
+		atomic.AddUint64(&is.db.recentHourInvertedIndexSearchHits, 1)
+		return nil
+	}
+
 	minTf, minMetricIDs, err := is.getTagFilterWithMinMetricIDsCountOptimized(tfs, tr, maxMetrics)
 	if err != nil {
 		return err
@@ -1943,6 +1963,31 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int)
 	return nil, false
 }
 
+func (is *indexSearch) tryUpdatingMetricIDsForLastHourTimeRange(metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange) bool {
+	minHour := uint64(tr.MinTimestamp) / msecPerHour
+	maxHour := uint64(tr.MaxTimestamp) / msecPerHour
+
+	hmCurr := is.db.currHourMetricIDs.Load().(*hourMetricIDs)
+	if maxHour == hmCurr.hour && minHour == maxHour && hmCurr.isFull {
+		// The tr fits the current hour.
+		hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs)
+		return true
+	}
+	hmPrev := is.db.prevHourMetricIDs.Load().(*hourMetricIDs)
+	if maxHour == hmPrev.hour && minHour == maxHour && hmPrev.isFull {
+		// The tr fits the previous hour.
+		hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs)
+		return true
+	}
+	if maxHour == hmCurr.hour && minHour == hmPrev.hour && hmCurr.isFull && hmPrev.isFull {
+		// The tr spans the previous and the current hours.
+		hmPrev.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmPrev.m, tfs)
+		hmCurr.iidx.UpdateMetricIDsForTagFilters(metricIDs, hmCurr.m, tfs)
+		return true
+	}
+	return false
+}
+
 func (db *indexDB) storeDateMetricID(date, metricID uint64) error {
 	is := db.getIndexSearch()
 	ok, err := is.hasDateMetricID(date, metricID)
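// The fast path above can only answer a query when the requested time range
// is fully covered by the two in-memory hour buckets, and only when those
// buckets are complete (isFull). A standalone sketch of the same bucket
// arithmetic with simplified types (coveredByHours is a hypothetical helper,
// not part of this patch):
package main

import "fmt"

const msecPerHour = 3600 * 1000

func coveredByHours(minTimestamp, maxTimestamp int64, prevHour, currHour uint64) bool {
	minHour := uint64(minTimestamp) / msecPerHour
	maxHour := uint64(maxTimestamp) / msecPerHour
	switch {
	case minHour == maxHour && maxHour == currHour:
		return true // the range fits the current hour
	case minHour == maxHour && maxHour == prevHour:
		return true // the range fits the previous hour
	case minHour == prevHour && maxHour == currHour:
		return true // the range spans both recent hours
	}
	return false
}

func main() {
	now := int64(1573206000000) // some timestamp in milliseconds
	currHour := uint64(now) / msecPerHour
	fmt.Println(coveredByHours(now-30*60*1000, now, currHour-1, currHour))    // true: last 30 minutes
	fmt.Println(coveredByHours(now-3*msecPerHour, now, currHour-1, currHour)) // false: last 3 hours
}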
diff --git a/lib/storage/inmemory_inverted_index.go b/lib/storage/inmemory_inverted_index.go
new file mode 100644
index 0000000000..487c226ee7
--- /dev/null
+++ b/lib/storage/inmemory_inverted_index.go
@@ -0,0 +1,197 @@
+package storage
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"sync"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
+)
+
+type inmemoryInvertedIndex struct {
+	mu               sync.RWMutex
+	m                map[string]*uint64set.Set
+	pendingMetricIDs []uint64
+}
+
+func (iidx *inmemoryInvertedIndex) GetUniqueTagPairsLen() int {
+	if iidx == nil {
+		return 0
+	}
+	iidx.mu.RLock()
+	n := len(iidx.m)
+	iidx.mu.RUnlock()
+	return n
+}
+
+func (iidx *inmemoryInvertedIndex) GetEntriesCount() int {
+	if iidx == nil {
+		return 0
+	}
+	n := 0
+	iidx.mu.RLock()
+	for _, v := range iidx.m {
+		n += v.Len()
+	}
+	iidx.mu.RUnlock()
+	return n
+}
+
+func (iidx *inmemoryInvertedIndex) GetPendingMetricIDsLen() int {
+	if iidx == nil {
+		return 0
+	}
+	iidx.mu.RLock()
+	n := len(iidx.pendingMetricIDs)
+	iidx.mu.RUnlock()
+	return n
+}
+
+func newInmemoryInvertedIndex() *inmemoryInvertedIndex {
+	return &inmemoryInvertedIndex{
+		m: make(map[string]*uint64set.Set),
+	}
+}
+
+func (iidx *inmemoryInvertedIndex) Clone() *inmemoryInvertedIndex {
+	if iidx == nil {
+		return newInmemoryInvertedIndex()
+	}
+	iidx.mu.RLock()
+	mCopy := make(map[string]*uint64set.Set, len(iidx.m))
+	for k, v := range iidx.m {
+		mCopy[k] = v.Clone()
+	}
+	pendingMetricIDs := append([]uint64{}, iidx.pendingMetricIDs...)
+	iidx.mu.RUnlock()
+	return &inmemoryInvertedIndex{
+		m:                mCopy,
+		pendingMetricIDs: pendingMetricIDs,
+	}
+}
+
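// Clone above exists because hourMetricIDs values (including iidx) are
// published through an atomic.Value: the updater goroutine builds a fresh
// copy and swaps it in, so search goroutines read snapshots without taking
// locks. A minimal sketch of that publish pattern (simplified types,
// assuming the clone-and-swap flow of updateCurrHourMetricIDs later in
// this patch):
package main

import (
	"fmt"
	"sync/atomic"
)

type snapshot struct {
	metricIDs []uint64
}

func main() {
	var current atomic.Value
	current.Store(&snapshot{})

	// Updater: clone the published snapshot, extend it, swap it in.
	old := current.Load().(*snapshot)
	next := &snapshot{metricIDs: append(append([]uint64{}, old.metricIDs...), 42)}
	current.Store(next)

	// Reader: a plain Load returns a consistent, immutable snapshot.
	fmt.Println(current.Load().(*snapshot).metricIDs) // [42]
}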
+func (iidx *inmemoryInvertedIndex) MustUpdate(idb *indexDB, src *uint64set.Set) {
+	metricIDs := src.AppendTo(nil)
+	iidx.mu.Lock()
+	iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricIDs...)
+	if err := iidx.updateLocked(idb); err != nil {
+		logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err)
+	}
+	iidx.mu.Unlock()
+}
+
+func (iidx *inmemoryInvertedIndex) AddMetricID(idb *indexDB, metricID uint64) {
+	iidx.mu.Lock()
+	iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
+	if err := iidx.updateLocked(idb); err != nil {
+		logger.Panicf("FATAL: cannot update inmemoryInvertedIndex with pendingMetricIDs: %s", err)
+	}
+	iidx.mu.Unlock()
+}
+
+func (iidx *inmemoryInvertedIndex) UpdateMetricIDsForTagFilters(metricIDs, allMetricIDs *uint64set.Set, tfs *TagFilters) {
+	if iidx == nil {
+		return
+	}
+	var result *uint64set.Set
+	var tfFirst *tagFilter
+	for i := range tfs.tfs {
+		if tfs.tfs[i].isNegative {
+			continue
+		}
+		tfFirst = &tfs.tfs[i]
+		break
+	}
+
+	iidx.mu.RLock()
+	defer iidx.mu.RUnlock()
+
+	if tfFirst == nil {
+		result = allMetricIDs.Clone()
+	} else {
+		result = iidx.getMetricIDsForTagFilterLocked(tfFirst, tfs.commonPrefix)
+	}
+	for i := range tfs.tfs {
+		tf := &tfs.tfs[i]
+		if tf == tfFirst {
+			continue
+		}
+		m := iidx.getMetricIDsForTagFilterLocked(tf, tfs.commonPrefix)
+		if tf.isNegative {
+			result.Subtract(m)
+		} else {
+			result.Intersect(m)
+		}
+		if result.Len() == 0 {
+			return
+		}
+	}
+	metricIDs.Union(result)
+}
+
+func (iidx *inmemoryInvertedIndex) getMetricIDsForTagFilterLocked(tf *tagFilter, commonPrefix []byte) *uint64set.Set {
+	if !bytes.HasPrefix(tf.prefix, commonPrefix) {
+		logger.Panicf("BUG: tf.prefix must start with commonPrefix=%q; got %q", commonPrefix, tf.prefix)
+	}
+	prefix := tf.prefix[len(commonPrefix):]
+	var m uint64set.Set
+	kb := kbPool.Get()
+	defer kbPool.Put(kb)
+	for k, v := range iidx.m {
+		if len(k) < len(prefix) || k[:len(prefix)] != string(prefix) {
+			continue
+		}
+		kb.B = append(kb.B[:0], k[len(prefix):]...)
+		ok, err := tf.matchSuffix(kb.B)
+		if err != nil {
+			logger.Panicf("BUG: unexpected error from matchSuffix(%q): %s", kb.B, err)
+		}
+		if !ok {
+			continue
+		}
+		m.Union(v)
+	}
+	return &m
+}
+
+func (iidx *inmemoryInvertedIndex) updateLocked(idb *indexDB) error {
+	metricIDs := iidx.pendingMetricIDs
+	iidx.pendingMetricIDs = iidx.pendingMetricIDs[:0]
+
+	kb := kbPool.Get()
+	defer kbPool.Put(kb)
+
+	var mn MetricName
+	for _, metricID := range metricIDs {
+		var err error
+		kb.B, err = idb.searchMetricName(kb.B[:0], metricID)
+		if err != nil {
+			if err == io.EOF {
+				iidx.pendingMetricIDs = append(iidx.pendingMetricIDs, metricID)
+				continue
+			}
+			return fmt.Errorf("cannot find metricName by metricID %d: %s", metricID, err)
+		}
+		if err = mn.Unmarshal(kb.B); err != nil {
+			return fmt.Errorf("cannot unmarshal metricName %q: %s", kb.B, err)
+		}
+		kb.B = marshalTagValue(kb.B[:0], nil)
+		kb.B = marshalTagValue(kb.B, mn.MetricGroup)
+		iidx.addMetricIDLocked(kb.B, metricID)
+		for i := range mn.Tags {
+			kb.B = mn.Tags[i].Marshal(kb.B[:0])
+			iidx.addMetricIDLocked(kb.B, metricID)
+		}
+	}
+	return nil
+}
+
+func (iidx *inmemoryInvertedIndex) addMetricIDLocked(key []byte, metricID uint64) {
+	v := iidx.m[string(key)]
+	if v == nil {
+		v = &uint64set.Set{}
+		iidx.m[string(key)] = v
+	}
+	v.Add(metricID)
+}
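// inmemoryInvertedIndex above is a plain posting-list structure: every
// marshaled (tag, value) pair maps to the uint64set of metricIDs observed
// with that pair during the hour, and a query intersects the lists of its
// positive filters and subtracts the lists of its negative ones. A toy
// version using Go maps instead of uint64set (toyIndex is illustrative
// only, not part of the patch):
package main

import (
	"fmt"
	"sort"
)

type toyIndex map[string]map[uint64]struct{}

func (idx toyIndex) add(tagPair string, metricID uint64) {
	m := idx[tagPair]
	if m == nil {
		m = make(map[uint64]struct{})
		idx[tagPair] = m
	}
	m[metricID] = struct{}{}
}

// query returns the metricIDs present in every given posting list.
func (idx toyIndex) query(tagPairs ...string) []uint64 {
	var result []uint64
	for id := range idx[tagPairs[0]] {
		ok := true
		for _, tp := range tagPairs[1:] {
			if _, has := idx[tp][id]; !has {
				ok = false
				break
			}
		}
		if ok {
			result = append(result, id)
		}
	}
	sort.Slice(result, func(i, j int) bool { return result[i] < result[j] })
	return result
}

func main() {
	idx := toyIndex{}
	idx.add("job=app", 1)
	idx.add("job=app", 2)
	idx.add("instance=host1", 1)
	fmt.Println(idx.query("job=app", "instance=host1")) // [1]
}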
diff --git a/lib/storage/search_test.go b/lib/storage/search_test.go
index 8e1264c17a..014ba635da 100644
--- a/lib/storage/search_test.go
+++ b/lib/storage/search_test.go
@@ -96,7 +96,7 @@ func TestSearch(t *testing.T) {
 		{[]byte("instance"), []byte("8.8.8.8:1234")},
 	}
 	startTimestamp := timestampFromTime(time.Now())
-	startTimestamp -= startTimestamp % (1e3 * 3600 * 24)
+	startTimestamp -= startTimestamp % (1e3 * 60 * 30)
 	blockRowsCount := 0
 	for i := 0; i < rowsCount; i++ {
 		mn.MetricGroup = []byte(fmt.Sprintf("metric_%d", i%metricGroupsCount))
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index a446f7f0aa..3228c0a84e 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -140,6 +140,11 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	idbCurr.SetExtDB(idbPrev)
 	s.idbCurr.Store(idbCurr)
 
+	// Initialize iidx. hmCurr shouldn't be used until now,
+	// so it is safe to initialize it in place.
+	hmCurr.iidx = newInmemoryInvertedIndex()
+	hmCurr.iidx.MustUpdate(s.idb(), hmCurr.m)
+
 	// Load data
 	tablePath := path + "/data"
 	tb, err := openTable(tablePath, retentionMonths, s.getDeletedMetricIDs)
@@ -313,6 +318,10 @@ type Metrics struct {
 
 	HourMetricIDCacheSize uint64
 
+	RecentHourInvertedIndexSize                 uint64
+	RecentHourInvertedIndexUniqueTagPairsSize   uint64
+	RecentHourInvertedIndexPendingMetricIDsSize uint64
+
 	IndexDBMetrics IndexDBMetrics
 	TableMetrics   TableMetrics
 }
@@ -373,6 +382,15 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	}
 	m.HourMetricIDCacheSize += uint64(hourMetricIDsLen)
 
+	m.RecentHourInvertedIndexSize += uint64(hmPrev.iidx.GetEntriesCount())
+	m.RecentHourInvertedIndexSize += uint64(hmCurr.iidx.GetEntriesCount())
+
+	m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmPrev.iidx.GetUniqueTagPairsLen())
+	m.RecentHourInvertedIndexUniqueTagPairsSize += uint64(hmCurr.iidx.GetUniqueTagPairsLen())
+
+	m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmPrev.iidx.GetPendingMetricIDsLen())
+	m.RecentHourInvertedIndexPendingMetricIDsSize += uint64(hmCurr.iidx.GetPendingMetricIDsLen())
+
 	s.idb().UpdateMetrics(&m.IndexDBMetrics)
 	s.tb.UpdateMetrics(&m.TableMetrics)
 }
@@ -412,6 +430,7 @@ func (s *Storage) currHourMetricIDsUpdater() {
 	for {
 		select {
 		case <-s.stop:
+			s.updateCurrHourMetricIDs()
 			return
 		case <-t.C:
 			s.updateCurrHourMetricIDs()
@@ -486,7 +505,9 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
 	startTime := time.Now()
 	if !fs.IsPathExist(path) {
 		logger.Infof("nothing to load from %q", path)
-		return &hourMetricIDs{}
+		return &hourMetricIDs{
+			hour: hour,
+		}
 	}
 	src, err := ioutil.ReadFile(path)
 	if err != nil {
@@ -495,21 +516,27 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
 	srcOrigLen := len(src)
 	if len(src) < 24 {
 		logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
-		return &hourMetricIDs{}
+		return &hourMetricIDs{
+			hour: hour,
+		}
 	}
 	isFull := encoding.UnmarshalUint64(src)
 	src = src[8:]
 	hourLoaded := encoding.UnmarshalUint64(src)
 	src = src[8:]
 	if hourLoaded != hour {
-		logger.Infof("discarding %s, since it is outdated", name)
-		return &hourMetricIDs{}
+		logger.Infof("discarding %s, since it contains outdated hour; got %d; want %d", name, hourLoaded, hour)
+		return &hourMetricIDs{
+			hour: hour,
+		}
 	}
 	hmLen := encoding.UnmarshalUint64(src)
 	src = src[8:]
 	if uint64(len(src)) != 8*hmLen {
 		logger.Errorf("discarding %s, since it has broken body; got %d bytes; want %d bytes", path, len(src), 8*hmLen)
-		return &hourMetricIDs{}
+		return &hourMetricIDs{
+			hour: hour,
+		}
 	}
 	m := &uint64set.Set{}
 	for i := uint64(0); i < hmLen; i++ {
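// mustLoadHourMetricIDs above expects a fixed on-disk layout: isFull
// (8 bytes), hour (8 bytes), set length (8 bytes), then length*8 bytes of
// metricIDs. A standalone decoder sketch for that layout, assuming the
// big-endian encoding used by lib/encoding (parseHourMetricIDs is a
// hypothetical name):
package main

import (
	"encoding/binary"
	"fmt"
)

func parseHourMetricIDs(src []byte) (isFull bool, hour uint64, metricIDs []uint64, err error) {
	if len(src) < 24 {
		return false, 0, nil, fmt.Errorf("broken header: got %d bytes; want at least 24", len(src))
	}
	isFull = binary.BigEndian.Uint64(src) != 0
	hour = binary.BigEndian.Uint64(src[8:])
	n := binary.BigEndian.Uint64(src[16:])
	src = src[24:]
	if uint64(len(src)) != 8*n {
		return false, 0, nil, fmt.Errorf("broken body: got %d bytes; want %d", len(src), 8*n)
	}
	for i := uint64(0); i < n; i++ {
		metricIDs = append(metricIDs, binary.BigEndian.Uint64(src[8*i:]))
	}
	return isFull, hour, metricIDs, nil
}

func main() {
	buf := make([]byte, 32)
	binary.BigEndian.PutUint64(buf[0:], 1)      // isFull
	binary.BigEndian.PutUint64(buf[8:], 437001) // hour
	binary.BigEndian.PutUint64(buf[16:], 1)     // one metricID follows
	binary.BigEndian.PutUint64(buf[24:], 42)
	fmt.Println(parseHourMetricIDs(buf)) // true 437001 [42] <nil>
}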
@@ -772,9 +799,6 @@ var (
 )
 
 func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
-	// Return only the last error, since it has no sense in returning all errors.
-	var lastWarn error
-
 	var is *indexSearch
 	var mn *MetricName
 	var kb *bytesutil.ByteBuffer
@@ -788,6 +812,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
 	rows = rows[:rowsLen+len(mrs)]
 	j := 0
 	minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()
+	// Return only the last error, since there is no sense in returning all errors.
+	var lastWarn error
 	for i := range mrs {
 		mr := &mrs[i]
 		if math.IsNaN(mr.Value) {
@@ -878,6 +904,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
 	keyBuf := kb.B
 	a := (*[2]uint64)(unsafe.Pointer(&keyBuf[0]))
 	idb := s.idb()
+	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	for i := range rows {
 		r := &rows[i]
 		if r.Timestamp != prevTimestamp {
@@ -886,7 +913,6 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
 			prevTimestamp = r.Timestamp
 		}
 		metricID := r.TSID.MetricID
-		hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
 		if hour == hm.hour {
 			// The r belongs to the current hour. Check for the current hour cache.
 			if hm.m.Has(metricID) {
@@ -896,6 +922,7 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
 			s.pendingHourMetricIDsLock.Lock()
 			s.pendingHourMetricIDs.Add(metricID)
 			s.pendingHourMetricIDsLock.Unlock()
+			hm.iidx.AddMetricID(idb, metricID)
 		}
 
 		// Slower path: check global cache for (date, metricID) entry.
@@ -920,31 +947,31 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error
 func (s *Storage) updateCurrHourMetricIDs() {
 	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	s.pendingHourMetricIDsLock.Lock()
-	newMetricIDsLen := s.pendingHourMetricIDs.Len()
+	newMetricIDs := s.pendingHourMetricIDs
+	s.pendingHourMetricIDs = &uint64set.Set{}
 	s.pendingHourMetricIDsLock.Unlock()
 	hour := uint64(timestampFromTime(time.Now())) / msecPerHour
-	if newMetricIDsLen == 0 && hm.hour == hour {
+	if newMetricIDs.Len() == 0 && hm.hour == hour {
 		// Fast path: nothing to update.
 		return
 	}
 
 	// Slow path: hm.m must be updated with non-empty s.pendingHourMetricIDs.
 	var m *uint64set.Set
+	var iidx *inmemoryInvertedIndex
 	isFull := hm.isFull
 	if hm.hour == hour {
 		m = hm.m.Clone()
+		iidx = hm.iidx.Clone()
 	} else {
 		m = &uint64set.Set{}
+		iidx = newInmemoryInvertedIndex()
 		isFull = true
 	}
-	s.pendingHourMetricIDsLock.Lock()
-	newMetricIDs := s.pendingHourMetricIDs
-	s.pendingHourMetricIDs = &uint64set.Set{}
-	s.pendingHourMetricIDsLock.Unlock()
 	m.Union(newMetricIDs)
-
 	hmNew := &hourMetricIDs{
 		m:      m,
+		iidx:   iidx,
 		hour:   hour,
 		isFull: isFull,
 	}
@@ -956,6 +983,7 @@ func (s *Storage) updateCurrHourMetricIDs() {
 
 type hourMetricIDs struct {
 	m      *uint64set.Set
+	iidx   *inmemoryInvertedIndex
 	hour   uint64
 	isFull bool
 }
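// updateCurrHourMetricIDs above holds pendingHourMetricIDsLock only long
// enough to swap the pending set for an empty one; the expensive clone and
// union happen outside the lock, and the result is published as a fresh
// hourMetricIDs value. A compact sketch of that swap-then-merge pattern
// (simplified to plain maps; the names are illustrative):
package main

import (
	"fmt"
	"sync"
)

type collector struct {
	mu      sync.Mutex
	pending map[uint64]struct{}
	current map[uint64]struct{}
}

func (c *collector) flush() {
	// Swap out the pending set under a short lock...
	c.mu.Lock()
	pending := c.pending
	c.pending = make(map[uint64]struct{})
	c.mu.Unlock()

	// ...then merge into a fresh copy without blocking writers.
	next := make(map[uint64]struct{}, len(c.current)+len(pending))
	for id := range c.current {
		next[id] = struct{}{}
	}
	for id := range pending {
		next[id] = struct{}{}
	}
	c.current = next
}

func main() {
	c := &collector{
		pending: map[uint64]struct{}{1: {}, 2: {}},
		current: map[uint64]struct{}{},
	}
	c.flush()
	fmt.Println(len(c.current), len(c.pending)) // 2 0
}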
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index 2334994b93..4a339afb76 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -26,6 +26,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hour := uint64(timestampFromTime(time.Now())) / msecPerHour
 		hmOrig := &hourMetricIDs{
 			m:    &uint64set.Set{},
+			iidx: newInmemoryInvertedIndex(),
 			hour: 123,
 		}
 		hmOrig.m.Add(12)
@@ -61,6 +62,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hour := uint64(timestampFromTime(time.Now())) / msecPerHour
 		hmOrig := &hourMetricIDs{
 			m:    &uint64set.Set{},
+			iidx: newInmemoryInvertedIndex(),
 			hour: hour,
 		}
 		hmOrig.m.Add(12)
@@ -105,6 +107,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hour := uint64(timestampFromTime(time.Now())) / msecPerHour
 		hmOrig := &hourMetricIDs{
 			m:    &uint64set.Set{},
+			iidx: newInmemoryInvertedIndex(),
 			hour: 123,
 		}
 		hmOrig.m.Add(12)
@@ -119,8 +122,8 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 				t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
 			}
 		}
-		if !reflect.DeepEqual(hmCurr.m, pendingHourMetricIDs) {
-			t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs)
+		if !hmCurr.m.Equal(pendingHourMetricIDs) {
+			t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourMetricIDs)
 		}
 		if !hmCurr.isFull {
 			t.Fatalf("unexpected hmCurr.isFull; got %v; want %v", hmCurr.isFull, true)
@@ -146,6 +149,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hour := uint64(timestampFromTime(time.Now())) / msecPerHour
 		hmOrig := &hourMetricIDs{
 			m:    &uint64set.Set{},
+			iidx: newInmemoryInvertedIndex(),
 			hour: hour,
 		}
 		hmOrig.m.Add(12)
@@ -167,7 +171,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		for _, metricID := range origMetricIDs {
 			m.Add(metricID)
 		}
-		if !reflect.DeepEqual(hmCurr.m, m) {
+		if !hmCurr.m.Equal(m) {
 			t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
 		}
 		if hmCurr.isFull {
diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go
index 285f1e527e..ee119333ed 100644
--- a/lib/uint64set/uint64set.go
+++ b/lib/uint64set/uint64set.go
@@ -12,8 +12,10 @@ import (
 //
 // It is unsafe calling Set methods from concurrent goroutines.
 type Set struct {
-	itemsCount int
-	buckets    bucket32Sorter
+	skipSmallPool bool
+	itemsCount    int
+	buckets       bucket32Sorter
+	smallPool     [5]uint64
 }
 
 type bucket32Sorter []*bucket32
@@ -35,8 +37,10 @@ func (s *Set) Clone() *Set {
 		return &Set{}
 	}
 	var dst Set
+	dst.skipSmallPool = s.skipSmallPool
 	dst.itemsCount = s.itemsCount
 	dst.buckets = make([]*bucket32, len(s.buckets))
+	dst.smallPool = s.smallPool
 	for i, b32 := range s.buckets {
 		dst.buckets[i] = b32.clone()
 	}
@@ -53,6 +57,10 @@
 
 // Add adds x to s.
 func (s *Set) Add(x uint64) {
+	if !s.skipSmallPool {
+		s.addToSmallPool(x)
+		return
+	}
 	hi := uint32(x >> 32)
 	lo := uint32(x)
 	for _, b32 := range s.buckets {
@@ -66,6 +74,23 @@
 	s.addAlloc(hi, lo)
 }
 
+func (s *Set) addToSmallPool(x uint64) {
+	if s.hasInSmallPool(x) {
+		return
+	}
+	if s.itemsCount < len(s.smallPool) {
+		s.smallPool[s.itemsCount] = x
+		s.itemsCount++
+		return
+	}
+	s.skipSmallPool = true
+	s.itemsCount = 0
+	for _, v := range s.smallPool[:] {
+		s.Add(v)
+	}
+	s.Add(x)
+}
+
 func (s *Set) addAlloc(hi, lo uint32) {
 	var b32 bucket32
 	b32.hi = hi
@@ -76,11 +101,14 @@
 
 // Has verifies whether x exists in s.
 func (s *Set) Has(x uint64) bool {
-	hi := uint32(x >> 32)
-	lo := uint32(x)
 	if s == nil {
 		return false
 	}
+	if !s.skipSmallPool {
+		return s.hasInSmallPool(x)
+	}
+	hi := uint32(x >> 32)
+	lo := uint32(x)
 	for _, b32 := range s.buckets {
 		if b32.hi == hi {
 			return b32.has(lo)
@@ -89,8 +117,21 @@
 	return false
 }
 
+func (s *Set) hasInSmallPool(x uint64) bool {
+	for _, v := range s.smallPool[:s.itemsCount] {
+		if v == x {
+			return true
+		}
+	}
+	return false
+}
+
 // Del deletes x from s.
 func (s *Set) Del(x uint64) {
+	if !s.skipSmallPool {
+		s.delFromSmallPool(x)
+		return
+	}
 	hi := uint32(x >> 32)
 	lo := uint32(x)
 	for _, b32 := range s.buckets {
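// The smallPool added to Set above keeps the first few items in a fixed
// array and migrates to the bucketed representation only on overflow; most
// per-tag-pair sets in the hourly index are tiny, so this avoids bucket
// allocations entirely. A stripped-down sketch of the same pattern (smallSet
// is illustrative; a map stands in for the real bucket storage):
package main

import "fmt"

type smallSet struct {
	spilled bool
	n       int
	pool    [5]uint64
	m       map[uint64]struct{}
}

func (s *smallSet) add(x uint64) {
	if s.spilled {
		s.m[x] = struct{}{}
		return
	}
	for _, v := range s.pool[:s.n] {
		if v == x {
			return // already present
		}
	}
	if s.n < len(s.pool) {
		s.pool[s.n] = x
		s.n++
		return
	}
	// Overflow: migrate the pool into the big representation.
	s.spilled = true
	s.m = make(map[uint64]struct{}, len(s.pool)+1)
	for _, v := range s.pool {
		s.m[v] = struct{}{}
	}
	s.m[x] = struct{}{}
}

func (s *smallSet) len() int {
	if s.spilled {
		return len(s.m)
	}
	return s.n
}

func main() {
	var s smallSet
	for i := uint64(0); i < 7; i++ {
		s.add(i)
	}
	fmt.Println(s.len(), s.spilled) // 7 true
}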
@@ -103,13 +144,37 @@ func (s *Set) Del(x uint64) {
 	}
 }
 
+func (s *Set) delFromSmallPool(x uint64) {
+	idx := -1
+	for i, v := range s.smallPool[:s.itemsCount] {
+		if v == x {
+			idx = i
+		}
+	}
+	if idx < 0 {
+		return
+	}
+	copy(s.smallPool[idx:], s.smallPool[idx+1:])
+	s.itemsCount--
+}
+
 // AppendTo appends all the items from the set to dst and returns the result.
 //
 // The returned items are sorted.
+//
+// AppendTo can mutate s.
 func (s *Set) AppendTo(dst []uint64) []uint64 {
 	if s == nil {
 		return dst
 	}
+	if !s.skipSmallPool {
+		a := s.smallPool[:s.itemsCount]
+		if len(a) > 1 {
+			sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
+		}
+		return append(dst, a...)
+	}
+
 	// pre-allocate memory for dst
 	dstLen := len(dst)
 	if n := s.Len() - cap(dst) + dstLen; n > 0 {
@@ -128,23 +193,82 @@ func (s *Set) AppendTo(dst []uint64) []uint64 {
 
 // Union adds all the items from a to s.
 func (s *Set) Union(a *Set) {
+	// Clone a, since AppendTo may mutate it below.
+	aCopy := a.Clone()
+	if s.Len() == 0 {
+		// Fast path if the initial set is empty.
+		*s = *aCopy
+		return
+	}
 	// TODO: optimize it
-	for _, x := range a.AppendTo(nil) {
+	for _, x := range aCopy.AppendTo(nil) {
 		s.Add(x)
 	}
 }
 
+// Intersect removes all the items missing in a from s.
+func (s *Set) Intersect(a *Set) {
+	if a.Len() == 0 {
+		// Fast path
+		*s = Set{}
+		return
+	}
+	// TODO: optimize it
+	for _, x := range s.AppendTo(nil) {
+		if !a.Has(x) {
+			s.Del(x)
+		}
+	}
+}
+
+// Subtract removes from s all the shared items between s and a.
+func (s *Set) Subtract(a *Set) {
+	if s.Len() == 0 {
+		return
+	}
+	// Copy a because AppendTo below can mutate a.
+	aCopy := a.Clone()
+	// TODO: optimize it
+	for _, x := range aCopy.AppendTo(nil) {
+		if s.Has(x) {
+			s.Del(x)
+		}
+	}
+}
+
+// Equal returns true if s contains the same items as a.
+func (s *Set) Equal(a *Set) bool {
+	if s.Len() != a.Len() {
+		return false
+	}
+	// Copy a because AppendTo below can mutate a.
+	aCopy := a.Clone()
+	// TODO: optimize it
+	for _, x := range aCopy.AppendTo(nil) {
+		if !s.Has(x) {
+			return false
+		}
+	}
+	return true
+}
+
 type bucket32 struct {
-	hi      uint32
-	b16his  []uint16
-	buckets []*bucket16
+	skipSmallPool bool
+	smallPoolLen  int
+	hi            uint32
+	b16his        []uint16
+	buckets       []*bucket16
+	smallPool     [14]uint32
 }
 
 func (b *bucket32) clone() *bucket32 {
 	var dst bucket32
+	dst.skipSmallPool = b.skipSmallPool
+	dst.smallPoolLen = b.smallPoolLen
 	dst.hi = b.hi
 	dst.b16his = append(dst.b16his[:0], b.b16his...)
 	dst.buckets = make([]*bucket16, len(b.buckets))
+	dst.smallPool = b.smallPool
 	for i, b16 := range b.buckets {
 		dst.buckets[i] = b16.clone()
 	}
@@ -164,6 +288,9 @@ func (b *bucket32) Swap(i, j int) {
 const maxUnsortedBuckets = 32
 
 func (b *bucket32) add(x uint32) bool {
+	if !b.skipSmallPool {
+		return b.addToSmallPool(x)
+	}
 	hi := uint16(x >> 16)
 	lo := uint16(x)
 	if len(b.buckets) > maxUnsortedBuckets {
@@ -178,6 +305,23 @@ func (b *bucket32) add(x uint32) bool {
 	return true
 }
 
+func (b *bucket32) addToSmallPool(x uint32) bool {
+	if b.hasInSmallPool(x) {
+		return false
+	}
+	if b.smallPoolLen < len(b.smallPool) {
+		b.smallPool[b.smallPoolLen] = x
+		b.smallPoolLen++
+		return true
+	}
+	b.skipSmallPool = true
+	b.smallPoolLen = 0
+	for _, v := range b.smallPool[:] {
+		b.add(v)
+	}
+	return b.add(x)
+}
+
 func (b *bucket32) addAllocSmall(hi, lo uint16) {
 	var b16 bucket16
 	_ = b16.add(lo)
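// Union, Intersect, Subtract and Equal above are all built from AppendTo,
// Has, Add and Del, cloning their argument wherever AppendTo could reorder
// its small pool. A usage sketch of the resulting semantics, assuming the
// methods added by this patch:
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
)

func main() {
	var a, b uint64set.Set
	for i := uint64(0); i < 4; i++ {
		a.Add(i) // a = {0,1,2,3}
	}
	for i := uint64(2); i < 6; i++ {
		b.Add(i) // b = {2,3,4,5}
	}

	union := a.Clone()
	union.Union(&b) // {0,1,2,3,4,5}

	inter := a.Clone()
	inter.Intersect(&b) // {2,3}

	diff := a.Clone()
	diff.Subtract(&b) // {0,1}

	fmt.Println(union.Len(), inter.Len(), diff.Len()) // 6 2 2
	fmt.Println(inter.Equal(diff))                    // false
}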
@@ -199,6 +343,7 @@ func (b *bucket32) addSlow(hi, lo uint16) bool {
 
 func (b *bucket32) addAllocBig(hi, lo uint16, n int) {
 	if n < 0 {
+		// This is a hint to the Go compiler to remove automatic bounds checks below.
 		return
 	}
 	var b16 bucket16
@@ -215,6 +360,9 @@
 }
 
 func (b *bucket32) has(x uint32) bool {
+	if !b.skipSmallPool {
+		return b.hasInSmallPool(x)
+	}
 	hi := uint16(x >> 16)
 	lo := uint16(x)
 	if len(b.buckets) > maxUnsortedBuckets {
@@ -228,6 +376,15 @@ func (b *bucket32) has(x uint32) bool {
 	return false
 }
 
+func (b *bucket32) hasInSmallPool(x uint32) bool {
+	for _, v := range b.smallPool[:b.smallPoolLen] {
+		if v == x {
+			return true
+		}
+	}
+	return false
+}
+
 func (b *bucket32) hasSlow(hi, lo uint16) bool {
 	n := binarySearch16(b.b16his, hi)
 	if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
@@ -237,6 +394,9 @@ func (b *bucket32) hasSlow(hi, lo uint16) bool {
 }
 
 func (b *bucket32) del(x uint32) bool {
+	if !b.skipSmallPool {
+		return b.delFromSmallPool(x)
+	}
 	hi := uint16(x >> 16)
 	lo := uint16(x)
 	if len(b.buckets) > maxUnsortedBuckets {
@@ -250,6 +410,21 @@ func (b *bucket32) del(x uint32) bool {
 	return false
 }
 
+func (b *bucket32) delFromSmallPool(x uint32) bool {
+	idx := -1
+	for i, v := range b.smallPool[:b.smallPoolLen] {
+		if v == x {
+			idx = i
+		}
+	}
+	if idx < 0 {
+		return false
+	}
+	copy(b.smallPool[idx:], b.smallPool[idx+1:])
+	b.smallPoolLen--
+	return true
+}
+
 func (b *bucket32) delSlow(hi, lo uint16) bool {
 	n := binarySearch16(b.b16his, hi)
 	if n < 0 || n >= len(b.b16his) || b.b16his[n] != hi {
@@ -259,6 +434,18 @@ func (b *bucket32) delSlow(hi, lo uint16) bool {
 }
 
 func (b *bucket32) appendTo(dst []uint64) []uint64 {
+	if !b.skipSmallPool {
+		a := b.smallPool[:b.smallPoolLen]
+		if len(a) > 1 {
+			sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
+		}
+		hi := uint64(b.hi) << 32
+		for _, lo32 := range a {
+			v := hi | uint64(lo32)
+			dst = append(dst, v)
+		}
+		return dst
+	}
 	if len(b.buckets) <= maxUnsortedBuckets && !sort.IsSorted(b) {
 		sort.Sort(b)
 	}
diff --git a/lib/uint64set/uint64set_test.go b/lib/uint64set/uint64set_test.go
index 65628c2e32..4ec7d46df7 100644
--- a/lib/uint64set/uint64set_test.go
+++ b/lib/uint64set/uint64set_test.go
@@ -3,13 +3,14 @@ package uint64set
 import (
 	"fmt"
 	"math/rand"
+	"reflect"
 	"sort"
 	"testing"
 	"time"
 )
 
 func TestSetBasicOps(t *testing.T) {
-	for _, itemsCount := range []int{1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2} {
+	for _, itemsCount := range []int{1, 2, 3, 4, 5, 6, 1e2, 1e3, 1e4, 1e5, 1e6, maxUnsortedBuckets * bitsPerBucket * 2} {
 		t.Run(fmt.Sprintf("items_%d", itemsCount), func(t *testing.T) {
 			testSetBasicOps(t, itemsCount)
 		})
@@ -21,7 +22,51 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 
 	offset := uint64(time.Now().UnixNano())
 
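// The checks added below exercise nil-receiver safety: Has, Len, AppendTo
// and Clone are all written to treat a nil *Set as an empty set, which lets
// callers skip nil checks on optional sets. A minimal illustration of that
// Go idiom (the set type here is illustrative):
package main

import "fmt"

type intSet struct {
	items []uint64
}

func (s *intSet) len() int {
	if s == nil {
		// A nil receiver is a valid, empty set.
		return 0
	}
	return len(s.items)
}

func main() {
	var s *intSet
	fmt.Println(s.len()) // 0, no panic
}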
+	// Verify operations on nil set
+	{
+		var sNil *Set
+		if sNil.Has(123) {
+			t.Fatalf("sNil shouldn't contain any item; found 123")
+		}
+		if n := sNil.Len(); n != 0 {
+			t.Fatalf("unexpected sNil.Len(); got %d; want 0", n)
+		}
+		result := sNil.AppendTo(nil)
+		if result != nil {
+			t.Fatalf("sNil.AppendTo(nil) must return nil")
+		}
+		buf := []uint64{1, 2, 3}
+		result = sNil.AppendTo(buf)
+		if !reflect.DeepEqual(result, buf) {
+			t.Fatalf("sNil.AppendTo(buf) must return buf")
+		}
+		sCopy := sNil.Clone()
+		if n := sCopy.Len(); n != 0 {
+			t.Fatalf("unexpected sCopy.Len() from nil set; got %d; want 0", n)
+		}
+		sCopy.Add(123)
+		if n := sCopy.Len(); n != 1 {
+			t.Fatalf("unexpected sCopy.Len() after adding an item; got %d; want 1", n)
+		}
+		sCopy.Add(123)
+		if n := sCopy.Len(); n != 1 {
+			t.Fatalf("unexpected sCopy.Len() after adding an item twice; got %d; want 1", n)
+		}
+		if !sCopy.Has(123) {
+			t.Fatalf("sCopy must contain 123")
+		}
+		sCopy.Del(123)
+		if n := sCopy.Len(); n != 0 {
+			t.Fatalf("unexpected sCopy.Len() after deleting the item; got %d; want 0", n)
+		}
+		sCopy.Del(123)
+		if n := sCopy.Len(); n != 0 {
+			t.Fatalf("unexpected sCopy.Len() after double deleting the item; got %d; want 0", n)
+		}
+	}
+
 	// Verify forward Add
+	itemsCount = (itemsCount / 2) * 2
 	for i := 0; i < itemsCount/2; i++ {
 		s.Add(uint64(i) + offset)
 	}
@@ -59,7 +104,7 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		}
 	}
 
-	// Verify Clone
+	// Verify Clone and Equal
 	sCopy := s.Clone()
 	if n := sCopy.Len(); n != itemsCount {
 		t.Fatalf("unexpected sCopy.Len(); got %d; want %d", n, itemsCount)
@@ -69,6 +114,33 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 			t.Fatalf("missing bit %d on sCopy", uint64(i)+offset)
 		}
 	}
+	if !sCopy.Equal(&s) {
+		t.Fatalf("s must equal sCopy")
+	}
+	if !s.Equal(sCopy) {
+		t.Fatalf("sCopy must equal s")
+	}
+	if s.Len() > 0 {
+		var sEmpty Set
+		if s.Equal(&sEmpty) {
+			t.Fatalf("s mustn't equal sEmpty")
+		}
+		sNew := s.Clone()
+		sNew.Del(offset)
+		if sNew.Equal(&s) {
+			t.Fatalf("sNew mustn't equal s")
+		}
+		if s.Equal(sNew) {
+			t.Fatalf("s mustn't equal sNew")
+		}
+		sNew.Add(offset - 123)
+		if sNew.Equal(&s) {
+			t.Fatalf("sNew mustn't equal s")
+		}
+		if s.Equal(sNew) {
+			t.Fatalf("s mustn't equal sNew")
+		}
+	}
 
 	// Verify AppendTo
 	a := s.AppendTo(nil)
@@ -89,31 +161,95 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 	}
 
 	// Verify union
-	const unionOffset = 12345
-	var s1, s2 Set
-	for i := 0; i < itemsCount; i++ {
-		s1.Add(uint64(i) + offset)
-		s2.Add(uint64(i) + offset + unionOffset)
+	{
+		const unionOffset = 12345
+		var s1, s2 Set
+		for i := 0; i < itemsCount; i++ {
+			s1.Add(uint64(i) + offset)
+			s2.Add(uint64(i) + offset + unionOffset)
+		}
+		s1.Union(&s2)
+		expectedLen := 2 * itemsCount
+		if itemsCount > unionOffset {
+			expectedLen = itemsCount + unionOffset
+		}
+		if n := s1.Len(); n != expectedLen {
+			t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
+		}
+
+		// Verify union on empty set.
+		var s3 Set
+		s3.Union(&s1)
+		expectedLen = s1.Len()
+		if n := s3.Len(); n != expectedLen {
+			t.Fatalf("unexpected s3.Len() after union with empty set; got %d; want %d", n, expectedLen)
+		}
 	}
-	s1.Union(&s2)
-	expectedLen := 2 * itemsCount
-	if itemsCount > unionOffset {
-		expectedLen = itemsCount + unionOffset
 	}
-	if n := s1.Len(); n != expectedLen {
-		t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
+
+	// Verify intersect
+	{
+		const intersectOffset = 12345
+		var s1, s2 Set
+		for i := 0; i < itemsCount; i++ {
+			s1.Add(uint64(i) + offset)
+			s2.Add(uint64(i) + offset + intersectOffset)
+		}
+		s1.Intersect(&s2)
+		expectedLen := 0
+		if itemsCount > intersectOffset {
+			expectedLen = itemsCount - intersectOffset
+		}
+		if n := s1.Len(); n != expectedLen {
+			t.Fatalf("unexpected s1.Len() after intersect; got %d; want %d", n, expectedLen)
+		}
+
+		// Verify intersect on empty set.
+		var s3 Set
+		s2.Intersect(&s3)
+		expectedLen = 0
+		if n := s2.Len(); n != expectedLen {
+			t.Fatalf("unexpected s2.Len() after intersect with empty set; got %d; want %d", n, expectedLen)
+		}
 	}
+
+	// Verify subtract
+	{
+		const subtractOffset = 12345
+		var s1, s2 Set
+		for i := 0; i < itemsCount; i++ {
+			s1.Add(uint64(i) + offset)
+			s2.Add(uint64(i) + offset + subtractOffset)
+		}
+		s1.Subtract(&s2)
+		expectedLen := itemsCount
+		if itemsCount > subtractOffset {
+			expectedLen = subtractOffset
+		}
+		if n := s1.Len(); n != expectedLen {
+			t.Fatalf("unexpected s1.Len() after subtract; got %d; want %d", n, expectedLen)
+		}
+
+		// Verify subtract from empty set.
+		var s3 Set
+		s3.Subtract(&s2)
+		expectedLen = 0
+		if n := s3.Len(); n != expectedLen {
+			t.Fatalf("unexpected s3.Len() after subtract from empty set; got %d; want %d", n, expectedLen)
+		}
+	}
 
 	// Verify Del
+	itemsDeleted := 0
 	for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
 		s.Del(uint64(i) + offset)
+		itemsDeleted++
 	}
-	if n := s.Len(); n != itemsCount-itemsCount/4 {
-		t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsCount/4)
+	if n := s.Len(); n != itemsCount-itemsDeleted {
+		t.Fatalf("unexpected s.Len() after Del; got %d; want %d", n, itemsCount-itemsDeleted)
 	}
 	a = s.AppendTo(a[:0])
-	if len(a) != itemsCount-itemsCount/4 {
-		t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsCount/4)
+	if len(a) != itemsCount-itemsDeleted {
+		t.Fatalf("unexpected len of exported array; got %d; want %d", len(a), itemsCount-itemsDeleted)
 	}
 	m = make(map[uint64]bool)
 	for _, x := range a {
@@ -137,8 +273,8 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
 		s.Del(uint64(i) + offset)
 		s.Del(uint64(i) + offset + uint64(itemsCount))
 	}
-	if n := s.Len(); n != itemsCount-itemsCount/4 {
-		t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsCount/4)
+	if n := s.Len(); n != itemsCount-itemsDeleted {
+		t.Fatalf("unexpected s.Len() after Del for non-existing items; got %d; want %d", n, itemsCount-itemsDeleted)
 	}
 
 	// Verify sCopy has the original data