From fc2565b4eec9d256825be0ba43ae0487b90b440b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 3 Jun 2021 16:19:58 +0300 Subject: [PATCH] lib/storage: reduce memory allocations when syncing dateMetricIDCache --- lib/storage/storage.go | 64 +++++++++++++++++++++++++------------ lib/storage/storage_test.go | 8 +++-- 2 files changed, 49 insertions(+), 23 deletions(-) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 2cee13d679..c22c9226ff 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -2031,24 +2031,28 @@ type dateMetricIDCache struct { byDate atomic.Value // Contains mutable map protected by mu - byDateMutable *byDateMetricIDMap - lastSyncTime uint64 - mu sync.Mutex + byDateMutable *byDateMetricIDMap + nextSyncDeadline uint64 + mu sync.Mutex } func newDateMetricIDCache() *dateMetricIDCache { var dmc dateMetricIDCache - dmc.Reset() + dmc.resetLocked() return &dmc } func (dmc *dateMetricIDCache) Reset() { dmc.mu.Lock() + dmc.resetLocked() + dmc.mu.Unlock() +} + +func (dmc *dateMetricIDCache) resetLocked() { // Do not reset syncsCount and resetsCount dmc.byDate.Store(newByDateMetricIDMap()) dmc.byDateMutable = newByDateMetricIDMap() - dmc.lastSyncTime = fasttime.UnixTimestamp() - dmc.mu.Unlock() + dmc.nextSyncDeadline = 10 + fasttime.UnixTimestamp() atomic.AddUint64(&dmc.resetsCount, 1) } @@ -2081,20 +2085,12 @@ func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool { } // Slow path. Check mutable map. - currentTime := fasttime.UnixTimestamp() dmc.mu.Lock() v = dmc.byDateMutable.get(date) ok := v.Has(metricID) - mustSync := false - if currentTime-dmc.lastSyncTime > 10 { - mustSync = true - dmc.lastSyncTime = currentTime - } + dmc.syncLockedIfNeeded() dmc.mu.Unlock() - if mustSync { - dmc.sync() - } return ok } @@ -2133,21 +2129,47 @@ func (dmc *dateMetricIDCache) Set(date, metricID uint64) { dmc.mu.Unlock() } -func (dmc *dateMetricIDCache) sync() { - dmc.mu.Lock() +func (dmc *dateMetricIDCache) syncLockedIfNeeded() { + currentTime := fasttime.UnixTimestamp() + if currentTime >= dmc.nextSyncDeadline { + dmc.nextSyncDeadline = currentTime + 10 + dmc.syncLocked() + } +} + +func (dmc *dateMetricIDCache) syncLocked() { + if len(dmc.byDateMutable.m) == 0 { + // Nothing to sync. + return + } byDate := dmc.byDate.Load().(*byDateMetricIDMap) - for date, e := range dmc.byDateMutable.m { + byDateMutable := dmc.byDateMutable + for date, e := range byDateMutable.m { v := byDate.get(date) - e.v.Union(v) + if v == nil { + continue + } + v = v.Clone() + v.Union(&e.v) + byDateMutable.m[date] = &byDateMetricIDEntry{ + date: date, + v: *v, + } + } + for date, e := range byDate.m { + v := byDateMutable.get(date) + if v != nil { + continue + } + byDateMutable.m[date] = e } dmc.byDate.Store(dmc.byDateMutable) dmc.byDateMutable = newByDateMetricIDMap() - dmc.mu.Unlock() atomic.AddUint64(&dmc.syncsCount, 1) if dmc.EntriesCount() > memory.Allowed()/128 { - dmc.Reset() + dmc.resetLocked() } } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 28c2ee5740..8814e56825 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -89,7 +89,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error { return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID) } if i%11234 == 0 { - c.sync() + c.mu.Lock() + c.syncLocked() + c.mu.Unlock() } if i%34323 == 0 { c.Reset() @@ -103,7 +105,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error { metricID := uint64(i) % 123 c.Set(date, metricID) } - c.sync() + c.mu.Lock() + c.syncLocked() + c.mu.Unlock() for i := 0; i < 1e5; i++ { date := uint64(i) % 3 metricID := uint64(i) % 123