From 6697da73e5271967cbc099ed418d3358d193fb1a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 24 Feb 2024 02:44:19 +0200 Subject: [PATCH] app: consistently use atomic.* types instead of atomic.* functions See ea9e2b19a5fa8aecc25c9da10fad8f4c1c58df38 --- app/vmagent/remotewrite/pendingseries.go | 7 ++-- app/vmagent/remotewrite/remotewrite.go | 16 ++++---- app/vmagent/remotewrite/statconn.go | 4 +- app/vmalert/remotewrite/client_test.go | 8 ++-- app/vmauth/auth_config.go | 29 +++++++------- app/vmctl/main.go | 8 ++-- app/vmselect/graphite/transform.go | 10 ++--- app/vmselect/netstorage/netstorage.go | 18 ++++----- app/vmselect/prometheus/prometheus.go | 12 +++--- .../prometheus/query_range_response.qtpl | 4 +- .../prometheus/query_range_response.qtpl.go | 4 +- app/vmselect/prometheus/query_response.qtpl | 4 +- .../prometheus/query_response.qtpl.go | 4 +- app/vmselect/promql/active_queries.go | 8 +++- app/vmselect/promql/eval.go | 39 ++++++++++--------- app/vmselect/promql/eval_test.go | 8 ++-- app/vmselect/promql/exec.go | 15 +++---- app/vmselect/promql/rollup_result_cache.go | 38 ++++++++++-------- 18 files changed, 120 insertions(+), 116 deletions(-) diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index f2b3c8341..2e31c8e3f 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -82,7 +82,7 @@ func (ps *pendingSeries) periodicFlusher() { ps.mu.Unlock() return case <-ticker.C: - if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) { + if fasttime.UnixTimestamp()-ps.wr.lastFlushTime.Load() < uint64(flushSeconds) { continue } } @@ -93,8 +93,7 @@ func (ps *pendingSeries) periodicFlusher() { } type writeRequest struct { - // Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures. - lastFlushTime uint64 + lastFlushTime atomic.Uint64 // The queue to send blocks to. fq *persistentqueue.FastQueue @@ -155,7 +154,7 @@ func (wr *writeRequest) mustWriteBlock(block []byte) bool { func (wr *writeRequest) tryFlush() bool { wr.wr.Timeseries = wr.tss - atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) + wr.lastFlushTime.Store(fasttime.UnixTimestamp()) if !tryPushWriteRequest(&wr.wr, wr.fq.TryWriteBlock, wr.isVMRemoteWrite) { return false } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 65ee2d730..c568e2a36 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -536,7 +536,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar // Push sharded data to remote storages in parallel in order to reduce // the time needed for sending the data to multiple remote storage systems. var wg sync.WaitGroup - var anyPushFailed uint64 + var anyPushFailed atomic.Bool for i, rwctx := range rwctxs { tssShard := tssByURL[i] if len(tssShard) == 0 { @@ -546,12 +546,12 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) { defer wg.Done() if !rwctx.TryPush(tss) { - atomic.StoreUint64(&anyPushFailed, 1) + anyPushFailed.Store(true) } }(rwctx, tssShard) } wg.Wait() - return atomic.LoadUint64(&anyPushFailed) == 0 + return !anyPushFailed.Load() } // Replicate data among rwctxs. @@ -559,17 +559,17 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar // the time needed for sending the data to multiple remote storage systems. var wg sync.WaitGroup wg.Add(len(rwctxs)) - var anyPushFailed uint64 + var anyPushFailed atomic.Bool for _, rwctx := range rwctxs { go func(rwctx *remoteWriteCtx) { defer wg.Done() if !rwctx.TryPush(tssBlock) { - atomic.StoreUint64(&anyPushFailed, 1) + anyPushFailed.Store(true) } }(rwctx) } wg.Wait() - return atomic.LoadUint64(&anyPushFailed) == 0 + return !anyPushFailed.Load() } // sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. @@ -670,7 +670,7 @@ type remoteWriteCtx struct { streamAggrDropInput bool pss []*pendingSeries - pssNextIdx uint64 + pssNextIdx atomic.Uint64 rowsPushedAfterRelabel *metrics.Counter rowsDroppedByRelabel *metrics.Counter @@ -872,7 +872,7 @@ func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) boo } pss := rwctx.pss - idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) + idx := rwctx.pssNextIdx.Add(1) % uint64(len(pss)) ok := pss[idx].TryPush(tss) diff --git a/app/vmagent/remotewrite/statconn.go b/app/vmagent/remotewrite/statconn.go index cb4d756ec..ff28e7580 100644 --- a/app/vmagent/remotewrite/statconn.go +++ b/app/vmagent/remotewrite/statconn.go @@ -50,7 +50,7 @@ var ( ) type statConn struct { - closed uint64 + closed atomic.Int32 net.Conn } @@ -76,7 +76,7 @@ func (sc *statConn) Write(p []byte) (int, error) { func (sc *statConn) Close() error { err := sc.Conn.Close() - if atomic.AddUint64(&sc.closed, 1) == 1 { + if sc.closed.Add(1) == 1 { conns.Dec() } return err diff --git a/app/vmalert/remotewrite/client_test.go b/app/vmalert/remotewrite/client_test.go index 5a9461c24..76ab3fc92 100644 --- a/app/vmalert/remotewrite/client_test.go +++ b/app/vmalert/remotewrite/client_test.go @@ -91,14 +91,12 @@ func newRWServer() *rwServer { } type rwServer struct { - // WARN: ordering of fields is important for alignment! - // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG - acceptedRows uint64 + acceptedRows atomic.Uint64 *httptest.Server } func (rw *rwServer) accepted() int { - return int(atomic.LoadUint64(&rw.acceptedRows)) + return int(rw.acceptedRows.Load()) } func (rw *rwServer) err(w http.ResponseWriter, err error) { @@ -144,7 +142,7 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) { rw.err(w, fmt.Errorf("unmarhsal err: %w", err)) return } - atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries))) + rw.acceptedRows.Add(uint64(len(wr.Timeseries))) w.WriteHeader(http.StatusNoContent) } diff --git a/app/vmauth/auth_config.go b/app/vmauth/auth_config.go index 43ab7cb6f..3236032b2 100644 --- a/app/vmauth/auth_config.go +++ b/app/vmauth/auth_config.go @@ -164,7 +164,7 @@ type Regex struct { // URLPrefix represents passed `url_prefix` type URLPrefix struct { - n uint32 + n atomic.Uint32 // the list of backend urls bus []*backendURL @@ -192,27 +192,28 @@ func (up *URLPrefix) setLoadBalancingPolicy(loadBalancingPolicy string) error { } type backendURL struct { - brokenDeadline uint64 - concurrentRequests int32 - url *url.URL + brokenDeadline atomic.Uint64 + concurrentRequests atomic.Int32 + + url *url.URL } func (bu *backendURL) isBroken() bool { ct := fasttime.UnixTimestamp() - return ct < atomic.LoadUint64(&bu.brokenDeadline) + return ct < bu.brokenDeadline.Load() } func (bu *backendURL) setBroken() { deadline := fasttime.UnixTimestamp() + uint64((*failTimeout).Seconds()) - atomic.StoreUint64(&bu.brokenDeadline, deadline) + bu.brokenDeadline.Store(deadline) } func (bu *backendURL) get() { - atomic.AddInt32(&bu.concurrentRequests, 1) + bu.concurrentRequests.Add(1) } func (bu *backendURL) put() { - atomic.AddInt32(&bu.concurrentRequests, -1) + bu.concurrentRequests.Add(-1) } func (up *URLPrefix) getBackendsCount() int { @@ -266,7 +267,7 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL { } // Slow path - select other backend urls. - n := atomic.AddUint32(&up.n, 1) + n := up.n.Add(1) for i := uint32(0); i < uint32(len(bus)); i++ { idx := (n + i) % uint32(len(bus)) @@ -274,22 +275,22 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL { if bu.isBroken() { continue } - if atomic.LoadInt32(&bu.concurrentRequests) == 0 { + if bu.concurrentRequests.Load() == 0 { // Fast path - return the backend with zero concurrently executed requests. - // Do not use atomic.CompareAndSwapInt32(), since it is much slower on systems with many CPU cores. - atomic.AddInt32(&bu.concurrentRequests, 1) + // Do not use CompareAndSwap() instead of Load(), since it is much slower on systems with many CPU cores. + bu.concurrentRequests.Add(1) return bu } } // Slow path - return the backend with the minimum number of concurrently executed requests. buMin := bus[n%uint32(len(bus))] - minRequests := atomic.LoadInt32(&buMin.concurrentRequests) + minRequests := buMin.concurrentRequests.Load() for _, bu := range bus { if bu.isBroken() { continue } - if n := atomic.LoadInt32(&bu.concurrentRequests); n < minRequests { + if n := bu.concurrentRequests.Load(); n < minRequests { buMin = bu minRequests = n } diff --git a/app/vmctl/main.go b/app/vmctl/main.go index 1071389b2..94b41a519 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -330,14 +330,14 @@ func main() { if err != nil { return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1) } - var blocksCount uint64 + var blocksCount atomic.Uint64 if err := stream.Parse(f, isBlockGzipped, func(block *stream.Block) error { - atomic.AddUint64(&blocksCount, 1) + blocksCount.Add(1) return nil }); err != nil { - return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount, err), 1) + return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount.Load(), err), 1) } - log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount) + log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount.Load()) return nil }, }, diff --git a/app/vmselect/graphite/transform.go b/app/vmselect/graphite/transform.go index 9232cca8c..6d25d0394 100644 --- a/app/vmselect/graphite/transform.go +++ b/app/vmselect/graphite/transform.go @@ -3842,18 +3842,18 @@ func nextSeriesConcurrentWrapper(nextSeries nextSeriesFunc, f func(s *series) (* errCh <- err close(errCh) }() - var skipProcessing uint32 + var skipProcessing atomic.Bool for i := 0; i < goroutines; i++ { go func() { defer wg.Done() for s := range seriesCh { - if atomic.LoadUint32(&skipProcessing) != 0 { + if skipProcessing.Load() { continue } sNew, err := f(s) if err != nil { // Drain the rest of series and do not call f for them in order to conserve CPU time. - atomic.StoreUint32(&skipProcessing, 1) + skipProcessing.Store(true) resultCh <- &result{ err: err, } @@ -5609,9 +5609,9 @@ func (nsf *nextSeriesFunc) peekStep(step int64) (int64, error) { if s != nil { step = s.step } - calls := uint64(0) + var calls atomic.Uint64 *nsf = func() (*series, error) { - if atomic.AddUint64(&calls, 1) == 1 { + if calls.Add(1) == 1 { return s, nil } return nextSeries() diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 0a054bcb3..3d60b30a1 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -84,7 +84,7 @@ func (rss *Results) mustClose() { } type timeseriesWork struct { - mustStop *uint32 + mustStop *atomic.Bool rss *Results pts *packedTimeseries f func(rs *Result, workerID uint) error @@ -94,22 +94,22 @@ type timeseriesWork struct { } func (tsw *timeseriesWork) do(r *Result, workerID uint) error { - if atomic.LoadUint32(tsw.mustStop) != 0 { + if tsw.mustStop.Load() { return nil } rss := tsw.rss if rss.deadline.Exceeded() { - atomic.StoreUint32(tsw.mustStop, 1) + tsw.mustStop.Store(true) return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String()) } if err := tsw.pts.Unpack(r, rss.tbf, rss.tr); err != nil { - atomic.StoreUint32(tsw.mustStop, 1) + tsw.mustStop.Store(true) return fmt.Errorf("error during time series unpacking: %w", err) } tsw.rowsProcessed = len(r.Timestamps) if len(r.Timestamps) > 0 { if err := tsw.f(r, workerID); err != nil { - atomic.StoreUint32(tsw.mustStop, 1) + tsw.mustStop.Store(true) return err } } @@ -241,7 +241,7 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke return 0, nil } - var mustStop uint32 + var mustStop atomic.Bool initTimeseriesWork := func(tsw *timeseriesWork, pts *packedTimeseries) { tsw.rss = rss tsw.pts = pts @@ -1011,7 +1011,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear var ( errGlobal error errGlobalLock sync.Mutex - mustStop uint32 + mustStop atomic.Bool ) var wg sync.WaitGroup wg.Add(gomaxprocs) @@ -1023,7 +1023,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear errGlobalLock.Lock() if errGlobal == nil { errGlobal = err - atomic.StoreUint32(&mustStop, 1) + mustStop.Store(true) } errGlobalLock.Unlock() } @@ -1041,7 +1041,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear if deadline.Exceeded() { return fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String()) } - if atomic.LoadUint32(&mustStop) != 0 { + if mustStop.Load() { break } xw := exportWorkPool.Get().(*exportWork) diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 99ed247df..c0cfdacb1 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -320,21 +320,21 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara } } else if format == "promapi" { WriteExportPromAPIHeader(bw) - firstLineOnce := uint32(0) - firstLineSent := uint32(0) + var firstLineOnce atomic.Bool + var firstLineSent atomic.Bool writeLineFunc = func(xb *exportBlock, workerID uint) error { bb := sw.getBuffer(workerID) - // Use atomic.LoadUint32() in front of atomic.CompareAndSwapUint32() in order to avoid slow inter-CPU synchronization + // Use Load() in front of CompareAndSwap() in order to avoid slow inter-CPU synchronization // in fast path after the first line has been already sent. - if atomic.LoadUint32(&firstLineOnce) == 0 && atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) { + if !firstLineOnce.Load() && firstLineOnce.CompareAndSwap(false, true) { // Send the first line to sw.bw WriteExportPromAPILine(bb, xb) _, err := sw.bw.Write(bb.B) bb.Reset() - atomic.StoreUint32(&firstLineSent, 1) + firstLineSent.Store(true) return err } - for atomic.LoadUint32(&firstLineSent) == 0 { + for !firstLineSent.Load() { // Busy wait until the first line is sent to sw.bw runtime.Gosched() } diff --git a/app/vmselect/prometheus/query_range_response.qtpl b/app/vmselect/prometheus/query_range_response.qtpl index eecaef8e3..5aea84f66 100644 --- a/app/vmselect/prometheus/query_range_response.qtpl +++ b/app/vmselect/prometheus/query_range_response.qtpl @@ -33,8 +33,8 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries // seriesFetched is string instead of int because of historical reasons. // It cannot be converted to int without breaking backwards compatibility at vmalert :( %} - "seriesFetched": "{%dl qs.SeriesFetched %}", - "executionTimeMsec": {%dl qs.ExecutionTimeMsec %} + "seriesFetched": "{%dl qs.SeriesFetched.Load() %}", + "executionTimeMsec": {%dl qs.ExecutionTimeMsec.Load() %} } {% code qt.Printf("generate /api/v1/query_range response for series=%d, points=%d", seriesCount, pointsCount) diff --git a/app/vmselect/prometheus/query_range_response.qtpl.go b/app/vmselect/prometheus/query_range_response.qtpl.go index 0d3eaee2c..ea99e7e15 100644 --- a/app/vmselect/prometheus/query_range_response.qtpl.go +++ b/app/vmselect/prometheus/query_range_response.qtpl.go @@ -68,11 +68,11 @@ func StreamQueryRangeResponse(qw422016 *qt422016.Writer, rs []netstorage.Result, //line app/vmselect/prometheus/query_range_response.qtpl:35 qw422016.N().S(`"seriesFetched": "`) //line app/vmselect/prometheus/query_range_response.qtpl:36 - qw422016.N().DL(qs.SeriesFetched) + qw422016.N().DL(qs.SeriesFetched.Load()) //line app/vmselect/prometheus/query_range_response.qtpl:36 qw422016.N().S(`","executionTimeMsec":`) //line app/vmselect/prometheus/query_range_response.qtpl:37 - qw422016.N().DL(qs.ExecutionTimeMsec) + qw422016.N().DL(qs.ExecutionTimeMsec.Load()) //line app/vmselect/prometheus/query_range_response.qtpl:37 qw422016.N().S(`}`) //line app/vmselect/prometheus/query_range_response.qtpl:40 diff --git a/app/vmselect/prometheus/query_response.qtpl b/app/vmselect/prometheus/query_response.qtpl index f64154d0e..cf0bab995 100644 --- a/app/vmselect/prometheus/query_response.qtpl +++ b/app/vmselect/prometheus/query_response.qtpl @@ -35,8 +35,8 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries // seriesFetched is string instead of int because of historical reasons. // It cannot be converted to int without breaking backwards compatibility at vmalert :( %} - "seriesFetched": "{%dl qs.SeriesFetched %}", - "executionTimeMsec": {%dl qs.ExecutionTimeMsec %} + "seriesFetched": "{%dl qs.SeriesFetched.Load() %}", + "executionTimeMsec": {%dl qs.ExecutionTimeMsec.Load() %} } {% code qt.Printf("generate /api/v1/query response for series=%d", seriesCount) diff --git a/app/vmselect/prometheus/query_response.qtpl.go b/app/vmselect/prometheus/query_response.qtpl.go index 6e1711040..d849d5435 100644 --- a/app/vmselect/prometheus/query_response.qtpl.go +++ b/app/vmselect/prometheus/query_response.qtpl.go @@ -78,11 +78,11 @@ func StreamQueryResponse(qw422016 *qt422016.Writer, rs []netstorage.Result, qt * //line app/vmselect/prometheus/query_response.qtpl:37 qw422016.N().S(`"seriesFetched": "`) //line app/vmselect/prometheus/query_response.qtpl:38 - qw422016.N().DL(qs.SeriesFetched) + qw422016.N().DL(qs.SeriesFetched.Load()) //line app/vmselect/prometheus/query_response.qtpl:38 qw422016.N().S(`","executionTimeMsec":`) //line app/vmselect/prometheus/query_response.qtpl:39 - qw422016.N().DL(qs.ExecutionTimeMsec) + qw422016.N().DL(qs.ExecutionTimeMsec.Load()) //line app/vmselect/prometheus/query_response.qtpl:39 qw422016.N().S(`}`) //line app/vmselect/prometheus/query_response.qtpl:42 diff --git a/app/vmselect/promql/active_queries.go b/app/vmselect/promql/active_queries.go index cf60de2c2..ccad4170f 100644 --- a/app/vmselect/promql/active_queries.go +++ b/app/vmselect/promql/active_queries.go @@ -60,7 +60,7 @@ func (aq *activeQueries) Add(ec *EvalConfig, q string) uint64 { aqe.start = ec.Start aqe.end = ec.End aqe.step = ec.Step - aqe.qid = atomic.AddUint64(&nextActiveQueryID, 1) + aqe.qid = nextActiveQueryID.Add(1) aqe.quotedRemoteAddr = ec.QuotedRemoteAddr aqe.q = q aqe.startTime = time.Now() @@ -87,4 +87,8 @@ func (aq *activeQueries) GetAll() []activeQueryEntry { return aqes } -var nextActiveQueryID = uint64(time.Now().UnixNano()) +var nextActiveQueryID = func() *atomic.Uint64 { + var x atomic.Uint64 + x.Store(uint64(time.Now().UnixNano())) + return &x +}() diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index 603cb6991..da55ad86d 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -171,16 +171,17 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig { // QueryStats contains various stats for the query. type QueryStats struct { // SeriesFetched contains the number of series fetched from storage during the query evaluation. - SeriesFetched int64 + SeriesFetched atomic.Int64 + // ExecutionTimeMsec contains the number of milliseconds the query took to execute. - ExecutionTimeMsec int64 + ExecutionTimeMsec atomic.Int64 } func (qs *QueryStats) addSeriesFetched(n int) { if qs == nil { return } - atomic.AddInt64(&qs.SeriesFetched, int64(n)) + qs.SeriesFetched.Add(int64(n)) } func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) { @@ -188,7 +189,7 @@ func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) { return } d := time.Since(startTime).Milliseconds() - atomic.AddInt64(&qs.ExecutionTimeMsec, d) + qs.ExecutionTimeMsec.Add(d) } func (ec *EvalConfig) validate() { @@ -949,7 +950,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName return nil, err } - var samplesScannedTotal uint64 + var samplesScannedTotal atomic.Uint64 keepMetricNames := getKeepMetricNames(expr) tsw := getTimeseriesByWorkerID() seriesByWorkerID := tsw.byWorkerID @@ -959,13 +960,13 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss) continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts) } return values, timestamps @@ -976,8 +977,8 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName } putTimeseriesByWorkerID(tsw) - rowsScannedPerQuery.Update(float64(samplesScannedTotal)) - qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal) + rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) + qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal.Load()) return tss, nil } @@ -1793,7 +1794,7 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs) defer qt.Done() - var samplesScannedTotal uint64 + var samplesScannedTotal atomic.Uint64 err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) @@ -1805,12 +1806,12 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, for _, ts := range tsm.m { iafc.updateTimeseries(ts, workerID) } - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) continue } ts.Reset() samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) iafc.updateTimeseries(ts, workerID) // ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used. @@ -1823,8 +1824,8 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, return nil, err } tss := iafc.finalizeTimeseries() - rowsScannedPerQuery.Update(float64(samplesScannedTotal)) - qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal) + rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) + qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal.Load()) return tss, nil } @@ -1833,7 +1834,7 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs) defer qt.Done() - var samplesScannedTotal uint64 + var samplesScannedTotal atomic.Uint64 tsw := getTimeseriesByWorkerID() seriesByWorkerID := tsw.byWorkerID seriesLen := rss.Len() @@ -1843,13 +1844,13 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss) continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts) } return nil @@ -1863,8 +1864,8 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k } putTimeseriesByWorkerID(tsw) - rowsScannedPerQuery.Update(float64(samplesScannedTotal)) - qt.Printf("samplesScanned=%d", samplesScannedTotal) + rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) + qt.Printf("samplesScanned=%d", samplesScannedTotal.Load()) return tss, nil } diff --git a/app/vmselect/promql/eval_test.go b/app/vmselect/promql/eval_test.go index 741ecde61..daecc0d38 100644 --- a/app/vmselect/promql/eval_test.go +++ b/app/vmselect/promql/eval_test.go @@ -86,14 +86,14 @@ func TestQueryStats_addSeriesFetched(t *testing.T) { } ec.QueryStats.addSeriesFetched(1) - if qs.SeriesFetched != 1 { - t.Fatalf("expected to get 1; got %d instead", qs.SeriesFetched) + if n := qs.SeriesFetched.Load(); n != 1 { + t.Fatalf("expected to get 1; got %d instead", n) } ecNew := copyEvalConfig(ec) ecNew.QueryStats.addSeriesFetched(3) - if qs.SeriesFetched != 4 { - t.Fatalf("expected to get 4; got %d instead", qs.SeriesFetched) + if n := qs.SeriesFetched.Load(); n != 4 { + t.Fatalf("expected to get 4; got %d instead", n) } } diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index 3884e6b9f..81ae717b7 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -352,22 +352,19 @@ type parseCacheValue struct { } type parseCache struct { - // Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - - requests uint64 - misses uint64 + requests atomic.Uint64 + misses atomic.Uint64 m map[string]*parseCacheValue mu sync.RWMutex } func (pc *parseCache) Requests() uint64 { - return atomic.LoadUint64(&pc.requests) + return pc.requests.Load() } func (pc *parseCache) Misses() uint64 { - return atomic.LoadUint64(&pc.misses) + return pc.misses.Load() } func (pc *parseCache) Len() uint64 { @@ -378,14 +375,14 @@ func (pc *parseCache) Len() uint64 { } func (pc *parseCache) Get(q string) *parseCacheValue { - atomic.AddUint64(&pc.requests, 1) + pc.requests.Add(1) pc.mu.RLock() pcv := pc.m[q] pc.mu.RUnlock() if pcv == nil { - atomic.AddUint64(&pc.misses, 1) + pc.misses.Add(1) } return pcv } diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 428d9c42f..7687d13e1 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -45,7 +45,7 @@ func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) { rollupResultResetMetricRowSample.Store(&storage.MetricRow{}) go checkRollupResultCacheReset() }) - if atomic.LoadUint32(&needRollupResultCacheReset) != 0 { + if needRollupResultCacheReset.Load() { // The cache has been already instructed to reset. return } @@ -63,14 +63,14 @@ func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) { } if needCacheReset { // Do not call ResetRollupResultCache() here, since it may be heavy when frequently called. - atomic.StoreUint32(&needRollupResultCacheReset, 1) + needRollupResultCacheReset.Store(true) } } func checkRollupResultCacheReset() { for { time.Sleep(checkRollupResultCacheResetInterval) - if atomic.SwapUint32(&needRollupResultCacheReset, 0) > 0 { + if needRollupResultCacheReset.Swap(false) { mr := rollupResultResetMetricRowSample.Load() d := int64(fasttime.UnixTimestamp()*1000) - mr.Timestamp - cacheTimestampOffset.Milliseconds() logger.Warnf("resetting rollup result cache because the metric %s has a timestamp older than -search.cacheTimestampOffset=%s by %.3fs", @@ -82,7 +82,7 @@ func checkRollupResultCacheReset() { const checkRollupResultCacheResetInterval = 5 * time.Second -var needRollupResultCacheReset uint32 +var needRollupResultCacheReset atomic.Bool var checkRollupResultCacheResetOnce sync.Once var rollupResultResetMetricRowSample atomic.Pointer[storage.MetricRow] @@ -129,7 +129,7 @@ func InitRollupResultCache(cachePath string) { mustLoadRollupResultCacheKeyPrefix(rollupResultCachePath) } else { c = workingsetcache.New(cacheSize) - rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix() + rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix()) } if *disableCache { c.Reset() @@ -211,7 +211,7 @@ var rollupResultCacheResets = metrics.NewCounter(`vm_cache_resets_total{type="pr // ResetRollupResultCache resets rollup result cache. func ResetRollupResultCache() { rollupResultCacheResets.Inc() - atomic.AddUint64(&rollupResultCacheKeyPrefix, 1) + rollupResultCacheKeyPrefix.Add(1) logger.Infof("rollupResult cache has been cleared") } @@ -438,8 +438,8 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig, } var key rollupResultCacheKey - key.prefix = rollupResultCacheKeyPrefix - key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1) + key.prefix = rollupResultCacheKeyPrefix.Load() + key.suffix = rollupResultCacheKeySuffix.Add(1) bb := bbPool.Get() bb.B = key.Marshal(bb.B[:0]) @@ -455,8 +455,12 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig, } var ( - rollupResultCacheKeyPrefix uint64 - rollupResultCacheKeySuffix = uint64(time.Now().UnixNano()) + rollupResultCacheKeyPrefix atomic.Uint64 + rollupResultCacheKeySuffix = func() *atomic.Uint64 { + var x atomic.Uint64 + x.Store(uint64(time.Now().UnixNano())) + return &x + }() ) func (rrc *rollupResultCache) getSeriesFromCache(qt *querytracer.Tracer, key []byte) ([]*timeseries, bool) { @@ -517,26 +521,26 @@ func newRollupResultCacheKeyPrefix() uint64 { func mustLoadRollupResultCacheKeyPrefix(path string) { path = path + ".key.prefix" if !fs.IsPathExist(path) { - rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix() + rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix()) return } data, err := os.ReadFile(path) if err != nil { logger.Errorf("cannot load %s: %s; reset rollupResult cache", path, err) - rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix() + rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix()) return } if len(data) != 8 { logger.Errorf("unexpected size of %s; want 8 bytes; got %d bytes; reset rollupResult cache", path, len(data)) - rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix() + rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix()) return } - rollupResultCacheKeyPrefix = encoding.UnmarshalUint64(data) + rollupResultCacheKeyPrefix.Store(encoding.UnmarshalUint64(data)) } func mustSaveRollupResultCacheKeyPrefix(path string) { path = path + ".key.prefix" - data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix) + data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix.Load()) fs.MustWriteAtomic(path, data, true) } @@ -552,7 +556,7 @@ const ( func marshalRollupResultCacheKeyForSeries(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte { dst = append(dst, rollupResultCacheVersion) - dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix) + dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix.Load()) dst = append(dst, rollupResultCacheTypeSeries) dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, step) @@ -563,7 +567,7 @@ func marshalRollupResultCacheKeyForSeries(dst []byte, expr metricsql.Expr, windo func marshalRollupResultCacheKeyForInstantValues(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte { dst = append(dst, rollupResultCacheVersion) - dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix) + dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix.Load()) dst = append(dst, rollupResultCacheTypeInstantValues) dst = encoding.MarshalInt64(dst, window) dst = encoding.MarshalInt64(dst, step)