lib/{storage,uint64set}: add Set.Union() function and use it

This commit is contained in:
Aliaksandr Valialkin 2019-11-04 00:34:24 +02:00
parent 23e078261e
commit f5fbc3ffd7
3 changed files with 34 additions and 13 deletions

View file

@ -264,8 +264,7 @@ func (db *indexDB) SetExtDB(extDB *indexDB) {
// Add deleted metricIDs from extDB to db.
if extDB != nil {
dmisExt := extDB.getDeletedMetricIDs()
metricIDs := dmisExt.AppendTo(nil)
db.updateDeletedMetricIDs(metricIDs)
db.updateDeletedMetricIDs(dmisExt)
}
db.extDBLock.Lock()
@ -900,7 +899,11 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
deletedCount := len(metricIDs)
// atomically add deleted metricIDs to an inmemory map.
db.updateDeletedMetricIDs(metricIDs)
dmis := &uint64set.Set{}
for _, metricID := range metricIDs {
dmis.Add(metricID)
}
db.updateDeletedMetricIDs(dmis)
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
invalidateTagCache()
@ -929,13 +932,11 @@ func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) {
db.deletedMetricIDs.Store(dmis)
}
func (db *indexDB) updateDeletedMetricIDs(metricIDs []uint64) {
func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
db.deletedMetricIDsUpdateLock.Lock()
dmisOld := db.getDeletedMetricIDs()
dmisNew := dmisOld.Clone()
for _, metricID := range metricIDs {
dmisNew.Add(metricID)
}
dmisNew.Union(metricIDs)
db.setDeletedMetricIDs(dmisNew)
db.deletedMetricIDsUpdateLock.Unlock()
}
@ -1601,9 +1602,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
}
minMetricIDs = mIDs
}
for _, metricID := range minMetricIDs.AppendTo(nil) {
metricIDs.Add(metricID)
}
metricIDs.Union(minMetricIDs)
return nil
}
@ -1947,9 +1946,7 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int,
return nil, false
}
metricIDs := mCurr.Clone()
for _, metricID := range mPrev.AppendTo(nil) {
metricIDs.Add(metricID)
}
metricIDs.Union(mPrev)
return metricIDs, true
}
return nil, false

View file

@ -126,6 +126,14 @@ func (s *Set) AppendTo(dst []uint64) []uint64 {
return dst
}
// Union adds all the items from a to s.
func (s *Set) Union(a *Set) {
// TODO: optimize it
for _, x := range a.AppendTo(nil) {
s.Add(x)
}
}
type bucket32 struct {
hi uint32
b16his []uint16

View file

@ -88,6 +88,22 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
}
}
// Verify union
const unionOffset = 12345
var s1, s2 Set
for i := 0; i < itemsCount; i++ {
s1.Add(uint64(i) + offset)
s2.Add(uint64(i) + offset + unionOffset)
}
s1.Union(&s2)
expectedLen := 2 * itemsCount
if itemsCount > unionOffset {
expectedLen = itemsCount + unionOffset
}
if n := s1.Len(); n != expectedLen {
t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
}
// Verify Del
for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
s.Del(uint64(i) + offset)