From 63d635a5e45e10a9246a0670a526bbddd8f03907 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@victoriametrics.com> Date: Sat, 24 Feb 2024 02:44:19 +0200 Subject: [PATCH] app: consistently use atomic.* types instead of atomic.* functions See ea9e2b19a5fa8aecc25c9da10fad8f4c1c58df38 --- app/vmagent/remotewrite/pendingseries.go | 7 ++-- app/vmagent/remotewrite/remotewrite.go | 16 ++++---- app/vmagent/remotewrite/statconn.go | 4 +- app/vmalert/remotewrite/client_test.go | 8 ++-- app/vmauth/auth_config.go | 29 +++++++------- app/vmctl/main.go | 8 ++-- .../netstorage/consistent_hash_timing_test.go | 4 +- app/vminsert/netstorage/netstorage.go | 33 +++++++++------- app/vmselect/graphite/transform.go | 10 ++--- app/vmselect/netstorage/netstorage.go | 12 +++--- app/vmselect/prometheus/prometheus.go | 12 +++--- .../prometheus/query_range_response.qtpl | 4 +- .../prometheus/query_range_response.qtpl.go | 4 +- app/vmselect/prometheus/query_response.qtpl | 4 +- .../prometheus/query_response.qtpl.go | 4 +- app/vmselect/promql/active_queries.go | 8 +++- app/vmselect/promql/eval.go | 39 ++++++++++--------- app/vmselect/promql/eval_test.go | 8 ++-- app/vmselect/promql/exec.go | 15 +++---- app/vmselect/promql/rollup_result_cache.go | 30 +++++++------- app/vmstorage/servers/vminsert.go | 6 +-- 21 files changed, 136 insertions(+), 129 deletions(-) diff --git a/app/vmagent/remotewrite/pendingseries.go b/app/vmagent/remotewrite/pendingseries.go index f2b3c8341b..2e31c8e3f3 100644 --- a/app/vmagent/remotewrite/pendingseries.go +++ b/app/vmagent/remotewrite/pendingseries.go @@ -82,7 +82,7 @@ func (ps *pendingSeries) periodicFlusher() { ps.mu.Unlock() return case <-ticker.C: - if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) { + if fasttime.UnixTimestamp()-ps.wr.lastFlushTime.Load() < uint64(flushSeconds) { continue } } @@ -93,8 +93,7 @@ func (ps *pendingSeries) periodicFlusher() { } type writeRequest struct { - // Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures. - lastFlushTime uint64 + lastFlushTime atomic.Uint64 // The queue to send blocks to. fq *persistentqueue.FastQueue @@ -155,7 +154,7 @@ func (wr *writeRequest) mustWriteBlock(block []byte) bool { func (wr *writeRequest) tryFlush() bool { wr.wr.Timeseries = wr.tss - atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp()) + wr.lastFlushTime.Store(fasttime.UnixTimestamp()) if !tryPushWriteRequest(&wr.wr, wr.fq.TryWriteBlock, wr.isVMRemoteWrite) { return false } diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index 65ee2d730b..c568e2a36f 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -536,7 +536,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar // Push sharded data to remote storages in parallel in order to reduce // the time needed for sending the data to multiple remote storage systems. var wg sync.WaitGroup - var anyPushFailed uint64 + var anyPushFailed atomic.Bool for i, rwctx := range rwctxs { tssShard := tssByURL[i] if len(tssShard) == 0 { @@ -546,12 +546,12 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) { defer wg.Done() if !rwctx.TryPush(tss) { - atomic.StoreUint64(&anyPushFailed, 1) + anyPushFailed.Store(true) } }(rwctx, tssShard) } wg.Wait() - return atomic.LoadUint64(&anyPushFailed) == 0 + return !anyPushFailed.Load() } // Replicate data among rwctxs. @@ -559,17 +559,17 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar // the time needed for sending the data to multiple remote storage systems. var wg sync.WaitGroup wg.Add(len(rwctxs)) - var anyPushFailed uint64 + var anyPushFailed atomic.Bool for _, rwctx := range rwctxs { go func(rwctx *remoteWriteCtx) { defer wg.Done() if !rwctx.TryPush(tssBlock) { - atomic.StoreUint64(&anyPushFailed, 1) + anyPushFailed.Store(true) } }(rwctx) } wg.Wait() - return atomic.LoadUint64(&anyPushFailed) == 0 + return !anyPushFailed.Load() } // sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set. @@ -670,7 +670,7 @@ type remoteWriteCtx struct { streamAggrDropInput bool pss []*pendingSeries - pssNextIdx uint64 + pssNextIdx atomic.Uint64 rowsPushedAfterRelabel *metrics.Counter rowsDroppedByRelabel *metrics.Counter @@ -872,7 +872,7 @@ func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) boo } pss := rwctx.pss - idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss)) + idx := rwctx.pssNextIdx.Add(1) % uint64(len(pss)) ok := pss[idx].TryPush(tss) diff --git a/app/vmagent/remotewrite/statconn.go b/app/vmagent/remotewrite/statconn.go index cb4d756ecc..ff28e75807 100644 --- a/app/vmagent/remotewrite/statconn.go +++ b/app/vmagent/remotewrite/statconn.go @@ -50,7 +50,7 @@ var ( ) type statConn struct { - closed uint64 + closed atomic.Int32 net.Conn } @@ -76,7 +76,7 @@ func (sc *statConn) Write(p []byte) (int, error) { func (sc *statConn) Close() error { err := sc.Conn.Close() - if atomic.AddUint64(&sc.closed, 1) == 1 { + if sc.closed.Add(1) == 1 { conns.Dec() } return err diff --git a/app/vmalert/remotewrite/client_test.go b/app/vmalert/remotewrite/client_test.go index 5a9461c24d..76ab3fc92d 100644 --- a/app/vmalert/remotewrite/client_test.go +++ b/app/vmalert/remotewrite/client_test.go @@ -91,14 +91,12 @@ func newRWServer() *rwServer { } type rwServer struct { - // WARN: ordering of fields is important for alignment! - // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG - acceptedRows uint64 + acceptedRows atomic.Uint64 *httptest.Server } func (rw *rwServer) accepted() int { - return int(atomic.LoadUint64(&rw.acceptedRows)) + return int(rw.acceptedRows.Load()) } func (rw *rwServer) err(w http.ResponseWriter, err error) { @@ -144,7 +142,7 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) { rw.err(w, fmt.Errorf("unmarhsal err: %w", err)) return } - atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries))) + rw.acceptedRows.Add(uint64(len(wr.Timeseries))) w.WriteHeader(http.StatusNoContent) } diff --git a/app/vmauth/auth_config.go b/app/vmauth/auth_config.go index 43ab7cb6fd..3236032b24 100644 --- a/app/vmauth/auth_config.go +++ b/app/vmauth/auth_config.go @@ -164,7 +164,7 @@ type Regex struct { // URLPrefix represents passed `url_prefix` type URLPrefix struct { - n uint32 + n atomic.Uint32 // the list of backend urls bus []*backendURL @@ -192,27 +192,28 @@ func (up *URLPrefix) setLoadBalancingPolicy(loadBalancingPolicy string) error { } type backendURL struct { - brokenDeadline uint64 - concurrentRequests int32 - url *url.URL + brokenDeadline atomic.Uint64 + concurrentRequests atomic.Int32 + + url *url.URL } func (bu *backendURL) isBroken() bool { ct := fasttime.UnixTimestamp() - return ct < atomic.LoadUint64(&bu.brokenDeadline) + return ct < bu.brokenDeadline.Load() } func (bu *backendURL) setBroken() { deadline := fasttime.UnixTimestamp() + uint64((*failTimeout).Seconds()) - atomic.StoreUint64(&bu.brokenDeadline, deadline) + bu.brokenDeadline.Store(deadline) } func (bu *backendURL) get() { - atomic.AddInt32(&bu.concurrentRequests, 1) + bu.concurrentRequests.Add(1) } func (bu *backendURL) put() { - atomic.AddInt32(&bu.concurrentRequests, -1) + bu.concurrentRequests.Add(-1) } func (up *URLPrefix) getBackendsCount() int { @@ -266,7 +267,7 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL { } // Slow path - select other backend urls. - n := atomic.AddUint32(&up.n, 1) + n := up.n.Add(1) for i := uint32(0); i < uint32(len(bus)); i++ { idx := (n + i) % uint32(len(bus)) @@ -274,22 +275,22 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL { if bu.isBroken() { continue } - if atomic.LoadInt32(&bu.concurrentRequests) == 0 { + if bu.concurrentRequests.Load() == 0 { // Fast path - return the backend with zero concurrently executed requests. - // Do not use atomic.CompareAndSwapInt32(), since it is much slower on systems with many CPU cores. - atomic.AddInt32(&bu.concurrentRequests, 1) + // Do not use CompareAndSwap() instead of Load(), since it is much slower on systems with many CPU cores. + bu.concurrentRequests.Add(1) return bu } } // Slow path - return the backend with the minimum number of concurrently executed requests. buMin := bus[n%uint32(len(bus))] - minRequests := atomic.LoadInt32(&buMin.concurrentRequests) + minRequests := buMin.concurrentRequests.Load() for _, bu := range bus { if bu.isBroken() { continue } - if n := atomic.LoadInt32(&bu.concurrentRequests); n < minRequests { + if n := bu.concurrentRequests.Load(); n < minRequests { buMin = bu minRequests = n } diff --git a/app/vmctl/main.go b/app/vmctl/main.go index 1071389b25..94b41a5198 100644 --- a/app/vmctl/main.go +++ b/app/vmctl/main.go @@ -330,14 +330,14 @@ func main() { if err != nil { return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1) } - var blocksCount uint64 + var blocksCount atomic.Uint64 if err := stream.Parse(f, isBlockGzipped, func(block *stream.Block) error { - atomic.AddUint64(&blocksCount, 1) + blocksCount.Add(1) return nil }); err != nil { - return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount, err), 1) + return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount.Load(), err), 1) } - log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount) + log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount.Load()) return nil }, }, diff --git a/app/vminsert/netstorage/consistent_hash_timing_test.go b/app/vminsert/netstorage/consistent_hash_timing_test.go index 16406ceb61..b16929dc86 100644 --- a/app/vminsert/netstorage/consistent_hash_timing_test.go +++ b/app/vminsert/netstorage/consistent_hash_timing_test.go @@ -24,7 +24,7 @@ func BenchmarkConsistentHash(b *testing.B) { sum += idx } } - atomic.AddUint64(&BenchSink, uint64(sum)) + BenchSink.Add(uint64(sum)) }) } @@ -37,4 +37,4 @@ var benchKeys = func() []uint64 { return keys }() -var BenchSink uint64 +var BenchSink atomic.Uint64 diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index f6e44cc590..49a5b6a337 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -48,7 +48,7 @@ var ( var errStorageReadOnly = errors.New("storage node is read only") func (sn *storageNode) isReady() bool { - return atomic.LoadUint32(&sn.broken) == 0 && atomic.LoadUint32(&sn.isReadOnly) == 0 + return !sn.broken.Load() && !sn.isReadOnly.Load() } // push pushes buf to sn internal bufs. @@ -70,7 +70,7 @@ func (sn *storageNode) push(snb *storageNodesBucket, buf []byte, rows int) error // Fast path - the buffer is successfully sent to sn. return nil } - if *dropSamplesOnOverload && atomic.LoadUint32(&sn.isReadOnly) == 0 { + if *dropSamplesOnOverload && !sn.isReadOnly.Load() { sn.rowsDroppedOnOverload.Add(rows) dropSamplesOnOverloadLogger.Warnf("some rows dropped, because -dropSamplesOnOverload is set and vmstorage %s cannot accept new rows now. "+ "See vm_rpc_rows_dropped_on_overload_total metric at /metrics page", sn.dialer.Addr()) @@ -274,7 +274,7 @@ func (sn *storageNode) checkHealth() { } bc, err := sn.dial() if err != nil { - atomic.StoreUint32(&sn.broken, 1) + sn.broken.Store(true) sn.brCond.Broadcast() if sn.lastDialErr == nil { // Log the error only once. @@ -286,7 +286,7 @@ func (sn *storageNode) checkHealth() { logger.Infof("successfully dialed -storageNode=%q", sn.dialer.Addr()) sn.lastDialErr = nil sn.bc = bc - atomic.StoreUint32(&sn.broken, 0) + sn.broken.Store(false) sn.brCond.Broadcast() } @@ -314,7 +314,7 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool { } if errors.Is(err, errStorageReadOnly) { // The vmstorage is transitioned to readonly mode. - atomic.StoreUint32(&sn.isReadOnly, 1) + sn.isReadOnly.Store(true) sn.brCond.Broadcast() // Signal the caller that the data wasn't accepted by the vmstorage, // so it will be re-routed to the remaining vmstorage nodes. @@ -327,7 +327,7 @@ func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool { cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err) } sn.bc = nil - atomic.StoreUint32(&sn.broken, 1) + sn.broken.Store(true) sn.brCond.Broadcast() sn.connectionErrors.Inc() return false @@ -413,13 +413,13 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) { // storageNode is a client sending data to vmstorage node. type storageNode struct { - // broken is set to non-zero if the given vmstorage node is temporarily unhealthy. + // broken is set to true if the given vmstorage node is temporarily unhealthy. // In this case the data is re-routed to the remaining healthy vmstorage nodes. - broken uint32 + broken atomic.Bool - // isReadOnly is set to non-zero if the given vmstorage node is read only + // isReadOnly is set to true if the given vmstorage node is read only // In this case the data is re-routed to the remaining healthy vmstorage nodes. - isReadOnly uint32 + isReadOnly atomic.Bool // brLock protects br. brLock sync.Mutex @@ -561,13 +561,16 @@ func initStorageNodes(addrs []string, hashSeed uint64) *storageNodesBucket { return float64(n) }) _ = ms.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_reachable{name="vminsert", addr=%q}`, addr), func() float64 { - if atomic.LoadUint32(&sn.broken) != 0 { + if sn.broken.Load() { return 0 } return 1 }) _ = ms.NewGauge(fmt.Sprintf(`vm_rpc_vmstorage_is_read_only{name="vminsert", addr=%q}`, addr), func() float64 { - return float64(atomic.LoadUint32(&sn.isReadOnly)) + if sn.isReadOnly.Load() { + return 1 + } + return 0 }) sns = append(sns, sn) } @@ -810,7 +813,7 @@ func (sn *storageNode) readOnlyChecker() { } func (sn *storageNode) checkReadOnlyMode() { - if atomic.LoadUint32(&sn.isReadOnly) == 0 { + if !sn.isReadOnly.Load() { // fast path - the sn isn't in readonly mode return } @@ -824,7 +827,7 @@ func (sn *storageNode) checkReadOnlyMode() { err := sendToConn(sn.bc, nil) if err == nil { // The storage switched from readonly to non-readonly mode - atomic.StoreUint32(&sn.isReadOnly, 0) + sn.isReadOnly.Store(false) return } if errors.Is(err, errStorageReadOnly) { @@ -840,7 +843,7 @@ func (sn *storageNode) checkReadOnlyMode() { cannotCloseStorageNodeConnLogger.Warnf("cannot close connection to storageNode %q: %s", sn.dialer.Addr(), err) } sn.bc = nil - atomic.StoreUint32(&sn.broken, 1) + sn.broken.Store(true) sn.brCond.Broadcast() sn.connectionErrors.Inc() } diff --git a/app/vmselect/graphite/transform.go b/app/vmselect/graphite/transform.go index afeeec27a3..f3d96328ae 100644 --- a/app/vmselect/graphite/transform.go +++ b/app/vmselect/graphite/transform.go @@ -3842,18 +3842,18 @@ func nextSeriesConcurrentWrapper(nextSeries nextSeriesFunc, f func(s *series) (* errCh <- err close(errCh) }() - var skipProcessing uint32 + var skipProcessing atomic.Bool for i := 0; i < goroutines; i++ { go func() { defer wg.Done() for s := range seriesCh { - if atomic.LoadUint32(&skipProcessing) != 0 { + if skipProcessing.Load() { continue } sNew, err := f(s) if err != nil { // Drain the rest of series and do not call f for them in order to conserve CPU time. - atomic.StoreUint32(&skipProcessing, 1) + skipProcessing.Store(true) resultCh <- &result{ err: err, } @@ -5609,9 +5609,9 @@ func (nsf *nextSeriesFunc) peekStep(step int64) (int64, error) { if s != nil { step = s.step } - calls := uint64(0) + var calls atomic.Uint64 *nsf = func() (*series, error) { - if atomic.AddUint64(&calls, 1) == 1 { + if calls.Add(1) == 1 { return s, nil } return nextSeries() diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index a380301757..7ad1710cc7 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -107,7 +107,7 @@ func closeTmpBlockFiles(tbfs []*tmpBlocksFile) { } type timeseriesWork struct { - mustStop *uint32 + mustStop *atomic.Bool rss *Results pts *packedTimeseries f func(rs *Result, workerID uint) error @@ -117,22 +117,22 @@ type timeseriesWork struct { } func (tsw *timeseriesWork) do(r *Result, workerID uint) error { - if atomic.LoadUint32(tsw.mustStop) != 0 { + if tsw.mustStop.Load() { return nil } rss := tsw.rss if rss.deadline.Exceeded() { - atomic.StoreUint32(tsw.mustStop, 1) + tsw.mustStop.Store(true) return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String()) } if err := tsw.pts.Unpack(r, rss.tbfs, rss.tr); err != nil { - atomic.StoreUint32(tsw.mustStop, 1) + tsw.mustStop.Store(true) return fmt.Errorf("error during time series unpacking: %w", err) } tsw.rowsProcessed = len(r.Timestamps) if len(r.Timestamps) > 0 { if err := tsw.f(r, workerID); err != nil { - atomic.StoreUint32(tsw.mustStop, 1) + tsw.mustStop.Store(true) return err } } @@ -264,7 +264,7 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke return 0, nil } - var mustStop uint32 + var mustStop atomic.Bool initTimeseriesWork := func(tsw *timeseriesWork, pts *packedTimeseries) { tsw.rss = rss tsw.pts = pts diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 320967165c..9728794dfd 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -331,21 +331,21 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter } } else if format == "promapi" { WriteExportPromAPIHeader(bw) - firstLineOnce := uint32(0) - firstLineSent := uint32(0) + var firstLineOnce atomic.Bool + var firstLineSent atomic.Bool writeLineFunc = func(xb *exportBlock, workerID uint) error { bb := sw.getBuffer(workerID) - // Use atomic.LoadUint32() in front of atomic.CompareAndSwapUint32() in order to avoid slow inter-CPU synchronization + // Use Load() in front of CompareAndSwap() in order to avoid slow inter-CPU synchronization // in fast path after the first line has been already sent. - if atomic.LoadUint32(&firstLineOnce) == 0 && atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) { + if !firstLineOnce.Load() && firstLineOnce.CompareAndSwap(false, true) { // Send the first line to sw.bw WriteExportPromAPILine(bb, xb) _, err := sw.bw.Write(bb.B) bb.Reset() - atomic.StoreUint32(&firstLineSent, 1) + firstLineSent.Store(true) return err } - for atomic.LoadUint32(&firstLineSent) == 0 { + for !firstLineSent.Load() { // Busy wait until the first line is sent to sw.bw runtime.Gosched() } diff --git a/app/vmselect/prometheus/query_range_response.qtpl b/app/vmselect/prometheus/query_range_response.qtpl index 63e5de1611..cc4c6e5e57 100644 --- a/app/vmselect/prometheus/query_range_response.qtpl +++ b/app/vmselect/prometheus/query_range_response.qtpl @@ -34,8 +34,8 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries // seriesFetched is string instead of int because of historical reasons. // It cannot be converted to int without breaking backwards compatibility at vmalert :( %} - "seriesFetched": "{%dl qs.SeriesFetched %}", - "executionTimeMsec": {%dl qs.ExecutionTimeMsec %} + "seriesFetched": "{%dl qs.SeriesFetched.Load() %}", + "executionTimeMsec": {%dl qs.ExecutionTimeMsec.Load() %} } {% code qt.Printf("generate /api/v1/query_range response for series=%d, points=%d", seriesCount, pointsCount) diff --git a/app/vmselect/prometheus/query_range_response.qtpl.go b/app/vmselect/prometheus/query_range_response.qtpl.go index 9a3358bae5..4f13503e5f 100644 --- a/app/vmselect/prometheus/query_range_response.qtpl.go +++ b/app/vmselect/prometheus/query_range_response.qtpl.go @@ -80,11 +80,11 @@ func StreamQueryRangeResponse(qw422016 *qt422016.Writer, isPartial bool, rs []ne //line app/vmselect/prometheus/query_range_response.qtpl:36 qw422016.N().S(`"seriesFetched": "`) //line app/vmselect/prometheus/query_range_response.qtpl:37 - qw422016.N().DL(qs.SeriesFetched) + qw422016.N().DL(qs.SeriesFetched.Load()) //line app/vmselect/prometheus/query_range_response.qtpl:37 qw422016.N().S(`","executionTimeMsec":`) //line app/vmselect/prometheus/query_range_response.qtpl:38 - qw422016.N().DL(qs.ExecutionTimeMsec) + qw422016.N().DL(qs.ExecutionTimeMsec.Load()) //line app/vmselect/prometheus/query_range_response.qtpl:38 qw422016.N().S(`}`) //line app/vmselect/prometheus/query_range_response.qtpl:41 diff --git a/app/vmselect/prometheus/query_response.qtpl b/app/vmselect/prometheus/query_response.qtpl index f0f080c236..1c066295d6 100644 --- a/app/vmselect/prometheus/query_response.qtpl +++ b/app/vmselect/prometheus/query_response.qtpl @@ -36,8 +36,8 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries // seriesFetched is string instead of int because of historical reasons. // It cannot be converted to int without breaking backwards compatibility at vmalert :( %} - "seriesFetched": "{%dl qs.SeriesFetched %}", - "executionTimeMsec": {%dl qs.ExecutionTimeMsec %} + "seriesFetched": "{%dl qs.SeriesFetched.Load() %}", + "executionTimeMsec": {%dl qs.ExecutionTimeMsec.Load() %} } {% code qt.Printf("generate /api/v1/query response for series=%d", seriesCount) diff --git a/app/vmselect/prometheus/query_response.qtpl.go b/app/vmselect/prometheus/query_response.qtpl.go index d083b70907..8dd3d932ed 100644 --- a/app/vmselect/prometheus/query_response.qtpl.go +++ b/app/vmselect/prometheus/query_response.qtpl.go @@ -90,11 +90,11 @@ func StreamQueryResponse(qw422016 *qt422016.Writer, isPartial bool, rs []netstor //line app/vmselect/prometheus/query_response.qtpl:38 qw422016.N().S(`"seriesFetched": "`) //line app/vmselect/prometheus/query_response.qtpl:39 - qw422016.N().DL(qs.SeriesFetched) + qw422016.N().DL(qs.SeriesFetched.Load()) //line app/vmselect/prometheus/query_response.qtpl:39 qw422016.N().S(`","executionTimeMsec":`) //line app/vmselect/prometheus/query_response.qtpl:40 - qw422016.N().DL(qs.ExecutionTimeMsec) + qw422016.N().DL(qs.ExecutionTimeMsec.Load()) //line app/vmselect/prometheus/query_response.qtpl:40 qw422016.N().S(`}`) //line app/vmselect/prometheus/query_response.qtpl:43 diff --git a/app/vmselect/promql/active_queries.go b/app/vmselect/promql/active_queries.go index 4b5c435456..66f9415f42 100644 --- a/app/vmselect/promql/active_queries.go +++ b/app/vmselect/promql/active_queries.go @@ -82,7 +82,7 @@ func (aq *activeQueries) Add(ec *EvalConfig, q string) uint64 { aqe.start = ec.Start aqe.end = ec.End aqe.step = ec.Step - aqe.qid = atomic.AddUint64(&nextActiveQueryID, 1) + aqe.qid = nextActiveQueryID.Add(1) aqe.quotedRemoteAddr = ec.QuotedRemoteAddr aqe.q = q aqe.startTime = time.Now() @@ -109,4 +109,8 @@ func (aq *activeQueries) GetAll() []activeQueryEntry { return aqes } -var nextActiveQueryID = uint64(time.Now().UnixNano()) +var nextActiveQueryID = func() *atomic.Uint64 { + var x atomic.Uint64 + x.Store(uint64(time.Now().UnixNano())) + return &x +}() diff --git a/app/vmselect/promql/eval.go b/app/vmselect/promql/eval.go index d1b0ba5f90..b3c71c96f0 100644 --- a/app/vmselect/promql/eval.go +++ b/app/vmselect/promql/eval.go @@ -182,16 +182,17 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig { // QueryStats contains various stats for the query. type QueryStats struct { // SeriesFetched contains the number of series fetched from storage during the query evaluation. - SeriesFetched int64 + SeriesFetched atomic.Int64 + // ExecutionTimeMsec contains the number of milliseconds the query took to execute. - ExecutionTimeMsec int64 + ExecutionTimeMsec atomic.Int64 } func (qs *QueryStats) addSeriesFetched(n int) { if qs == nil { return } - atomic.AddInt64(&qs.SeriesFetched, int64(n)) + qs.SeriesFetched.Add(int64(n)) } func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) { @@ -199,7 +200,7 @@ func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) { return } d := time.Since(startTime).Milliseconds() - atomic.AddInt64(&qs.ExecutionTimeMsec, d) + qs.ExecutionTimeMsec.Add(d) } func (ec *EvalConfig) updateIsPartialResponse(isPartialResponse bool) { @@ -966,7 +967,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName return nil, err } - var samplesScannedTotal uint64 + var samplesScannedTotal atomic.Uint64 keepMetricNames := getKeepMetricNames(expr) tsw := getTimeseriesByWorkerID() seriesByWorkerID := tsw.byWorkerID @@ -976,13 +977,13 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss) continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts) } return values, timestamps @@ -993,8 +994,8 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName } putTimeseriesByWorkerID(tsw) - rowsScannedPerQuery.Update(float64(samplesScannedTotal)) - qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal) + rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) + qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal.Load()) return tss, nil } @@ -1821,7 +1822,7 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) { qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs) defer qt.Done() - var samplesScannedTotal uint64 + var samplesScannedTotal atomic.Uint64 err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error { rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps) preFunc(rs.Values, rs.Timestamps) @@ -1833,12 +1834,12 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, for _, ts := range tsm.m { iafc.updateTimeseries(ts, workerID) } - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) continue } ts.Reset() samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) iafc.updateTimeseries(ts, workerID) // ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used. @@ -1851,8 +1852,8 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string, return nil, err } tss := iafc.finalizeTimeseries() - rowsScannedPerQuery.Update(float64(samplesScannedTotal)) - qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal) + rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) + qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal.Load()) return tss, nil } @@ -1861,7 +1862,7 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs) defer qt.Done() - var samplesScannedTotal uint64 + var samplesScannedTotal atomic.Uint64 tsw := getTimeseriesByWorkerID() seriesByWorkerID := tsw.byWorkerID seriesLen := rss.Len() @@ -1871,13 +1872,13 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k for _, rc := range rcs { if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil { samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss) continue } var ts timeseries samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps) - atomic.AddUint64(&samplesScannedTotal, samplesScanned) + samplesScannedTotal.Add(samplesScanned) seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts) } return nil @@ -1891,8 +1892,8 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k } putTimeseriesByWorkerID(tsw) - rowsScannedPerQuery.Update(float64(samplesScannedTotal)) - qt.Printf("samplesScanned=%d", samplesScannedTotal) + rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load())) + qt.Printf("samplesScanned=%d", samplesScannedTotal.Load()) return tss, nil } diff --git a/app/vmselect/promql/eval_test.go b/app/vmselect/promql/eval_test.go index 741ecde618..daecc0d385 100644 --- a/app/vmselect/promql/eval_test.go +++ b/app/vmselect/promql/eval_test.go @@ -86,14 +86,14 @@ func TestQueryStats_addSeriesFetched(t *testing.T) { } ec.QueryStats.addSeriesFetched(1) - if qs.SeriesFetched != 1 { - t.Fatalf("expected to get 1; got %d instead", qs.SeriesFetched) + if n := qs.SeriesFetched.Load(); n != 1 { + t.Fatalf("expected to get 1; got %d instead", n) } ecNew := copyEvalConfig(ec) ecNew.QueryStats.addSeriesFetched(3) - if qs.SeriesFetched != 4 { - t.Fatalf("expected to get 4; got %d instead", qs.SeriesFetched) + if n := qs.SeriesFetched.Load(); n != 4 { + t.Fatalf("expected to get 4; got %d instead", n) } } diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index 6798f85460..9a5149fbda 100644 --- a/app/vmselect/promql/exec.go +++ b/app/vmselect/promql/exec.go @@ -353,22 +353,19 @@ type parseCacheValue struct { } type parseCache struct { - // Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 - - requests uint64 - misses uint64 + requests atomic.Uint64 + misses atomic.Uint64 m map[string]*parseCacheValue mu sync.RWMutex } func (pc *parseCache) Requests() uint64 { - return atomic.LoadUint64(&pc.requests) + return pc.requests.Load() } func (pc *parseCache) Misses() uint64 { - return atomic.LoadUint64(&pc.misses) + return pc.misses.Load() } func (pc *parseCache) Len() uint64 { @@ -379,14 +376,14 @@ func (pc *parseCache) Len() uint64 { } func (pc *parseCache) Get(q string) *parseCacheValue { - atomic.AddUint64(&pc.requests, 1) + pc.requests.Add(1) pc.mu.RLock() pcv := pc.m[q] pc.mu.RUnlock() if pcv == nil { - atomic.AddUint64(&pc.misses, 1) + pc.misses.Add(1) } return pcv } diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 146a96e885..961a39fd1b 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -76,7 +76,7 @@ func InitRollupResultCache(cachePath string) { mustLoadRollupResultCacheKeyPrefix(rollupResultCachePath) } else { c = workingsetcache.New(cacheSize) - rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix() + rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix()) } if *disableCache { c.Reset() @@ -160,7 +160,7 @@ var rollupResultCacheResets = metrics.NewCounter(`vm_cache_resets_total{type="pr // ResetRollupResultCache resets rollup result cache. func ResetRollupResultCache() { rollupResultCacheResets.Inc() - atomic.AddUint64(&rollupResultCacheKeyPrefix, 1) + rollupResultCacheKeyPrefix.Add(1) logger.Infof("rollupResult cache has been cleared") } @@ -387,8 +387,8 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig, } var key rollupResultCacheKey - key.prefix = rollupResultCacheKeyPrefix - key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1) + key.prefix = rollupResultCacheKeyPrefix.Load() + key.suffix = rollupResultCacheKeySuffix.Add(1) bb := bbPool.Get() bb.B = key.Marshal(bb.B[:0]) @@ -404,8 +404,12 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig, } var ( - rollupResultCacheKeyPrefix uint64 - rollupResultCacheKeySuffix = uint64(time.Now().UnixNano()) + rollupResultCacheKeyPrefix atomic.Uint64 + rollupResultCacheKeySuffix = func() *atomic.Uint64 { + var x atomic.Uint64 + x.Store(uint64(time.Now().UnixNano())) + return &x + }() ) func (rrc *rollupResultCache) getSeriesFromCache(qt *querytracer.Tracer, key []byte) ([]*timeseries, bool) { @@ -466,26 +470,26 @@ func newRollupResultCacheKeyPrefix() uint64 { func mustLoadRollupResultCacheKeyPrefix(path string) { path = path + ".key.prefix" if !fs.IsPathExist(path) { - rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix() + rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix()) return } data, err := os.ReadFile(path) if err != nil { logger.Errorf("cannot load %s: %s; reset rollupResult cache", path, err) - rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix() + rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix()) return } if len(data) != 8 { logger.Errorf("unexpected size of %s; want 8 bytes; got %d bytes; reset rollupResult cache", path, len(data)) - rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix() + rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix()) return } - rollupResultCacheKeyPrefix = encoding.UnmarshalUint64(data) + rollupResultCacheKeyPrefix.Store(encoding.UnmarshalUint64(data)) } func mustSaveRollupResultCacheKeyPrefix(path string) { path = path + ".key.prefix" - data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix) + data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix.Load()) fs.MustWriteAtomic(path, data, true) } @@ -501,7 +505,7 @@ const ( func marshalRollupResultCacheKeyForSeries(dst []byte, at *auth.Token, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte { dst = append(dst, rollupResultCacheVersion) - dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix) + dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix.Load()) dst = append(dst, rollupResultCacheTypeSeries) dst = encoding.MarshalUint32(dst, at.AccountID) dst = encoding.MarshalUint32(dst, at.ProjectID) @@ -514,7 +518,7 @@ func marshalRollupResultCacheKeyForSeries(dst []byte, at *auth.Token, expr metri func marshalRollupResultCacheKeyForInstantValues(dst []byte, at *auth.Token, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte { dst = append(dst, rollupResultCacheVersion) - dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix) + dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix.Load()) dst = append(dst, rollupResultCacheTypeInstantValues) dst = encoding.MarshalUint32(dst, at.AccountID) dst = encoding.MarshalUint32(dst, at.ProjectID) diff --git a/app/vmstorage/servers/vminsert.go b/app/vmstorage/servers/vminsert.go index 792af40e8b..93f19c0d6e 100644 --- a/app/vmstorage/servers/vminsert.go +++ b/app/vmstorage/servers/vminsert.go @@ -45,7 +45,7 @@ type VMInsertServer struct { wg sync.WaitGroup // stopFlag is set to true when the server needs to stop. - stopFlag uint32 + stopFlag atomic.Bool } // NewVMInsertServer starts VMInsertServer at the given addr serving the given storage. @@ -161,9 +161,9 @@ func (s *VMInsertServer) MustStop() { } func (s *VMInsertServer) setIsStopping() { - atomic.StoreUint32(&s.stopFlag, 1) + s.stopFlag.Store(true) } func (s *VMInsertServer) isStopping() bool { - return atomic.LoadUint32(&s.stopFlag) != 0 + return s.stopFlag.Load() }