From 140e7b6b74475f87c279bcdbf6d69748137edff4 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Wed, 19 Jul 2023 17:37:49 -0700
Subject: [PATCH] all: replace atomic.Value with atomic.Pointer[T]

This eliminates the need for .(*T) type assertions on the results obtained
from Load().

Leave atomic.Value for maps, since atomic.Pointer[map[...]...] would create
a double pointer to the map, because a map is already a pointer type
internally.
---
 app/vmagent/remotewrite/remotewrite.go     |  6 +--
 app/vminsert/relabel/relabel.go            |  6 +--
 app/vmselect/promql/rollup_result_cache.go |  4 +-
 lib/appmetrics/appmetrics.go               |  4 +-
 lib/bloomfilter/limiter.go                 |  6 +--
 lib/encoding/zstd/zstd_pure.go             |  6 +--
 lib/promscrape/discovery/kuma/api.go       |  2 +-
 lib/promscrape/discovery/kuma/kuma.go      |  3 +-
 lib/promscrape/scraper.go                  |  9 ++--
 lib/storage/storage.go                     | 58 +++++++++++-----------
 lib/storage/storage_test.go                | 24 ++++-----
 lib/workingsetcache/cache.go               | 48 +++++++++---------
 12 files changed, 87 insertions(+), 89 deletions(-)

diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index ab8f1f0bd..0f2b2ba5c 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -93,7 +93,7 @@ func MultitenancyEnabled() bool {
 }

 // Contains the current relabelConfigs.
-var allRelabelConfigs atomic.Value
+var allRelabelConfigs atomic.Pointer[relabelConfigs]

 // maxQueues limits the maximum value for `-remoteWrite.queues`. There is no sense in setting too high value,
 // since it may lead to high memory usage due to big number of buffers.
@@ -346,7 +346,7 @@ func Push(at *auth.Token, wr *prompbmarshal.WriteRequest) {
 	}

 	var rctx *relabelCtx
-	rcs := allRelabelConfigs.Load().(*relabelConfigs)
+	rcs := allRelabelConfigs.Load()
 	pcsGlobal := rcs.global
 	if pcsGlobal.Len() > 0 || len(labelsGlobal) > 0 {
 		rctx = getRelabelCtx()
@@ -612,7 +612,7 @@ func (rwctx *remoteWriteCtx) Push(tss []prompbmarshal.TimeSeries) {
 	// Apply relabeling
 	var rctx *relabelCtx
 	var v *[]prompbmarshal.TimeSeries
-	rcs := allRelabelConfigs.Load().(*relabelConfigs)
+	rcs := allRelabelConfigs.Load()
 	pcs := rcs.perURL[rwctx.idx]
 	if pcs.Len() > 0 {
 		rctx = getRelabelCtx()
diff --git a/app/vminsert/relabel/relabel.go b/app/vminsert/relabel/relabel.go
index 054b8b67f..fcb107c24 100644
--- a/app/vminsert/relabel/relabel.go
+++ b/app/vminsert/relabel/relabel.go
@@ -69,7 +69,7 @@ var (
 	configTimestamp = metrics.NewCounter(`vm_relabel_config_last_reload_success_timestamp_seconds`)
 )

-var pcsGlobal atomic.Value
+var pcsGlobal atomic.Pointer[promrelabel.ParsedConfigs]

 // CheckRelabelConfig checks config pointed by -relabelConfig
 func CheckRelabelConfig() error {
@@ -90,7 +90,7 @@ func loadRelabelConfig() (*promrelabel.ParsedConfigs, error) {

 // HasRelabeling returns true if there is global relabeling.
 func HasRelabeling() bool {
-	pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs)
+	pcs := pcsGlobal.Load()
 	return pcs.Len() > 0 || *usePromCompatibleNaming
 }

@@ -110,7 +110,7 @@ func (ctx *Ctx) Reset() {
 //
 // The returned labels are valid until the next call to ApplyRelabeling.
 func (ctx *Ctx) ApplyRelabeling(labels []prompb.Label) []prompb.Label {
-	pcs := pcsGlobal.Load().(*promrelabel.ParsedConfigs)
+	pcs := pcsGlobal.Load()
 	if pcs.Len() == 0 && !*usePromCompatibleNaming {
 		// There are no relabeling rules.
 		return labels
diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go
index 1354bc470..376867db0 100644
--- a/app/vmselect/promql/rollup_result_cache.go
+++ b/app/vmselect/promql/rollup_result_cache.go
@@ -63,7 +63,7 @@ func checkRollupResultCacheReset() {
 	for {
 		time.Sleep(checkRollupResultCacheResetInterval)
 		if atomic.SwapUint32(&needRollupResultCacheReset, 0) > 0 {
-			mr := rollupResultResetMetricRowSample.Load().(*storage.MetricRow)
+			mr := rollupResultResetMetricRowSample.Load()
 			d := int64(fasttime.UnixTimestamp()*1000) - mr.Timestamp - cacheTimestampOffset.Milliseconds()
 			logger.Warnf("resetting rollup result cache because the metric %s has a timestamp older than -search.cacheTimestampOffset=%s by %.3fs",
 				mr.String(), cacheTimestampOffset, float64(d)/1e3)
@@ -76,7 +76,7 @@ const checkRollupResultCacheResetInterval = 5 * time.Second

 var needRollupResultCacheReset uint32
 var checkRollupResultCacheResetOnce sync.Once
-var rollupResultResetMetricRowSample atomic.Value
+var rollupResultResetMetricRowSample atomic.Pointer[storage.MetricRow]

 var rollupResultCacheV = &rollupResultCache{
 	c: workingsetcache.New(1024 * 1024), // This is a cache for testing.
diff --git a/lib/appmetrics/appmetrics.go b/lib/appmetrics/appmetrics.go
index ba6885e29..8532c528d 100644
--- a/lib/appmetrics/appmetrics.go
+++ b/lib/appmetrics/appmetrics.go
@@ -32,14 +32,14 @@ func WritePrometheusMetrics(w io.Writer) {
 	}
 	metricsCacheLock.Unlock()

-	bb := metricsCache.Load().(*bytesutil.ByteBuffer)
+	bb := metricsCache.Load()
 	_, _ = w.Write(bb.B)
 }

 var (
 	metricsCacheLock           sync.Mutex
 	metricsCacheLastUpdateTime time.Time
-	metricsCache               atomic.Value
+	metricsCache               atomic.Pointer[bytesutil.ByteBuffer]
 )

 func writePrometheusMetrics(w io.Writer) {
diff --git a/lib/bloomfilter/limiter.go b/lib/bloomfilter/limiter.go
index 1d047414b..70069ddd8 100644
--- a/lib/bloomfilter/limiter.go
+++ b/lib/bloomfilter/limiter.go
@@ -11,7 +11,7 @@ import (
 // It is safe using the Limiter from concurrent goroutines.
 type Limiter struct {
 	maxItems int
-	v        atomic.Value
+	v        atomic.Pointer[limiter]

 	wg     sync.WaitGroup
 	stopCh chan struct{}
@@ -55,7 +55,7 @@ func (l *Limiter) MaxItems() int {

 // CurrentItems return the current number of items registered in l.
 func (l *Limiter) CurrentItems() int {
-	lm := l.v.Load().(*limiter)
+	lm := l.v.Load()
 	n := atomic.LoadUint64(&lm.currentItems)
 	return int(n)
 }
@@ -67,7 +67,7 @@ func (l *Limiter) CurrentItems() int {
 // True is returned if h is added or already exists in l.
 // False is returned if h cannot be added to l, since it already has maxItems unique items.
 func (l *Limiter) Add(h uint64) bool {
-	lm := l.v.Load().(*limiter)
+	lm := l.v.Load()
 	return lm.Add(h)
 }
diff --git a/lib/encoding/zstd/zstd_pure.go b/lib/encoding/zstd/zstd_pure.go
index 3fc666b9c..5336337e3 100644
--- a/lib/encoding/zstd/zstd_pure.go
+++ b/lib/encoding/zstd/zstd_pure.go
@@ -15,7 +15,7 @@ var (
 	decoder *zstd.Decoder

 	mu sync.Mutex
-	av atomic.Value
+	av atomic.Pointer[registry]
 )

 type registry map[int]*zstd.Encoder
@@ -45,7 +45,7 @@ func CompressLevel(dst, src []byte, compressionLevel int) []byte {
 }

 func getEncoder(compressionLevel int) *zstd.Encoder {
-	r := av.Load().(registry)
+	r := *av.Load()
 	e := r[compressionLevel]
 	if e != nil {
 		return e
@@ -54,7 +54,7 @@ func getEncoder(compressionLevel int) *zstd.Encoder {
 	}

 	mu.Lock()
 	// Create the encoder under lock in order to prevent from wasted work
 	// when concurrent goroutines create encoder for the same compressionLevel.
-	r1 := av.Load().(registry)
+	r1 := *av.Load()
 	if e = r1[compressionLevel]; e == nil {
 		e = newEncoder(compressionLevel)
 		r2 := make(registry)
diff --git a/lib/promscrape/discovery/kuma/api.go b/lib/promscrape/discovery/kuma/api.go
index c2c33359a..ce93572d0 100644
--- a/lib/promscrape/discovery/kuma/api.go
+++ b/lib/promscrape/discovery/kuma/api.go
@@ -27,7 +27,7 @@ type apiConfig struct {
 	apiPath string

 	// labels contains the latest discovered labels.
-	labels atomic.Value
+	labels atomic.Pointer[[]*promutils.Labels]

 	cancel context.CancelFunc
 	wg     sync.WaitGroup
diff --git a/lib/promscrape/discovery/kuma/kuma.go b/lib/promscrape/discovery/kuma/kuma.go
index 7d42a46ba..c5bd82490 100644
--- a/lib/promscrape/discovery/kuma/kuma.go
+++ b/lib/promscrape/discovery/kuma/kuma.go
@@ -38,8 +38,7 @@ func (sdc *SDConfig) GetLabels(baseDir string) ([]*promutils.Labels, error) {
 	if err != nil {
 		return nil, fmt.Errorf("cannot get API config for kuma_sd: %w", err)
 	}
-	v := cfg.labels.Load()
-	pLabels := v.(*[]*promutils.Labels)
+	pLabels := cfg.labels.Load()
 	return *pLabels, nil
 }

diff --git a/lib/promscrape/scraper.go b/lib/promscrape/scraper.go
index 53bfccda4..f6bf3dd8d 100644
--- a/lib/promscrape/scraper.go
+++ b/lib/promscrape/scraper.go
@@ -84,18 +84,17 @@ var (
 	PendingScrapeConfigs int32

 	// configData contains -promscrape.config data
-	configData atomic.Value
+	configData atomic.Pointer[[]byte]
 )

 // WriteConfigData writes -promscrape.config contents to w
 func WriteConfigData(w io.Writer) {
-	v := configData.Load()
-	if v == nil {
+	p := configData.Load()
+	if p == nil {
 		// Nothing to write to w
 		return
 	}
-	b := v.(*[]byte)
-	_, _ = w.Write(*b)
+	_, _ = w.Write(*p)
 }

 func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 5188b2def..f42f3bc1d 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -60,7 +60,7 @@ type Storage struct {
 	// lock file for exclusive access to the storage on the given path.
 	flockF *os.File

-	idbCurr atomic.Value
+	idbCurr atomic.Pointer[indexDB]

 	tb *table

@@ -81,16 +81,16 @@ type Storage struct {
 	dateMetricIDCache *dateMetricIDCache

 	// Fast cache for MetricID values occurred during the current hour.
-	currHourMetricIDs atomic.Value
+	currHourMetricIDs atomic.Pointer[hourMetricIDs]

 	// Fast cache for MetricID values occurred during the previous hour.
-	prevHourMetricIDs atomic.Value
+	prevHourMetricIDs atomic.Pointer[hourMetricIDs]

 	// Fast cache for pre-populating per-day inverted index for the next day.
 	// This is needed in order to remove CPU usage spikes at 00:00 UTC
 	// due to creation of per-day inverted index for active time series.
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 for details.
-	nextDayMetricIDs atomic.Value
+	nextDayMetricIDs atomic.Pointer[byDateMetricIDEntry]

 	// Pending MetricID values to be added to currHourMetricIDs.
 	pendingHourEntriesLock sync.Mutex
@@ -101,7 +101,7 @@ type Storage struct {
 	pendingNextDayMetricIDs *uint64set.Set

 	// prefetchedMetricIDs contains metricIDs for pre-fetched metricNames in the prefetchMetricNames function.
-	prefetchedMetricIDs atomic.Value
+	prefetchedMetricIDs atomic.Pointer[uint64set.Set]

 	// prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series.
 	prefetchedMetricIDsDeadline uint64
@@ -129,7 +129,7 @@ type Storage struct {
 	//
 	// It is safe to keep the set in memory even for big number of deleted
 	// metricIDs, since it usually requires 1 bit per deleted metricID.
-	deletedMetricIDs           atomic.Value
+	deletedMetricIDs           atomic.Pointer[uint64set.Set]
 	deletedMetricIDsUpdateLock sync.Mutex

 	isReadOnly uint32
@@ -268,7 +268,7 @@ func getTSIDCacheSize() int {
 }

 func (s *Storage) getDeletedMetricIDs() *uint64set.Set {
-	return s.deletedMetricIDs.Load().(*uint64set.Set)
+	return s.deletedMetricIDs.Load()
 }

 func (s *Storage) setDeletedMetricIDs(dmis *uint64set.Set) {
@@ -434,7 +434,7 @@ func (s *Storage) DeleteStaleSnapshots(maxAge time.Duration) error {
 }

 func (s *Storage) idb() *indexDB {
-	return s.idbCurr.Load().(*indexDB)
+	return s.idbCurr.Load()
 }

 // Metrics contains essential metrics for the Storage.
@@ -569,8 +569,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount)
 	m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount)

-	hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
-	hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+	hmCurr := s.currHourMetricIDs.Load()
+	hmPrev := s.prevHourMetricIDs.Load()
 	hourMetricIDsLen := hmPrev.m.Len()
 	if hmCurr.m.Len() > hourMetricIDsLen {
 		hourMetricIDsLen = hmCurr.m.Len()
@@ -579,11 +579,11 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
 	m.HourMetricIDCacheSizeBytes += hmCurr.m.SizeBytes()
 	m.HourMetricIDCacheSizeBytes += hmPrev.m.SizeBytes()

-	nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
+	nextDayMetricIDs := &s.nextDayMetricIDs.Load().v
 	m.NextDayMetricIDCacheSize += uint64(nextDayMetricIDs.Len())
 	m.NextDayMetricIDCacheSizeBytes += nextDayMetricIDs.SizeBytes()

-	prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
+	prefetchedMetricIDs := s.prefetchedMetricIDs.Load()
 	m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
 	m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())

@@ -797,12 +797,12 @@ func (s *Storage) MustClose() {
 	s.mustSaveCache(s.metricNameCache, "metricID_metricName")
 	s.metricNameCache.Stop()

-	hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
+	hmCurr := s.currHourMetricIDs.Load()
 	s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
-	hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+	hmPrev := s.prevHourMetricIDs.Load()
 	s.mustSaveHourMetricIDs(hmPrev, "prev_hour_metric_ids")

-	nextDayMetricIDs := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
+	nextDayMetricIDs := s.nextDayMetricIDs.Load()
 	s.mustSaveNextDayMetricIDs(nextDayMetricIDs)

 	// Release lock file.
@@ -1093,7 +1093,7 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin
 		return nil
 	}
 	var metricIDs uint64Sorter
-	prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
+	prefetchedMetricIDs := s.prefetchedMetricIDs.Load()
 	for _, metricID := range srcMetricIDs {
 		if prefetchedMetricIDs.Has(metricID) {
 			continue
@@ -1906,10 +1906,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
 		prevDate     uint64
 		prevMetricID uint64
 	)
-	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
-	hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+	hm := s.currHourMetricIDs.Load()
+	hmPrev := s.prevHourMetricIDs.Load()
 	hmPrevDate := hmPrev.hour / 24
-	nextDayMetricIDs := &s.nextDayMetricIDs.Load().(*byDateMetricIDEntry).v
+	nextDayMetricIDs := &s.nextDayMetricIDs.Load().v
 	ts := fasttime.UnixTimestamp()
 	// Start pre-populating the next per-day inverted index during the last hour of the current day.
 	// pMin linearly increases from 0 to 1 during the last hour of the day.
@@ -2055,7 +2055,7 @@ type dateMetricIDCache struct {
 	resetsCount uint64

 	// Contains immutable map
-	byDate atomic.Value
+	byDate atomic.Pointer[byDateMetricIDMap]

 	// Contains mutable map protected by mu
 	byDateMutable *byDateMetricIDMap
@@ -2085,7 +2085,7 @@ func (dmc *dateMetricIDCache) resetLocked() {
 }

 func (dmc *dateMetricIDCache) EntriesCount() int {
-	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	byDate := dmc.byDate.Load()
 	n := 0
 	for _, e := range byDate.m {
 		n += e.v.Len()
@@ -2094,7 +2094,7 @@ func (dmc *dateMetricIDCache) EntriesCount() int {
 }

 func (dmc *dateMetricIDCache) SizeBytes() uint64 {
-	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	byDate := dmc.byDate.Load()
 	n := uint64(0)
 	for _, e := range byDate.m {
 		n += e.v.SizeBytes()
@@ -2103,7 +2103,7 @@ func (dmc *dateMetricIDCache) SizeBytes() uint64 {
 }

 func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
-	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	byDate := dmc.byDate.Load()
 	v := byDate.get(date)
 	if v.Has(metricID) {
 		// Fast path.
@@ -2169,7 +2169,7 @@ func (dmc *dateMetricIDCache) syncLocked() {
 		// Nothing to sync.
 		return
 	}
-	byDate := dmc.byDate.Load().(*byDateMetricIDMap)
+	byDate := dmc.byDate.Load()
 	byDateMutable := dmc.byDateMutable
 	for date, e := range byDateMutable.m {
 		v := byDate.get(date)
@@ -2182,7 +2182,7 @@ func (dmc *dateMetricIDCache) syncLocked() {
 			date: date,
 			v:    *v,
 		}
-		if date == byDateMutable.hotEntry.Load().(*byDateMetricIDEntry).date {
+		if date == byDateMutable.hotEntry.Load().date {
 			byDateMutable.hotEntry.Store(dme)
 		}
 		byDateMutable.m[date] = dme
@@ -2205,7 +2205,7 @@ func (dmc *dateMetricIDCache) syncLocked() {
 }

 type byDateMetricIDMap struct {
-	hotEntry atomic.Value
+	hotEntry atomic.Pointer[byDateMetricIDEntry]
 	m        map[uint64]*byDateMetricIDEntry
 }

@@ -2218,7 +2218,7 @@ func newByDateMetricIDMap() *byDateMetricIDMap {
 }

 func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
-	hotEntry := dmm.hotEntry.Load().(*byDateMetricIDEntry)
+	hotEntry := dmm.hotEntry.Load()
 	if hotEntry.date == date {
 		// Fast path
 		return &hotEntry.v
@@ -2250,7 +2250,7 @@ type byDateMetricIDEntry struct {
 }

 func (s *Storage) updateNextDayMetricIDs(date uint64) {
-	e := s.nextDayMetricIDs.Load().(*byDateMetricIDEntry)
+	e := s.nextDayMetricIDs.Load()
 	s.pendingNextDayMetricIDsLock.Lock()
 	pendingMetricIDs := s.pendingNextDayMetricIDs
 	s.pendingNextDayMetricIDs = &uint64set.Set{}
@@ -2277,7 +2277,7 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) {
 }

 func (s *Storage) updateCurrHourMetricIDs(hour uint64) {
-	hm := s.currHourMetricIDs.Load().(*hourMetricIDs)
+	hm := s.currHourMetricIDs.Load()
 	s.pendingHourEntriesLock.Lock()
 	newMetricIDs := s.pendingHourEntries
 	s.pendingHourEntries = &uint64set.Set{}
diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go
index 24ca50c70..f27079d60 100644
--- a/lib/storage/storage_test.go
+++ b/lib/storage/storage_test.go
@@ -169,7 +169,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hmOrig.m.Add(34)
 		s.currHourMetricIDs.Store(hmOrig)
 		s.updateCurrHourMetricIDs(hour)
-		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
+		hmCurr := s.currHourMetricIDs.Load()
 		if hmCurr.hour != hour {
 			t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
 		}
@@ -177,7 +177,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 			t.Fatalf("unexpected length of hm.m; got %d; want %d", hmCurr.m.Len(), 0)
 		}

-		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+		hmPrev := s.prevHourMetricIDs.Load()
 		if !reflect.DeepEqual(hmPrev, hmOrig) {
 			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
 		}
@@ -197,7 +197,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hmOrig.m.Add(34)
 		s.currHourMetricIDs.Store(hmOrig)
 		s.updateCurrHourMetricIDs(hour)
-		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
+		hmCurr := s.currHourMetricIDs.Load()
 		if hmCurr.hour != hour {
 			t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
 		}
@@ -205,7 +205,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 			t.Fatalf("unexpected hmCurr; got %v; want %v", hmCurr, hmOrig)
 		}

-		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+		hmPrev := s.prevHourMetricIDs.Load()
 		hmEmpty := &hourMetricIDs{}
 		if !reflect.DeepEqual(hmPrev, hmEmpty) {
 			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
@@ -235,7 +235,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hmOrig.m.Add(34)
 		s.currHourMetricIDs.Store(hmOrig)
 		s.updateCurrHourMetricIDs(hour)
-		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
+		hmCurr := s.currHourMetricIDs.Load()
 		if hmCurr.hour != hour {
 			t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
 		}
@@ -243,7 +243,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 			t.Fatalf("unexpected hmCurr.m; got %v; want %v", hmCurr.m, pendingHourEntries)
 		}

-		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+		hmPrev := s.prevHourMetricIDs.Load()
 		if !reflect.DeepEqual(hmPrev, hmOrig) {
 			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
 		}
@@ -269,7 +269,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hmOrig.m.Add(34)
 		s.currHourMetricIDs.Store(hmOrig)
 		s.updateCurrHourMetricIDs(hour)
-		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
+		hmCurr := s.currHourMetricIDs.Load()
 		if hmCurr.hour != hour {
 			t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
 		}
@@ -284,7 +284,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 			t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
 		}

-		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+		hmPrev := s.prevHourMetricIDs.Load()
 		hmEmpty := &hourMetricIDs{}
 		if !reflect.DeepEqual(hmPrev, hmEmpty) {
 			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
@@ -312,7 +312,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		hmOrig.m.Add(34)
 		s.currHourMetricIDs.Store(hmOrig)
 		s.updateCurrHourMetricIDs(hour)
-		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
+		hmCurr := s.currHourMetricIDs.Load()
 		if hmCurr.hour != hour {
 			t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
 		}
@@ -327,7 +327,7 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 			t.Fatalf("unexpected hm.m; got %v; want %v", hmCurr.m, m)
 		}

-		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+		hmPrev := s.prevHourMetricIDs.Load()
 		hmEmpty := &hourMetricIDs{}
 		if !reflect.DeepEqual(hmPrev, hmEmpty) {
 			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmEmpty)
@@ -355,14 +355,14 @@ func TestUpdateCurrHourMetricIDs(t *testing.T) {
 		}
 		s.currHourMetricIDs.Store(hmOrig)
 		s.updateCurrHourMetricIDs(hour)
-		hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
+		hmCurr := s.currHourMetricIDs.Load()
 		if hmCurr.hour != hour {
 			t.Fatalf("unexpected hmCurr.hour; got %d; want %d", hmCurr.hour, hour)
 		}
 		if hmCurr.m.Len() != 0 {
 			t.Fatalf("unexpected non-empty hmCurr.m; got %v", hmCurr.m.AppendTo(nil))
 		}
-		hmPrev := s.prevHourMetricIDs.Load().(*hourMetricIDs)
+		hmPrev := s.prevHourMetricIDs.Load()
 		if !reflect.DeepEqual(hmPrev, hmOrig) {
 			t.Fatalf("unexpected hmPrev; got %v; want %v", hmPrev, hmOrig)
 		}
diff --git a/lib/workingsetcache/cache.go b/lib/workingsetcache/cache.go
index be7027976..a48235521 100644
--- a/lib/workingsetcache/cache.go
+++ b/lib/workingsetcache/cache.go
@@ -30,8 +30,8 @@ const (
 // The cache evicts inactive entries after the given expireDuration.
 // Recently accessed entries survive expireDuration.
 type Cache struct {
-	curr atomic.Value
-	prev atomic.Value
+	curr atomic.Pointer[fastcache.Cache]
+	prev atomic.Pointer[fastcache.Cache]

 	// csHistory holds cache stats history
 	csHistory fastcache.Stats
@@ -148,8 +148,8 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
 			return
 		}
 		// Reset prev cache and swap it with the curr cache.
-		prev := c.prev.Load().(*fastcache.Cache)
-		curr := c.curr.Load().(*fastcache.Cache)
+		prev := c.prev.Load()
+		curr := c.curr.Load()
 		c.prev.Store(curr)
 		var cs fastcache.Stats
 		prev.UpdateStats(&cs)
@@ -188,8 +188,8 @@ func (c *Cache) prevCacheWatcher() {
 			c.mu.Unlock()
 			return
 		}
-		prev := c.prev.Load().(*fastcache.Cache)
-		curr := c.curr.Load().(*fastcache.Cache)
+		prev := c.prev.Load()
+		curr := c.curr.Load()
 		var csCurr, csPrev fastcache.Stats
 		curr.UpdateStats(&csCurr)
 		prev.UpdateStats(&csPrev)
@@ -232,7 +232,7 @@ func (c *Cache) cacheSizeWatcher() {
 			continue
 		}
 		var cs fastcache.Stats
-		curr := c.curr.Load().(*fastcache.Cache)
+		curr := c.curr.Load()
 		curr.UpdateStats(&cs)
 		if cs.BytesSize >= uint64(0.9*float64(cs.MaxBytesSize)) {
 			maxBytesSize = cs.MaxBytesSize
@@ -254,8 +254,8 @@ func (c *Cache) cacheSizeWatcher() {

 	c.mu.Lock()
 	c.setMode(switching)
-	prev := c.prev.Load().(*fastcache.Cache)
-	curr := c.curr.Load().(*fastcache.Cache)
+	prev := c.prev.Load()
+	curr := c.curr.Load()
 	c.prev.Store(curr)
 	var cs fastcache.Stats
 	prev.UpdateStats(&cs)
@@ -273,7 +273,7 @@ func (c *Cache) cacheSizeWatcher() {
 		case <-t.C:
 		}
 		var cs fastcache.Stats
-		curr := c.curr.Load().(*fastcache.Cache)
+		curr := c.curr.Load()
 		curr.UpdateStats(&cs)
 		if cs.BytesSize >= maxBytesSize {
 			break
@@ -282,7 +282,7 @@ func (c *Cache) cacheSizeWatcher() {

 	c.mu.Lock()
 	c.setMode(whole)
-	prev = c.prev.Load().(*fastcache.Cache)
+	prev = c.prev.Load()
 	c.prev.Store(fastcache.New(1024))
 	cs.Reset()
 	prev.UpdateStats(&cs)
@@ -293,7 +293,7 @@ func (c *Cache) cacheSizeWatcher() {

 // Save saves the cache to filePath.
 func (c *Cache) Save(filePath string) error {
-	curr := c.curr.Load().(*fastcache.Cache)
+	curr := c.curr.Load()
 	concurrency := cgroup.AvailableCPUs()
 	return curr.SaveToFileConcurrent(filePath, concurrency)
 }
@@ -311,10 +311,10 @@ func (c *Cache) Stop() {
 // Reset resets the cache.
 func (c *Cache) Reset() {
 	var cs fastcache.Stats
-	prev := c.prev.Load().(*fastcache.Cache)
+	prev := c.prev.Load()
 	prev.UpdateStats(&cs)
 	prev.Reset()
-	curr := c.curr.Load().(*fastcache.Cache)
+	curr := c.curr.Load()
 	curr.UpdateStats(&cs)
 	updateCacheStatsHistory(&c.csHistory, &cs)
 	curr.Reset()
@@ -335,11 +335,11 @@ func (c *Cache) UpdateStats(fcs *fastcache.Stats) {
 	updateCacheStatsHistory(fcs, &c.csHistory)

 	var cs fastcache.Stats
-	curr := c.curr.Load().(*fastcache.Cache)
+	curr := c.curr.Load()
 	curr.UpdateStats(&cs)
 	updateCacheStats(fcs, &cs)

-	prev := c.prev.Load().(*fastcache.Cache)
+	prev := c.prev.Load()
 	cs.Reset()
 	prev.UpdateStats(&cs)
 	updateCacheStats(fcs, &cs)
@@ -369,7 +369,7 @@ func updateCacheStatsHistory(dst, src *fastcache.Stats) {

 // Get appends the found value for the given key to dst and returns the result.
 func (c *Cache) Get(dst, key []byte) []byte {
-	curr := c.curr.Load().(*fastcache.Cache)
+	curr := c.curr.Load()
 	result := curr.Get(dst, key)
 	if len(result) > len(dst) {
 		// Fast path - the entry is found in the current cache.
@@ -381,7 +381,7 @@ func (c *Cache) Get(dst, key []byte) []byte {
 	}

 	// Search for the entry in the previous cache.
-	prev := c.prev.Load().(*fastcache.Cache)
+	prev := c.prev.Load()
 	result = prev.Get(dst, key)
 	if len(result) <= len(dst) {
 		// Nothing found.
@@ -394,14 +394,14 @@ func (c *Cache) Get(dst, key []byte) []byte {
 }

 // Has verifies whether the cache contains the given key.
 func (c *Cache) Has(key []byte) bool {
-	curr := c.curr.Load().(*fastcache.Cache)
+	curr := c.curr.Load()
 	if curr.Has(key) {
 		return true
 	}
 	if c.loadMode() == whole {
 		return false
 	}
-	prev := c.prev.Load().(*fastcache.Cache)
+	prev := c.prev.Load()
 	if !prev.Has(key) {
 		return false
 	}
@@ -417,13 +417,13 @@ var tmpBufPool bytesutil.ByteBufferPool

 // Set sets the given value for the given key.
 func (c *Cache) Set(key, value []byte) {
-	curr := c.curr.Load().(*fastcache.Cache)
+	curr := c.curr.Load()
 	curr.Set(key, value)
 }

 // GetBig appends the found value for the given key to dst and returns the result.
 func (c *Cache) GetBig(dst, key []byte) []byte {
-	curr := c.curr.Load().(*fastcache.Cache)
+	curr := c.curr.Load()
 	result := curr.GetBig(dst, key)
 	if len(result) > len(dst) {
 		// Fast path - the entry is found in the current cache.
@@ -435,7 +435,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte {
 	}

 	// Search for the entry in the previous cache.
-	prev := c.prev.Load().(*fastcache.Cache)
+	prev := c.prev.Load()
 	result = prev.GetBig(dst, key)
 	if len(result) <= len(dst) {
 		// Nothing found.
@@ -448,7 +448,7 @@ func (c *Cache) GetBig(dst, key []byte) []byte {

 // SetBig sets the given value for the given key.
 func (c *Cache) SetBig(key, value []byte) {
-	curr := c.curr.Load().(*fastcache.Cache)
+	curr := c.curr.Load()
 	curr.SetBig(key, value)
 }
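A minimal, self-contained sketch of the pattern this commit applies follows. It is not code from the patch: the Config type and the variable names are illustrative only, and it assumes Go 1.19 or newer, since atomic.Pointer[T] was added to sync/atomic in Go 1.19.

package main

import (
	"fmt"
	"sync/atomic"
)

// Config stands in for any immutable state that is swapped atomically.
// It is a hypothetical type, not one from the VictoriaMetrics codebase.
type Config struct {
	Name string
}

// Old style: atomic.Value stores an untyped value, so every Load()
// result needs a .(*T) type assertion before use.
var cfgValue atomic.Value

// New style: atomic.Pointer[T] is strongly typed, so Load() returns *T
// directly and the assertion disappears.
var cfgPtr atomic.Pointer[Config]

func main() {
	c := &Config{Name: "example"}

	cfgValue.Store(c)
	old := cfgValue.Load().(*Config) // explicit .(*T) assertion required

	cfgPtr.Store(c)
	cur := cfgPtr.Load() // already *Config, no assertion

	fmt.Println(old.Name, cur.Name)

	// Maps can stay on atomic.Value, per the commit message: a map value
	// is already a pointer internally, so atomic.Pointer[map[K]V] would
	// add a second, needless level of indirection on every access.
	var m atomic.Value
	m.Store(map[int]string{1: "one"})
	mm := m.Load().(map[int]string) // assertion to the map type, no extra deref
	fmt.Println(mm[1])
}

The same copy-on-write discipline applies with both APIs: readers Load() a snapshot and never mutate it, while writers build a fresh value and Store() it, as the registry copying in zstd_pure.go above illustrates.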