diff --git a/lib/storage/storage.go b/lib/storage/storage.go index f85eacfbb..d54692895 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -103,9 +103,15 @@ type Storage struct { pendingNextDayMetricIDsLock sync.Mutex pendingNextDayMetricIDs *uint64set.Set - // metricIDs for pre-fetched metricNames in the prefetchMetricNames function. + // prefetchedMetricIDs contains metricIDs for pre-fetched metricNames in the prefetchMetricNames function. prefetchedMetricIDs atomic.Value + // prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series. + prefetchedMetricIDsDeadline uint64 + + // prefetchedMetricIDsLock is used for serializing updates of prefetchedMetricIDs from concurrent goroutines. + prefetchedMetricIDsLock sync.Mutex + stop chan struct{} currHourMetricIDsUpdaterWG sync.WaitGroup @@ -1149,14 +1155,22 @@ func (s *Storage) prefetchMetricNames(tsids []TSID, deadline uint64) error { } // Store the pre-fetched metricIDs, so they aren't pre-fetched next time. - - prefetchedMetricIDsNew := prefetchedMetricIDs.Clone() + s.prefetchedMetricIDsLock.Lock() + var prefetchedMetricIDsNew *uint64set.Set + if fasttime.UnixTimestamp() < atomic.LoadUint64(&s.prefetchedMetricIDsDeadline) { + // Periodically reset the prefetchedMetricIDs in order to limit its size. + prefetchedMetricIDsNew = &uint64set.Set{} + atomic.StoreUint64(&s.prefetchedMetricIDsDeadline, fasttime.UnixTimestamp()+73*60) + } else { + prefetchedMetricIDsNew = prefetchedMetricIDs.Clone() + } prefetchedMetricIDsNew.AddMulti(metricIDs) if prefetchedMetricIDsNew.SizeBytes() > uint64(memory.Allowed())/32 { // Reset prefetchedMetricIDsNew if it occupies too much space. prefetchedMetricIDsNew = &uint64set.Set{} } s.prefetchedMetricIDs.Store(prefetchedMetricIDsNew) + s.prefetchedMetricIDsLock.Unlock() return nil }