diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 9eefb6e96..cf3eeffc4 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -264,8 +264,7 @@ func (db *indexDB) SetExtDB(extDB *indexDB) { // Add deleted metricIDs from extDB to db. if extDB != nil { dmisExt := extDB.getDeletedMetricIDs() - metricIDs := dmisExt.AppendTo(nil) - db.updateDeletedMetricIDs(metricIDs) + db.updateDeletedMetricIDs(dmisExt) } db.extDBLock.Lock() @@ -885,7 +884,11 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) { deletedCount := len(metricIDs) // atomically add deleted metricIDs to an inmemory map. - db.updateDeletedMetricIDs(metricIDs) + dmis := &uint64set.Set{} + for _, metricID := range metricIDs { + dmis.Add(metricID) + } + db.updateDeletedMetricIDs(dmis) // Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs. invalidateTagCache() @@ -914,13 +917,11 @@ func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) { db.deletedMetricIDs.Store(dmis) } -func (db *indexDB) updateDeletedMetricIDs(metricIDs []uint64) { +func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) { db.deletedMetricIDsUpdateLock.Lock() dmisOld := db.getDeletedMetricIDs() dmisNew := dmisOld.Clone() - for _, metricID := range metricIDs { - dmisNew.Add(metricID) - } + dmisNew.Union(metricIDs) db.setDeletedMetricIDs(dmisNew) db.deletedMetricIDsUpdateLock.Unlock() } @@ -1584,9 +1585,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf } minMetricIDs = mIDs } - for _, metricID := range minMetricIDs.AppendTo(nil) { - metricIDs.Add(metricID) - } + metricIDs.Union(minMetricIDs) return nil } @@ -1911,9 +1910,7 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int) return nil, false } metricIDs := hmCurr.m.Clone() - for _, metricID := range hmPrev.m.AppendTo(nil) { - metricIDs.Add(metricID) - } + metricIDs.Union(hmPrev.m) return metricIDs, true } return nil, false diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 3339382b0..a446f7f0a 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -938,12 +938,10 @@ func (s *Storage) updateCurrHourMetricIDs() { isFull = true } s.pendingHourMetricIDsLock.Lock() - newMetricIDs := s.pendingHourMetricIDs.AppendTo(nil) + newMetricIDs := s.pendingHourMetricIDs s.pendingHourMetricIDs = &uint64set.Set{} s.pendingHourMetricIDsLock.Unlock() - for _, metricID := range newMetricIDs { - m.Add(metricID) - } + m.Union(newMetricIDs) hmNew := &hourMetricIDs{ m: m, diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go index 54cc96b87..285f1e527 100644 --- a/lib/uint64set/uint64set.go +++ b/lib/uint64set/uint64set.go @@ -126,6 +126,14 @@ func (s *Set) AppendTo(dst []uint64) []uint64 { return dst } +// Union adds all the items from a to s. +func (s *Set) Union(a *Set) { + // TODO: optimize it + for _, x := range a.AppendTo(nil) { + s.Add(x) + } +} + type bucket32 struct { hi uint32 b16his []uint16 diff --git a/lib/uint64set/uint64set_test.go b/lib/uint64set/uint64set_test.go index 2b6733b60..65628c2e3 100644 --- a/lib/uint64set/uint64set_test.go +++ b/lib/uint64set/uint64set_test.go @@ -88,6 +88,22 @@ func testSetBasicOps(t *testing.T, itemsCount int) { } } + // Verify union + const unionOffset = 12345 + var s1, s2 Set + for i := 0; i < itemsCount; i++ { + s1.Add(uint64(i) + offset) + s2.Add(uint64(i) + offset + unionOffset) + } + s1.Union(&s2) + expectedLen := 2 * itemsCount + if itemsCount > unionOffset { + expectedLen = itemsCount + unionOffset + } + if n := s1.Len(); n != expectedLen { + t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen) + } + // Verify Del for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ { s.Del(uint64(i) + offset)