mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
lib/{storage,uint64set}: add Set.Union() function and use it
This commit is contained in:
parent
9becc26f4b
commit
6be4456d88
4 changed files with 36 additions and 17 deletions
|
@ -264,8 +264,7 @@ func (db *indexDB) SetExtDB(extDB *indexDB) {
|
||||||
// Add deleted metricIDs from extDB to db.
|
// Add deleted metricIDs from extDB to db.
|
||||||
if extDB != nil {
|
if extDB != nil {
|
||||||
dmisExt := extDB.getDeletedMetricIDs()
|
dmisExt := extDB.getDeletedMetricIDs()
|
||||||
metricIDs := dmisExt.AppendTo(nil)
|
db.updateDeletedMetricIDs(dmisExt)
|
||||||
db.updateDeletedMetricIDs(metricIDs)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
db.extDBLock.Lock()
|
db.extDBLock.Lock()
|
||||||
|
@ -885,7 +884,11 @@ func (db *indexDB) DeleteTSIDs(tfss []*TagFilters) (int, error) {
|
||||||
deletedCount := len(metricIDs)
|
deletedCount := len(metricIDs)
|
||||||
|
|
||||||
// atomically add deleted metricIDs to an inmemory map.
|
// atomically add deleted metricIDs to an inmemory map.
|
||||||
db.updateDeletedMetricIDs(metricIDs)
|
dmis := &uint64set.Set{}
|
||||||
|
for _, metricID := range metricIDs {
|
||||||
|
dmis.Add(metricID)
|
||||||
|
}
|
||||||
|
db.updateDeletedMetricIDs(dmis)
|
||||||
|
|
||||||
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
|
// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.
|
||||||
invalidateTagCache()
|
invalidateTagCache()
|
||||||
|
@ -914,13 +917,11 @@ func (db *indexDB) setDeletedMetricIDs(dmis *uint64set.Set) {
|
||||||
db.deletedMetricIDs.Store(dmis)
|
db.deletedMetricIDs.Store(dmis)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *indexDB) updateDeletedMetricIDs(metricIDs []uint64) {
|
func (db *indexDB) updateDeletedMetricIDs(metricIDs *uint64set.Set) {
|
||||||
db.deletedMetricIDsUpdateLock.Lock()
|
db.deletedMetricIDsUpdateLock.Lock()
|
||||||
dmisOld := db.getDeletedMetricIDs()
|
dmisOld := db.getDeletedMetricIDs()
|
||||||
dmisNew := dmisOld.Clone()
|
dmisNew := dmisOld.Clone()
|
||||||
for _, metricID := range metricIDs {
|
dmisNew.Union(metricIDs)
|
||||||
dmisNew.Add(metricID)
|
|
||||||
}
|
|
||||||
db.setDeletedMetricIDs(dmisNew)
|
db.setDeletedMetricIDs(dmisNew)
|
||||||
db.deletedMetricIDsUpdateLock.Unlock()
|
db.deletedMetricIDsUpdateLock.Unlock()
|
||||||
}
|
}
|
||||||
|
@ -1584,9 +1585,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(metricIDs *uint64set.Set, tf
|
||||||
}
|
}
|
||||||
minMetricIDs = mIDs
|
minMetricIDs = mIDs
|
||||||
}
|
}
|
||||||
for _, metricID := range minMetricIDs.AppendTo(nil) {
|
metricIDs.Union(minMetricIDs)
|
||||||
metricIDs.Add(metricID)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1911,9 +1910,7 @@ func (is *indexSearch) getMetricIDsForRecentHours(tr TimeRange, maxMetrics int)
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
metricIDs := hmCurr.m.Clone()
|
metricIDs := hmCurr.m.Clone()
|
||||||
for _, metricID := range hmPrev.m.AppendTo(nil) {
|
metricIDs.Union(hmPrev.m)
|
||||||
metricIDs.Add(metricID)
|
|
||||||
}
|
|
||||||
return metricIDs, true
|
return metricIDs, true
|
||||||
}
|
}
|
||||||
return nil, false
|
return nil, false
|
||||||
|
|
|
@ -938,12 +938,10 @@ func (s *Storage) updateCurrHourMetricIDs() {
|
||||||
isFull = true
|
isFull = true
|
||||||
}
|
}
|
||||||
s.pendingHourMetricIDsLock.Lock()
|
s.pendingHourMetricIDsLock.Lock()
|
||||||
newMetricIDs := s.pendingHourMetricIDs.AppendTo(nil)
|
newMetricIDs := s.pendingHourMetricIDs
|
||||||
s.pendingHourMetricIDs = &uint64set.Set{}
|
s.pendingHourMetricIDs = &uint64set.Set{}
|
||||||
s.pendingHourMetricIDsLock.Unlock()
|
s.pendingHourMetricIDsLock.Unlock()
|
||||||
for _, metricID := range newMetricIDs {
|
m.Union(newMetricIDs)
|
||||||
m.Add(metricID)
|
|
||||||
}
|
|
||||||
|
|
||||||
hmNew := &hourMetricIDs{
|
hmNew := &hourMetricIDs{
|
||||||
m: m,
|
m: m,
|
||||||
|
|
|
@ -126,6 +126,14 @@ func (s *Set) AppendTo(dst []uint64) []uint64 {
|
||||||
return dst
|
return dst
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Union adds all the items from a to s.
|
||||||
|
func (s *Set) Union(a *Set) {
|
||||||
|
// TODO: optimize it
|
||||||
|
for _, x := range a.AppendTo(nil) {
|
||||||
|
s.Add(x)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type bucket32 struct {
|
type bucket32 struct {
|
||||||
hi uint32
|
hi uint32
|
||||||
b16his []uint16
|
b16his []uint16
|
||||||
|
|
|
@ -88,6 +88,22 @@ func testSetBasicOps(t *testing.T, itemsCount int) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Verify union
|
||||||
|
const unionOffset = 12345
|
||||||
|
var s1, s2 Set
|
||||||
|
for i := 0; i < itemsCount; i++ {
|
||||||
|
s1.Add(uint64(i) + offset)
|
||||||
|
s2.Add(uint64(i) + offset + unionOffset)
|
||||||
|
}
|
||||||
|
s1.Union(&s2)
|
||||||
|
expectedLen := 2 * itemsCount
|
||||||
|
if itemsCount > unionOffset {
|
||||||
|
expectedLen = itemsCount + unionOffset
|
||||||
|
}
|
||||||
|
if n := s1.Len(); n != expectedLen {
|
||||||
|
t.Fatalf("unexpected s1.Len() after union; got %d; want %d", n, expectedLen)
|
||||||
|
}
|
||||||
|
|
||||||
// Verify Del
|
// Verify Del
|
||||||
for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
|
for i := itemsCount / 2; i < itemsCount-itemsCount/4; i++ {
|
||||||
s.Del(uint64(i) + offset)
|
s.Del(uint64(i) + offset)
|
||||||
|
|
Loading…
Reference in a new issue