app: consistently use atomic.* types instead of atomic.* functions

See ea9e2b19a5
This commit is contained in:
Aliaksandr Valialkin 2024-02-24 02:44:19 +02:00
parent 7e1dd8ab9d
commit 6697da73e5
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
18 changed files with 120 additions and 116 deletions

View file

@ -82,7 +82,7 @@ func (ps *pendingSeries) periodicFlusher() {
ps.mu.Unlock()
return
case <-ticker.C:
if fasttime.UnixTimestamp()-atomic.LoadUint64(&ps.wr.lastFlushTime) < uint64(flushSeconds) {
if fasttime.UnixTimestamp()-ps.wr.lastFlushTime.Load() < uint64(flushSeconds) {
continue
}
}
@ -93,8 +93,7 @@ func (ps *pendingSeries) periodicFlusher() {
}
type writeRequest struct {
// Move lastFlushTime to the top of the struct in order to guarantee atomic access on 32-bit architectures.
lastFlushTime uint64
lastFlushTime atomic.Uint64
// The queue to send blocks to.
fq *persistentqueue.FastQueue
@ -155,7 +154,7 @@ func (wr *writeRequest) mustWriteBlock(block []byte) bool {
func (wr *writeRequest) tryFlush() bool {
wr.wr.Timeseries = wr.tss
atomic.StoreUint64(&wr.lastFlushTime, fasttime.UnixTimestamp())
wr.lastFlushTime.Store(fasttime.UnixTimestamp())
if !tryPushWriteRequest(&wr.wr, wr.fq.TryWriteBlock, wr.isVMRemoteWrite) {
return false
}

View file

@ -536,7 +536,7 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
// Push sharded data to remote storages in parallel in order to reduce
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
var anyPushFailed uint64
var anyPushFailed atomic.Bool
for i, rwctx := range rwctxs {
tssShard := tssByURL[i]
if len(tssShard) == 0 {
@ -546,12 +546,12 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
go func(rwctx *remoteWriteCtx, tss []prompbmarshal.TimeSeries) {
defer wg.Done()
if !rwctx.TryPush(tss) {
atomic.StoreUint64(&anyPushFailed, 1)
anyPushFailed.Store(true)
}
}(rwctx, tssShard)
}
wg.Wait()
return atomic.LoadUint64(&anyPushFailed) == 0
return !anyPushFailed.Load()
}
// Replicate data among rwctxs.
@ -559,17 +559,17 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
// the time needed for sending the data to multiple remote storage systems.
var wg sync.WaitGroup
wg.Add(len(rwctxs))
var anyPushFailed uint64
var anyPushFailed atomic.Bool
for _, rwctx := range rwctxs {
go func(rwctx *remoteWriteCtx) {
defer wg.Done()
if !rwctx.TryPush(tssBlock) {
atomic.StoreUint64(&anyPushFailed, 1)
anyPushFailed.Store(true)
}
}(rwctx)
}
wg.Wait()
return atomic.LoadUint64(&anyPushFailed) == 0
return !anyPushFailed.Load()
}
// sortLabelsIfNeeded sorts labels if -sortLabels command-line flag is set.
@ -670,7 +670,7 @@ type remoteWriteCtx struct {
streamAggrDropInput bool
pss []*pendingSeries
pssNextIdx uint64
pssNextIdx atomic.Uint64
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
@ -872,7 +872,7 @@ func (rwctx *remoteWriteCtx) tryPushInternal(tss []prompbmarshal.TimeSeries) boo
}
pss := rwctx.pss
idx := atomic.AddUint64(&rwctx.pssNextIdx, 1) % uint64(len(pss))
idx := rwctx.pssNextIdx.Add(1) % uint64(len(pss))
ok := pss[idx].TryPush(tss)

View file

@ -50,7 +50,7 @@ var (
)
type statConn struct {
closed uint64
closed atomic.Int32
net.Conn
}
@ -76,7 +76,7 @@ func (sc *statConn) Write(p []byte) (int, error) {
func (sc *statConn) Close() error {
err := sc.Conn.Close()
if atomic.AddUint64(&sc.closed, 1) == 1 {
if sc.closed.Add(1) == 1 {
conns.Dec()
}
return err

View file

@ -91,14 +91,12 @@ func newRWServer() *rwServer {
}
type rwServer struct {
// WARN: ordering of fields is important for alignment!
// see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
acceptedRows uint64
acceptedRows atomic.Uint64
*httptest.Server
}
func (rw *rwServer) accepted() int {
return int(atomic.LoadUint64(&rw.acceptedRows))
return int(rw.acceptedRows.Load())
}
func (rw *rwServer) err(w http.ResponseWriter, err error) {
@ -144,7 +142,7 @@ func (rw *rwServer) handler(w http.ResponseWriter, r *http.Request) {
rw.err(w, fmt.Errorf("unmarhsal err: %w", err))
return
}
atomic.AddUint64(&rw.acceptedRows, uint64(len(wr.Timeseries)))
rw.acceptedRows.Add(uint64(len(wr.Timeseries)))
w.WriteHeader(http.StatusNoContent)
}

View file

@ -164,7 +164,7 @@ type Regex struct {
// URLPrefix represents passed `url_prefix`
type URLPrefix struct {
n uint32
n atomic.Uint32
// the list of backend urls
bus []*backendURL
@ -192,27 +192,28 @@ func (up *URLPrefix) setLoadBalancingPolicy(loadBalancingPolicy string) error {
}
type backendURL struct {
brokenDeadline uint64
concurrentRequests int32
url *url.URL
brokenDeadline atomic.Uint64
concurrentRequests atomic.Int32
url *url.URL
}
func (bu *backendURL) isBroken() bool {
ct := fasttime.UnixTimestamp()
return ct < atomic.LoadUint64(&bu.brokenDeadline)
return ct < bu.brokenDeadline.Load()
}
func (bu *backendURL) setBroken() {
deadline := fasttime.UnixTimestamp() + uint64((*failTimeout).Seconds())
atomic.StoreUint64(&bu.brokenDeadline, deadline)
bu.brokenDeadline.Store(deadline)
}
func (bu *backendURL) get() {
atomic.AddInt32(&bu.concurrentRequests, 1)
bu.concurrentRequests.Add(1)
}
func (bu *backendURL) put() {
atomic.AddInt32(&bu.concurrentRequests, -1)
bu.concurrentRequests.Add(-1)
}
func (up *URLPrefix) getBackendsCount() int {
@ -266,7 +267,7 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL {
}
// Slow path - select other backend urls.
n := atomic.AddUint32(&up.n, 1)
n := up.n.Add(1)
for i := uint32(0); i < uint32(len(bus)); i++ {
idx := (n + i) % uint32(len(bus))
@ -274,22 +275,22 @@ func (up *URLPrefix) getLeastLoadedBackendURL() *backendURL {
if bu.isBroken() {
continue
}
if atomic.LoadInt32(&bu.concurrentRequests) == 0 {
if bu.concurrentRequests.Load() == 0 {
// Fast path - return the backend with zero concurrently executed requests.
// Do not use atomic.CompareAndSwapInt32(), since it is much slower on systems with many CPU cores.
atomic.AddInt32(&bu.concurrentRequests, 1)
// Do not use CompareAndSwap() instead of Load(), since it is much slower on systems with many CPU cores.
bu.concurrentRequests.Add(1)
return bu
}
}
// Slow path - return the backend with the minimum number of concurrently executed requests.
buMin := bus[n%uint32(len(bus))]
minRequests := atomic.LoadInt32(&buMin.concurrentRequests)
minRequests := buMin.concurrentRequests.Load()
for _, bu := range bus {
if bu.isBroken() {
continue
}
if n := atomic.LoadInt32(&bu.concurrentRequests); n < minRequests {
if n := bu.concurrentRequests.Load(); n < minRequests {
buMin = bu
minRequests = n
}

View file

@ -330,14 +330,14 @@ func main() {
if err != nil {
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1)
}
var blocksCount uint64
var blocksCount atomic.Uint64
if err := stream.Parse(f, isBlockGzipped, func(block *stream.Block) error {
atomic.AddUint64(&blocksCount, 1)
blocksCount.Add(1)
return nil
}); err != nil {
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount, err), 1)
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount.Load(), err), 1)
}
log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount)
log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount.Load())
return nil
},
},

View file

@ -3842,18 +3842,18 @@ func nextSeriesConcurrentWrapper(nextSeries nextSeriesFunc, f func(s *series) (*
errCh <- err
close(errCh)
}()
var skipProcessing uint32
var skipProcessing atomic.Bool
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()
for s := range seriesCh {
if atomic.LoadUint32(&skipProcessing) != 0 {
if skipProcessing.Load() {
continue
}
sNew, err := f(s)
if err != nil {
// Drain the rest of series and do not call f for them in order to conserve CPU time.
atomic.StoreUint32(&skipProcessing, 1)
skipProcessing.Store(true)
resultCh <- &result{
err: err,
}
@ -5609,9 +5609,9 @@ func (nsf *nextSeriesFunc) peekStep(step int64) (int64, error) {
if s != nil {
step = s.step
}
calls := uint64(0)
var calls atomic.Uint64
*nsf = func() (*series, error) {
if atomic.AddUint64(&calls, 1) == 1 {
if calls.Add(1) == 1 {
return s, nil
}
return nextSeries()

View file

@ -84,7 +84,7 @@ func (rss *Results) mustClose() {
}
type timeseriesWork struct {
mustStop *uint32
mustStop *atomic.Bool
rss *Results
pts *packedTimeseries
f func(rs *Result, workerID uint) error
@ -94,22 +94,22 @@ type timeseriesWork struct {
}
func (tsw *timeseriesWork) do(r *Result, workerID uint) error {
if atomic.LoadUint32(tsw.mustStop) != 0 {
if tsw.mustStop.Load() {
return nil
}
rss := tsw.rss
if rss.deadline.Exceeded() {
atomic.StoreUint32(tsw.mustStop, 1)
tsw.mustStop.Store(true)
return fmt.Errorf("timeout exceeded during query execution: %s", rss.deadline.String())
}
if err := tsw.pts.Unpack(r, rss.tbf, rss.tr); err != nil {
atomic.StoreUint32(tsw.mustStop, 1)
tsw.mustStop.Store(true)
return fmt.Errorf("error during time series unpacking: %w", err)
}
tsw.rowsProcessed = len(r.Timestamps)
if len(r.Timestamps) > 0 {
if err := tsw.f(r, workerID); err != nil {
atomic.StoreUint32(tsw.mustStop, 1)
tsw.mustStop.Store(true)
return err
}
}
@ -241,7 +241,7 @@ func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, worke
return 0, nil
}
var mustStop uint32
var mustStop atomic.Bool
initTimeseriesWork := func(tsw *timeseriesWork, pts *packedTimeseries) {
tsw.rss = rss
tsw.pts = pts
@ -1011,7 +1011,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
var (
errGlobal error
errGlobalLock sync.Mutex
mustStop uint32
mustStop atomic.Bool
)
var wg sync.WaitGroup
wg.Add(gomaxprocs)
@ -1023,7 +1023,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
errGlobalLock.Lock()
if errGlobal == nil {
errGlobal = err
atomic.StoreUint32(&mustStop, 1)
mustStop.Store(true)
}
errGlobalLock.Unlock()
}
@ -1041,7 +1041,7 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
if deadline.Exceeded() {
return fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
}
if atomic.LoadUint32(&mustStop) != 0 {
if mustStop.Load() {
break
}
xw := exportWorkPool.Get().(*exportWork)

View file

@ -320,21 +320,21 @@ func exportHandler(qt *querytracer.Tracer, w http.ResponseWriter, cp *commonPara
}
} else if format == "promapi" {
WriteExportPromAPIHeader(bw)
firstLineOnce := uint32(0)
firstLineSent := uint32(0)
var firstLineOnce atomic.Bool
var firstLineSent atomic.Bool
writeLineFunc = func(xb *exportBlock, workerID uint) error {
bb := sw.getBuffer(workerID)
// Use atomic.LoadUint32() in front of atomic.CompareAndSwapUint32() in order to avoid slow inter-CPU synchronization
// Use Load() in front of CompareAndSwap() in order to avoid slow inter-CPU synchronization
// in fast path after the first line has been already sent.
if atomic.LoadUint32(&firstLineOnce) == 0 && atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
if !firstLineOnce.Load() && firstLineOnce.CompareAndSwap(false, true) {
// Send the first line to sw.bw
WriteExportPromAPILine(bb, xb)
_, err := sw.bw.Write(bb.B)
bb.Reset()
atomic.StoreUint32(&firstLineSent, 1)
firstLineSent.Store(true)
return err
}
for atomic.LoadUint32(&firstLineSent) == 0 {
for !firstLineSent.Load() {
// Busy wait until the first line is sent to sw.bw
runtime.Gosched()
}

View file

@ -33,8 +33,8 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#range-queries
// seriesFetched is string instead of int because of historical reasons.
// It cannot be converted to int without breaking backwards compatibility at vmalert :(
%}
"seriesFetched": "{%dl qs.SeriesFetched %}",
"executionTimeMsec": {%dl qs.ExecutionTimeMsec %}
"seriesFetched": "{%dl qs.SeriesFetched.Load() %}",
"executionTimeMsec": {%dl qs.ExecutionTimeMsec.Load() %}
}
{% code
qt.Printf("generate /api/v1/query_range response for series=%d, points=%d", seriesCount, pointsCount)

View file

@ -68,11 +68,11 @@ func StreamQueryRangeResponse(qw422016 *qt422016.Writer, rs []netstorage.Result,
//line app/vmselect/prometheus/query_range_response.qtpl:35
qw422016.N().S(`"seriesFetched": "`)
//line app/vmselect/prometheus/query_range_response.qtpl:36
qw422016.N().DL(qs.SeriesFetched)
qw422016.N().DL(qs.SeriesFetched.Load())
//line app/vmselect/prometheus/query_range_response.qtpl:36
qw422016.N().S(`","executionTimeMsec":`)
//line app/vmselect/prometheus/query_range_response.qtpl:37
qw422016.N().DL(qs.ExecutionTimeMsec)
qw422016.N().DL(qs.ExecutionTimeMsec.Load())
//line app/vmselect/prometheus/query_range_response.qtpl:37
qw422016.N().S(`}`)
//line app/vmselect/prometheus/query_range_response.qtpl:40

View file

@ -35,8 +35,8 @@ See https://prometheus.io/docs/prometheus/latest/querying/api/#instant-queries
// seriesFetched is string instead of int because of historical reasons.
// It cannot be converted to int without breaking backwards compatibility at vmalert :(
%}
"seriesFetched": "{%dl qs.SeriesFetched %}",
"executionTimeMsec": {%dl qs.ExecutionTimeMsec %}
"seriesFetched": "{%dl qs.SeriesFetched.Load() %}",
"executionTimeMsec": {%dl qs.ExecutionTimeMsec.Load() %}
}
{% code
qt.Printf("generate /api/v1/query response for series=%d", seriesCount)

View file

@ -78,11 +78,11 @@ func StreamQueryResponse(qw422016 *qt422016.Writer, rs []netstorage.Result, qt *
//line app/vmselect/prometheus/query_response.qtpl:37
qw422016.N().S(`"seriesFetched": "`)
//line app/vmselect/prometheus/query_response.qtpl:38
qw422016.N().DL(qs.SeriesFetched)
qw422016.N().DL(qs.SeriesFetched.Load())
//line app/vmselect/prometheus/query_response.qtpl:38
qw422016.N().S(`","executionTimeMsec":`)
//line app/vmselect/prometheus/query_response.qtpl:39
qw422016.N().DL(qs.ExecutionTimeMsec)
qw422016.N().DL(qs.ExecutionTimeMsec.Load())
//line app/vmselect/prometheus/query_response.qtpl:39
qw422016.N().S(`}`)
//line app/vmselect/prometheus/query_response.qtpl:42

View file

@ -60,7 +60,7 @@ func (aq *activeQueries) Add(ec *EvalConfig, q string) uint64 {
aqe.start = ec.Start
aqe.end = ec.End
aqe.step = ec.Step
aqe.qid = atomic.AddUint64(&nextActiveQueryID, 1)
aqe.qid = nextActiveQueryID.Add(1)
aqe.quotedRemoteAddr = ec.QuotedRemoteAddr
aqe.q = q
aqe.startTime = time.Now()
@ -87,4 +87,8 @@ func (aq *activeQueries) GetAll() []activeQueryEntry {
return aqes
}
var nextActiveQueryID = uint64(time.Now().UnixNano())
var nextActiveQueryID = func() *atomic.Uint64 {
var x atomic.Uint64
x.Store(uint64(time.Now().UnixNano()))
return &x
}()

View file

@ -171,16 +171,17 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
// QueryStats contains various stats for the query.
type QueryStats struct {
// SeriesFetched contains the number of series fetched from storage during the query evaluation.
SeriesFetched int64
SeriesFetched atomic.Int64
// ExecutionTimeMsec contains the number of milliseconds the query took to execute.
ExecutionTimeMsec int64
ExecutionTimeMsec atomic.Int64
}
func (qs *QueryStats) addSeriesFetched(n int) {
if qs == nil {
return
}
atomic.AddInt64(&qs.SeriesFetched, int64(n))
qs.SeriesFetched.Add(int64(n))
}
func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) {
@ -188,7 +189,7 @@ func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) {
return
}
d := time.Since(startTime).Milliseconds()
atomic.AddInt64(&qs.ExecutionTimeMsec, d)
qs.ExecutionTimeMsec.Add(d)
}
func (ec *EvalConfig) validate() {
@ -949,7 +950,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
return nil, err
}
var samplesScannedTotal uint64
var samplesScannedTotal atomic.Uint64
keepMetricNames := getKeepMetricNames(expr)
tsw := getTimeseriesByWorkerID()
seriesByWorkerID := tsw.byWorkerID
@ -959,13 +960,13 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
for _, rc := range rcs {
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
samplesScannedTotal.Add(samplesScanned)
seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
continue
}
var ts timeseries
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
samplesScannedTotal.Add(samplesScanned)
seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
}
return values, timestamps
@ -976,8 +977,8 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
}
putTimeseriesByWorkerID(tsw)
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal)
rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load()))
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal.Load())
return tss, nil
}
@ -1793,7 +1794,7 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
preFunc func(values []float64, timestamps []int64), sharedTimestamps []int64) ([]*timeseries, error) {
qt = qt.NewChild("rollup %s() with incremental aggregation %s() over %d series; rollupConfigs=%s", funcName, iafc.ae.Name, rss.Len(), rcs)
defer qt.Done()
var samplesScannedTotal uint64
var samplesScannedTotal atomic.Uint64
err := rss.RunParallel(qt, func(rs *netstorage.Result, workerID uint) error {
rs.Values, rs.Timestamps = dropStaleNaNs(funcName, rs.Values, rs.Timestamps)
preFunc(rs.Values, rs.Timestamps)
@ -1805,12 +1806,12 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
for _, ts := range tsm.m {
iafc.updateTimeseries(ts, workerID)
}
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
samplesScannedTotal.Add(samplesScanned)
continue
}
ts.Reset()
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
samplesScannedTotal.Add(samplesScanned)
iafc.updateTimeseries(ts, workerID)
// ts.Timestamps points to sharedTimestamps. Zero it, so it can be re-used.
@ -1823,8 +1824,8 @@ func evalRollupWithIncrementalAggregate(qt *querytracer.Tracer, funcName string,
return nil, err
}
tss := iafc.finalizeTimeseries()
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal)
rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load()))
qt.Printf("series after aggregation with %s(): %d; samplesScanned=%d", iafc.ae.Name, len(tss), samplesScannedTotal.Load())
return tss, nil
}
@ -1833,7 +1834,7 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
qt = qt.NewChild("rollup %s() over %d series; rollupConfigs=%s", funcName, rss.Len(), rcs)
defer qt.Done()
var samplesScannedTotal uint64
var samplesScannedTotal atomic.Uint64
tsw := getTimeseriesByWorkerID()
seriesByWorkerID := tsw.byWorkerID
seriesLen := rss.Len()
@ -1843,13 +1844,13 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
for _, rc := range rcs {
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &rs.MetricName); tsm != nil {
samplesScanned := rc.DoTimeseriesMap(tsm, rs.Values, rs.Timestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
samplesScannedTotal.Add(samplesScanned)
seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
continue
}
var ts timeseries
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &rs.MetricName, rs.Values, rs.Timestamps, sharedTimestamps)
atomic.AddUint64(&samplesScannedTotal, samplesScanned)
samplesScannedTotal.Add(samplesScanned)
seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
}
return nil
@ -1863,8 +1864,8 @@ func evalRollupNoIncrementalAggregate(qt *querytracer.Tracer, funcName string, k
}
putTimeseriesByWorkerID(tsw)
rowsScannedPerQuery.Update(float64(samplesScannedTotal))
qt.Printf("samplesScanned=%d", samplesScannedTotal)
rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load()))
qt.Printf("samplesScanned=%d", samplesScannedTotal.Load())
return tss, nil
}

View file

@ -86,14 +86,14 @@ func TestQueryStats_addSeriesFetched(t *testing.T) {
}
ec.QueryStats.addSeriesFetched(1)
if qs.SeriesFetched != 1 {
t.Fatalf("expected to get 1; got %d instead", qs.SeriesFetched)
if n := qs.SeriesFetched.Load(); n != 1 {
t.Fatalf("expected to get 1; got %d instead", n)
}
ecNew := copyEvalConfig(ec)
ecNew.QueryStats.addSeriesFetched(3)
if qs.SeriesFetched != 4 {
t.Fatalf("expected to get 4; got %d instead", qs.SeriesFetched)
if n := qs.SeriesFetched.Load(); n != 4 {
t.Fatalf("expected to get 4; got %d instead", n)
}
}

View file

@ -352,22 +352,19 @@ type parseCacheValue struct {
}
type parseCache struct {
// Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
requests atomic.Uint64
misses atomic.Uint64
m map[string]*parseCacheValue
mu sync.RWMutex
}
func (pc *parseCache) Requests() uint64 {
return atomic.LoadUint64(&pc.requests)
return pc.requests.Load()
}
func (pc *parseCache) Misses() uint64 {
return atomic.LoadUint64(&pc.misses)
return pc.misses.Load()
}
func (pc *parseCache) Len() uint64 {
@ -378,14 +375,14 @@ func (pc *parseCache) Len() uint64 {
}
func (pc *parseCache) Get(q string) *parseCacheValue {
atomic.AddUint64(&pc.requests, 1)
pc.requests.Add(1)
pc.mu.RLock()
pcv := pc.m[q]
pc.mu.RUnlock()
if pcv == nil {
atomic.AddUint64(&pc.misses, 1)
pc.misses.Add(1)
}
return pcv
}

View file

@ -45,7 +45,7 @@ func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) {
rollupResultResetMetricRowSample.Store(&storage.MetricRow{})
go checkRollupResultCacheReset()
})
if atomic.LoadUint32(&needRollupResultCacheReset) != 0 {
if needRollupResultCacheReset.Load() {
// The cache has been already instructed to reset.
return
}
@ -63,14 +63,14 @@ func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) {
}
if needCacheReset {
// Do not call ResetRollupResultCache() here, since it may be heavy when frequently called.
atomic.StoreUint32(&needRollupResultCacheReset, 1)
needRollupResultCacheReset.Store(true)
}
}
func checkRollupResultCacheReset() {
for {
time.Sleep(checkRollupResultCacheResetInterval)
if atomic.SwapUint32(&needRollupResultCacheReset, 0) > 0 {
if needRollupResultCacheReset.Swap(false) {
mr := rollupResultResetMetricRowSample.Load()
d := int64(fasttime.UnixTimestamp()*1000) - mr.Timestamp - cacheTimestampOffset.Milliseconds()
logger.Warnf("resetting rollup result cache because the metric %s has a timestamp older than -search.cacheTimestampOffset=%s by %.3fs",
@ -82,7 +82,7 @@ func checkRollupResultCacheReset() {
const checkRollupResultCacheResetInterval = 5 * time.Second
var needRollupResultCacheReset uint32
var needRollupResultCacheReset atomic.Bool
var checkRollupResultCacheResetOnce sync.Once
var rollupResultResetMetricRowSample atomic.Pointer[storage.MetricRow]
@ -129,7 +129,7 @@ func InitRollupResultCache(cachePath string) {
mustLoadRollupResultCacheKeyPrefix(rollupResultCachePath)
} else {
c = workingsetcache.New(cacheSize)
rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix())
}
if *disableCache {
c.Reset()
@ -211,7 +211,7 @@ var rollupResultCacheResets = metrics.NewCounter(`vm_cache_resets_total{type="pr
// ResetRollupResultCache resets rollup result cache.
func ResetRollupResultCache() {
rollupResultCacheResets.Inc()
atomic.AddUint64(&rollupResultCacheKeyPrefix, 1)
rollupResultCacheKeyPrefix.Add(1)
logger.Infof("rollupResult cache has been cleared")
}
@ -438,8 +438,8 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig,
}
var key rollupResultCacheKey
key.prefix = rollupResultCacheKeyPrefix
key.suffix = atomic.AddUint64(&rollupResultCacheKeySuffix, 1)
key.prefix = rollupResultCacheKeyPrefix.Load()
key.suffix = rollupResultCacheKeySuffix.Add(1)
bb := bbPool.Get()
bb.B = key.Marshal(bb.B[:0])
@ -455,8 +455,12 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig,
}
var (
rollupResultCacheKeyPrefix uint64
rollupResultCacheKeySuffix = uint64(time.Now().UnixNano())
rollupResultCacheKeyPrefix atomic.Uint64
rollupResultCacheKeySuffix = func() *atomic.Uint64 {
var x atomic.Uint64
x.Store(uint64(time.Now().UnixNano()))
return &x
}()
)
func (rrc *rollupResultCache) getSeriesFromCache(qt *querytracer.Tracer, key []byte) ([]*timeseries, bool) {
@ -517,26 +521,26 @@ func newRollupResultCacheKeyPrefix() uint64 {
func mustLoadRollupResultCacheKeyPrefix(path string) {
path = path + ".key.prefix"
if !fs.IsPathExist(path) {
rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix())
return
}
data, err := os.ReadFile(path)
if err != nil {
logger.Errorf("cannot load %s: %s; reset rollupResult cache", path, err)
rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix())
return
}
if len(data) != 8 {
logger.Errorf("unexpected size of %s; want 8 bytes; got %d bytes; reset rollupResult cache", path, len(data))
rollupResultCacheKeyPrefix = newRollupResultCacheKeyPrefix()
rollupResultCacheKeyPrefix.Store(newRollupResultCacheKeyPrefix())
return
}
rollupResultCacheKeyPrefix = encoding.UnmarshalUint64(data)
rollupResultCacheKeyPrefix.Store(encoding.UnmarshalUint64(data))
}
func mustSaveRollupResultCacheKeyPrefix(path string) {
path = path + ".key.prefix"
data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix)
data := encoding.MarshalUint64(nil, rollupResultCacheKeyPrefix.Load())
fs.MustWriteAtomic(path, data, true)
}
@ -552,7 +556,7 @@ const (
func marshalRollupResultCacheKeyForSeries(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte {
dst = append(dst, rollupResultCacheVersion)
dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix)
dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix.Load())
dst = append(dst, rollupResultCacheTypeSeries)
dst = encoding.MarshalInt64(dst, window)
dst = encoding.MarshalInt64(dst, step)
@ -563,7 +567,7 @@ func marshalRollupResultCacheKeyForSeries(dst []byte, expr metricsql.Expr, windo
func marshalRollupResultCacheKeyForInstantValues(dst []byte, expr metricsql.Expr, window, step int64, etfs [][]storage.TagFilter) []byte {
dst = append(dst, rollupResultCacheVersion)
dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix)
dst = encoding.MarshalUint64(dst, rollupResultCacheKeyPrefix.Load())
dst = append(dst, rollupResultCacheTypeInstantValues)
dst = encoding.MarshalInt64(dst, window)
dst = encoding.MarshalInt64(dst, step)