lib/storage: consistently use atomic.* types instead of atomic.* function calls on ordinary types

See ea9e2b19a5
This commit is contained in:
Aliaksandr Valialkin 2024-02-24 00:15:21 +02:00
parent 4617dc8bbe
commit b3d9d36fb3
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
13 changed files with 166 additions and 164 deletions

View file

@ -435,12 +435,12 @@ var (
promscrapeConfigReloadRequests = metrics.NewCounter(`vm_http_requests_total{path="/-/reload"}`)
_ = metrics.NewGauge(`vm_metrics_with_dropped_labels_total`, func() float64 {
return float64(atomic.LoadUint64(&storage.MetricsWithDroppedLabels))
return float64(storage.MetricsWithDroppedLabels.Load())
})
_ = metrics.NewGauge(`vm_too_long_label_names_total`, func() float64 {
return float64(atomic.LoadUint64(&storage.TooLongLabelNames))
return float64(storage.TooLongLabelNames.Load())
})
_ = metrics.NewGauge(`vm_too_long_label_values_total`, func() float64 {
return float64(atomic.LoadUint64(&storage.TooLongLabelValues))
return float64(storage.TooLongLabelValues.Load())
})
)

View file

@ -131,8 +131,8 @@ func (bsw *blockStreamWriter) MustClose() {
}
// WriteExternalBlock writes b to bsw and updates ph and rowsMerged.
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) {
atomic.AddUint64(rowsMerged, uint64(b.rowsCount()))
func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *atomic.Uint64) {
rowsMerged.Add(uint64(b.rowsCount()))
b.deduplicateSamplesDuringMerge()
headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset)
@ -141,8 +141,8 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM
// The current timestamps block equals to the previous timestamps block.
// Update headerData so it points to the previous timestamps block. This saves disk space.
headerData, timestampsData, valuesData = b.MarshalData(bsw.prevTimestampsBlockOffset, bsw.valuesBlockOffset)
atomic.AddUint64(&timestampsBlocksMerged, 1)
atomic.AddUint64(&timestampsBytesSaved, uint64(len(timestampsData)))
timestampsBlocksMerged.Add(1)
timestampsBytesSaved.Add(uint64(len(timestampsData)))
}
if len(bsw.indexData)+len(headerData) > maxBlockSize {
@ -163,8 +163,8 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM
}
var (
timestampsBlocksMerged uint64
timestampsBytesSaved uint64
timestampsBlocksMerged atomic.Uint64
timestampsBytesSaved atomic.Uint64
)
func updatePartHeader(b *Block, ph *partHeader) {

View file

@ -2,6 +2,7 @@ package storage
import (
"fmt"
"sync/atomic"
"testing"
)
@ -22,7 +23,7 @@ func BenchmarkBlockStreamWriterRowsBestCase(b *testing.B) {
}
func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeRows bool) {
var rowsMerged uint64
var rowsMerged atomic.Uint64
b.ReportAllocs()
b.SetBytes(int64(rowsCount))

View file

@ -75,21 +75,21 @@ type indexDB struct {
// The number of missing MetricID -> TSID entries.
// High rate for this value means corrupted indexDB.
missingTSIDsForMetricID uint64
missingTSIDsForMetricID atomic.Uint64
// The number of calls for date range searches.
dateRangeSearchCalls uint64
dateRangeSearchCalls atomic.Uint64
// The number of hits for date range searches.
dateRangeSearchHits uint64
dateRangeSearchHits atomic.Uint64
// The number of calls for global search.
globalSearchCalls uint64
globalSearchCalls atomic.Uint64
// missingMetricNamesForMetricID is a counter of missing MetricID -> MetricName entries.
// High rate may mean corrupted indexDB due to unclean shutdown.
// The db must be automatically recovered after that.
missingMetricNamesForMetricID uint64
missingMetricNamesForMetricID atomic.Uint64
// generation identifies the index generation ID
// and is used for syncing items from different indexDBs
@ -216,20 +216,20 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
m.IndexDBRefCount += uint64(db.refCount.Load())
m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)
m.MissingTSIDsForMetricID += db.missingTSIDsForMetricID.Load()
m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits)
m.GlobalSearchCalls += atomic.LoadUint64(&db.globalSearchCalls)
m.DateRangeSearchCalls += db.dateRangeSearchCalls.Load()
m.DateRangeSearchHits += db.dateRangeSearchHits.Load()
m.GlobalSearchCalls += db.globalSearchCalls.Load()
m.MissingMetricNamesForMetricID += atomic.LoadUint64(&db.missingMetricNamesForMetricID)
m.MissingMetricNamesForMetricID += db.missingMetricNamesForMetricID.Load()
m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
m.IndexBlocksWithMetricIDsProcessed = indexBlocksWithMetricIDsProcessed.Load()
m.IndexBlocksWithMetricIDsIncorrectOrder = indexBlocksWithMetricIDsIncorrectOrder.Load()
m.MinTimestampForCompositeIndex = uint64(db.s.minTimestampForCompositeIndex)
m.CompositeFilterSuccessConversions = atomic.LoadUint64(&compositeFilterSuccessConversions)
m.CompositeFilterMissingConversions = atomic.LoadUint64(&compositeFilterMissingConversions)
m.CompositeFilterSuccessConversions = compositeFilterSuccessConversions.Load()
m.CompositeFilterMissingConversions = compositeFilterMissingConversions.Load()
db.tb.UpdateMetrics(&m.TableMetrics)
db.doExtDB(func(extDB *indexDB) {
@ -374,7 +374,7 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
// isn't persisted to disk (it is very volatile because of tagFiltersKeyGen).
prefix := ^uint64(0)
if versioned {
prefix = atomic.LoadUint64(&tagFiltersKeyGen)
prefix = tagFiltersKeyGen.Load()
}
// Round start and end times to per-day granularity according to per-day inverted index.
startDate := uint64(tr.MinTimestamp) / msecPerDay
@ -393,10 +393,10 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione
func invalidateTagFiltersCache() {
// This function must be fast, since it is called each time new timeseries is added.
atomic.AddUint64(&tagFiltersKeyGen, 1)
tagFiltersKeyGen.Add(1)
}
var tagFiltersKeyGen uint64
var tagFiltersKeyGen atomic.Uint64
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
// Compress metricIDs, so they occupy less space in the cache.
@ -1474,7 +1474,7 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byt
// Cannot find MetricName for the given metricID. This may be the case
// when indexDB contains incomplete set of metricID -> metricName entries
// after a snapshot or due to unflushed entries.
atomic.AddUint64(&db.missingMetricNamesForMetricID, 1)
db.missingMetricNamesForMetricID.Add(1)
// Mark the metricID as deleted, so it will be created again when new data point
// for the given time series will arrive.
@ -1754,7 +1754,7 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin
// This may be the case on incomplete indexDB
// due to snapshot or due to unflushed entries.
// Just increment errors counter and skip it for now.
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
is.db.missingTSIDsForMetricID.Add(1)
continue
}
is.db.putToMetricIDCache(metricID, tsid)
@ -2205,7 +2205,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(qt *querytracer.Tracer, metr
// Slow path - fall back to search in the global inverted index.
qt.Printf("cannot find metric ids in per-day index; fall back to global index")
atomic.AddUint64(&is.db.globalSearchCalls, 1)
is.db.globalSearchCalls.Add(1)
m, err := is.getMetricIDsForDateAndFilters(qt, 0, tfs, maxMetrics)
if err != nil {
if errors.Is(err, errFallbackToGlobalSearch) {
@ -2395,7 +2395,7 @@ var errFallbackToGlobalSearch = errors.New("fall back from per-day index search
const maxDaysForPerDaySearch = 40
func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer, metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
is.db.dateRangeSearchCalls.Add(1)
minDate := uint64(tr.MinTimestamp) / msecPerDay
maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
@ -2409,7 +2409,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer,
return err
}
metricIDs.UnionMayOwn(m)
atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
is.db.dateRangeSearchHits.Add(1)
return nil
}
@ -2451,7 +2451,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer,
if errGlobal != nil {
return errGlobal
}
atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
is.db.dateRangeSearchHits.Add(1)
return nil
}
@ -2956,13 +2956,17 @@ func generateUniqueMetricID() uint64 {
// It is expected that metricIDs returned from this function must be dense.
// If they will be sparse, then this may hurt metric_ids intersection
// performance with uint64set.Set.
return atomic.AddUint64(&nextUniqueMetricID, 1)
return nextUniqueMetricID.Add(1)
}
// This number mustn't go backwards on restarts, otherwise metricID
// collisions are possible. So don't change time on the server
// between VictoriaMetrics restarts.
var nextUniqueMetricID = uint64(time.Now().UnixNano())
var nextUniqueMetricID = func() *atomic.Uint64 {
var n atomic.Uint64
n.Store(uint64(time.Now().UnixNano()))
return &n
}()
func marshalCommonPrefix(dst []byte, nsPrefix byte) []byte {
dst = append(dst, nsPrefix)
@ -3226,7 +3230,7 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefi
// Leave the original items unmerged, so they can be merged next time.
// This case should be quite rare - if multiple data points are simultaneously inserted
// into the same new time series from multiple concurrent goroutines.
atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1)
indexBlocksWithMetricIDsIncorrectOrder.Add(1)
dstData = append(dstData[:0], tmm.dataCopy...)
dstItems = append(dstItems[:0], tmm.itemsCopy...)
if !checkItemsSorted(dstData, dstItems) {
@ -3234,12 +3238,12 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefi
}
}
putTagToMetricIDsRowsMerger(tmm)
atomic.AddUint64(&indexBlocksWithMetricIDsProcessed, 1)
indexBlocksWithMetricIDsProcessed.Add(1)
return dstData, dstItems
}
var indexBlocksWithMetricIDsIncorrectOrder uint64
var indexBlocksWithMetricIDsProcessed uint64
var indexBlocksWithMetricIDsIncorrectOrder atomic.Uint64
var indexBlocksWithMetricIDsProcessed atomic.Uint64
func checkItemsSorted(data []byte, items []mergeset.Item) bool {
if len(items) == 0 {

View file

@ -1480,7 +1480,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) {
}
// check new series were registered in indexDB
added := atomic.LoadUint64(&db.s.newTimeseriesCreated)
added := db.s.newTimeseriesCreated.Load()
if added != metricRowsN {
t.Fatalf("expected indexDB to contain %d rows; got %d", metricRowsN, added)
}

View file

@ -15,7 +15,7 @@ import (
//
// rowsMerged is atomically updated with the number of merged rows during the merge.
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, s *Storage, retentionDeadline int64,
rowsMerged, rowsDeleted *uint64) error {
rowsMerged, rowsDeleted *atomic.Uint64) error {
ph.Reset()
bsm := bsmPool.Get().(*blockStreamMerger)
@ -38,7 +38,7 @@ var bsmPool = &sync.Pool{
var errForciblyStopped = fmt.Errorf("forcibly stopped")
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, s *Storage, rowsMerged, rowsDeleted *uint64) error {
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, s *Storage, rowsMerged, rowsDeleted *atomic.Uint64) error {
dmis := s.getDeletedMetricIDs()
pendingBlockIsEmpty := true
pendingBlock := getBlock()
@ -54,13 +54,13 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
b := bsm.Block
if dmis.Has(b.bh.TSID.MetricID) {
// Skip blocks for deleted metrics.
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
rowsDeleted.Add(uint64(b.bh.RowsCount))
continue
}
retentionDeadline := bsm.getRetentionDeadline(&b.bh)
if b.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention.
atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount))
rowsDeleted.Add(uint64(b.bh.RowsCount))
continue
}
if pendingBlockIsEmpty {
@ -132,7 +132,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
}
// mergeBlocks merges ib1 and ib2 to ob.
func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint64) {
func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *atomic.Uint64) {
ib1.assertMergeable(ib2)
ib1.assertUnmarshaled()
ib2.assertUnmarshaled()
@ -177,7 +177,7 @@ func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint
}
}
func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) {
func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *atomic.Uint64) {
if b.bh.MinTimestamp >= retentionDeadline {
// Fast path - the block contains only samples with timestamps bigger than retentionDeadline.
return
@ -189,7 +189,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted
nextIdx++
}
if n := nextIdx - nextIdxOrig; n > 0 {
atomic.AddUint64(rowsDeleted, uint64(n))
rowsDeleted.Add(uint64(n))
b.nextIdx = nextIdx
}
}

View file

@ -3,6 +3,7 @@ package storage
import (
"errors"
"math/rand"
"sync/atomic"
"testing"
)
@ -371,18 +372,18 @@ func TestMergeForciblyStop(t *testing.T) {
var bsw blockStreamWriter
bsw.MustInitFromInmemoryPart(&mp, -5)
ch := make(chan struct{})
var rowsMerged, rowsDeleted uint64
var rowsMerged, rowsDeleted atomic.Uint64
close(ch)
strg := newTestStorage()
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, strg, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
}
if rowsMerged != 0 {
t.Fatalf("unexpected rowsMerged; got %d; want %d", rowsMerged, 0)
if n := rowsMerged.Load(); n != 0 {
t.Fatalf("unexpected rowsMerged; got %d; want %d", n, 0)
}
if rowsDeleted != 0 {
t.Fatalf("unexpected rowsDeleted; got %d; want %d", rowsDeleted, 0)
if n := rowsDeleted.Load(); n != 0 {
t.Fatalf("unexpected rowsDeleted; got %d; want %d", n, 0)
}
stopTestStorage(strg)
}
@ -396,7 +397,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
bsw.MustInitFromInmemoryPart(&mp, -5)
strg := newTestStorage()
var rowsMerged, rowsDeleted uint64
var rowsMerged, rowsDeleted atomic.Uint64
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil {
t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
}
@ -406,11 +407,11 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
if mp.ph.RowsCount != uint64(expectedRowsCount) {
t.Fatalf("unexpected rows count in partHeader; got %d; want %d", mp.ph.RowsCount, expectedRowsCount)
}
if rowsMerged != mp.ph.RowsCount {
t.Fatalf("unexpected rowsMerged; got %d; want %d", rowsMerged, mp.ph.RowsCount)
if n := rowsMerged.Load(); n != mp.ph.RowsCount {
t.Fatalf("unexpected rowsMerged; got %d; want %d", n, mp.ph.RowsCount)
}
if rowsDeleted != 0 {
t.Fatalf("unexpected rowsDeleted; got %d; want %d", rowsDeleted, 0)
if n := rowsDeleted.Load(); n != 0 {
t.Fatalf("unexpected rowsDeleted; got %d; want %d", n, 0)
}
if mp.ph.MinTimestamp != expectedMinTimestamp {
t.Fatalf("unexpected MinTimestamp in partHeader; got %d; want %d", mp.ph.MinTimestamp, expectedMinTimestamp)

View file

@ -3,6 +3,7 @@ package storage
import (
"fmt"
"math/rand"
"sync/atomic"
"testing"
)
@ -23,7 +24,7 @@ func BenchmarkMergeBlockStreamsFourSourcesBestCase(b *testing.B) {
}
func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop int64) {
var rowsMerged, rowsDeleted uint64
var rowsMerged, rowsDeleted atomic.Uint64
strg := newTestStorage()
b.ReportAllocs()

View file

@ -516,7 +516,7 @@ func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte {
}
label := &labels[i]
if len(label.Name) > maxLabelNameLen {
atomic.AddUint64(&TooLongLabelNames, 1)
TooLongLabelNames.Add(1)
label.Name = label.Name[:maxLabelNameLen]
}
if len(label.Value) > maxLabelValueLen {
@ -554,17 +554,17 @@ func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte {
var (
// MetricsWithDroppedLabels is the number of metrics with at least a single dropped label
MetricsWithDroppedLabels uint64
MetricsWithDroppedLabels atomic.Uint64
// TooLongLabelNames is the number of too long label names
TooLongLabelNames uint64
TooLongLabelNames atomic.Uint64
// TooLongLabelValues is the number of too long label values
TooLongLabelValues uint64
TooLongLabelValues atomic.Uint64
)
func trackDroppedLabels(labels, droppedLabels []prompb.Label) {
atomic.AddUint64(&MetricsWithDroppedLabels, 1)
MetricsWithDroppedLabels.Add(1)
select {
case <-droppedLabelsLogTicker.C:
// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
@ -577,7 +577,7 @@ func trackDroppedLabels(labels, droppedLabels []prompb.Label) {
}
func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) {
atomic.AddUint64(&TooLongLabelValues, 1)
TooLongLabelValues.Add(1)
select {
case <-truncatedLabelsLogTicker.C:
// Do not call logger.WithThrottler() here, since this will result in increased CPU usage

View file

@ -75,26 +75,23 @@ const maxRawRowsPerShard = (8 << 20) / int(unsafe.Sizeof(rawRow{}))
// partition represents a partition.
type partition struct {
// Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
activeInmemoryMerges atomic.Int64
activeSmallMerges atomic.Int64
activeBigMerges atomic.Int64
activeInmemoryMerges uint64
activeSmallMerges uint64
activeBigMerges uint64
inmemoryMergesCount atomic.Uint64
smallMergesCount atomic.Uint64
bigMergesCount atomic.Uint64
inmemoryMergesCount uint64
smallMergesCount uint64
bigMergesCount uint64
inmemoryRowsMerged atomic.Uint64
smallRowsMerged atomic.Uint64
bigRowsMerged atomic.Uint64
inmemoryRowsMerged uint64
smallRowsMerged uint64
bigRowsMerged uint64
inmemoryRowsDeleted atomic.Uint64
smallRowsDeleted atomic.Uint64
bigRowsDeleted atomic.Uint64
inmemoryRowsDeleted uint64
smallRowsDeleted uint64
bigRowsDeleted uint64
mergeIdx uint64
mergeIdx atomic.Uint64
// the path to directory with smallParts.
smallPartsPath string
@ -274,12 +271,12 @@ func mustOpenPartition(smallPartsPath, bigPartsPath string, s *Storage) *partiti
func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition {
p := &partition{
name: name,
mergeIdx: uint64(time.Now().UnixNano()),
smallPartsPath: smallPartsPath,
bigPartsPath: bigPartsPath,
s: s,
stopCh: make(chan struct{}),
}
p.mergeIdx.Store(uint64(time.Now().UnixNano()))
p.rawRows.init()
return p
}
@ -376,21 +373,21 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.IndexBlocksCacheRequests = ibCache.Requests()
m.IndexBlocksCacheMisses = ibCache.Misses()
m.ActiveInmemoryMerges += atomic.LoadUint64(&pt.activeInmemoryMerges)
m.ActiveSmallMerges += atomic.LoadUint64(&pt.activeSmallMerges)
m.ActiveBigMerges += atomic.LoadUint64(&pt.activeBigMerges)
m.ActiveInmemoryMerges += uint64(pt.activeInmemoryMerges.Load())
m.ActiveSmallMerges += uint64(pt.activeSmallMerges.Load())
m.ActiveBigMerges += uint64(pt.activeBigMerges.Load())
m.InmemoryMergesCount += atomic.LoadUint64(&pt.inmemoryMergesCount)
m.SmallMergesCount += atomic.LoadUint64(&pt.smallMergesCount)
m.BigMergesCount += atomic.LoadUint64(&pt.bigMergesCount)
m.InmemoryMergesCount += pt.inmemoryMergesCount.Load()
m.SmallMergesCount += pt.smallMergesCount.Load()
m.BigMergesCount += pt.bigMergesCount.Load()
m.InmemoryRowsMerged += atomic.LoadUint64(&pt.inmemoryRowsMerged)
m.SmallRowsMerged += atomic.LoadUint64(&pt.smallRowsMerged)
m.BigRowsMerged += atomic.LoadUint64(&pt.bigRowsMerged)
m.InmemoryRowsMerged += pt.inmemoryRowsMerged.Load()
m.SmallRowsMerged += pt.smallRowsMerged.Load()
m.BigRowsMerged += pt.bigRowsMerged.Load()
m.InmemoryRowsDeleted += atomic.LoadUint64(&pt.inmemoryRowsDeleted)
m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted)
m.BigRowsDeleted += atomic.LoadUint64(&pt.bigRowsDeleted)
m.InmemoryRowsDeleted += pt.inmemoryRowsDeleted.Load()
m.SmallRowsDeleted += pt.smallRowsDeleted.Load()
m.BigRowsDeleted += pt.bigRowsDeleted.Load()
}
// AddRows adds the given rows to the partition pt.
@ -1531,10 +1528,10 @@ func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader {
func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) {
var ph partHeader
var rowsMerged *uint64
var rowsDeleted *uint64
var mergesCount *uint64
var activeMerges *uint64
var rowsMerged *atomic.Uint64
var rowsDeleted *atomic.Uint64
var mergesCount *atomic.Uint64
var activeMerges *atomic.Int64
switch dstPartType {
case partInmemory:
rowsMerged = &pt.inmemoryRowsMerged
@ -1555,10 +1552,10 @@ func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWrit
logger.Panicf("BUG: unknown partType=%d", dstPartType)
}
retentionDeadline := timestampFromTime(time.Now()) - pt.s.retentionMsecs
atomic.AddUint64(activeMerges, 1)
activeMerges.Add(1)
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted)
atomic.AddUint64(activeMerges, ^uint64(0))
atomic.AddUint64(mergesCount, 1)
activeMerges.Add(-1)
mergesCount.Add(1)
if err != nil {
return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err)
}
@ -1674,7 +1671,7 @@ func getCompressLevel(rowsPerBlock float64) int {
}
func (pt *partition) nextMergeIdx() uint64 {
return atomic.AddUint64(&pt.mergeIdx, 1)
return pt.mergeIdx.Add(1)
}
func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
@ -1712,21 +1709,21 @@ func (pt *partition) removeStaleParts() {
pt.partsLock.Lock()
for _, pw := range pt.inmemoryParts {
if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.inmemoryRowsDeleted, pw.p.ph.RowsCount)
pt.inmemoryRowsDeleted.Add(pw.p.ph.RowsCount)
pw.isInMerge = true
pws = append(pws, pw)
}
}
for _, pw := range pt.smallParts {
if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount)
pt.smallRowsDeleted.Add(pw.p.ph.RowsCount)
pw.isInMerge = true
pws = append(pws, pw)
}
}
for _, pw := range pt.bigParts {
if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline {
atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount)
pt.bigRowsDeleted.Add(pw.p.ph.RowsCount)
pw.isInMerge = true
pws = append(pws, pw)
}

View file

@ -3,6 +3,7 @@ package storage
import (
"sort"
"sync"
"sync/atomic"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -102,7 +103,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
// Group rows into blocks.
var scale int16
var rowsMerged uint64
var rowsMerged atomic.Uint64
r := &rows[0]
tsid := &r.TSID
precisionBits := r.PrecisionBits
@ -129,8 +130,8 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR
rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues)
tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits)
rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged)
if rowsMerged != uint64(len(rows)) {
logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", rowsMerged, len(rows))
if n := rowsMerged.Load(); n != uint64(len(rows)) {
logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", n, len(rows))
}
rrm.bsw.MustClose()
}

View file

@ -40,27 +40,25 @@ const (
// Storage represents TSDB storage.
type Storage struct {
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
tooSmallTimestampRows atomic.Uint64
tooBigTimestampRows atomic.Uint64
timeseriesRepopulated uint64
timeseriesPreCreated uint64
newTimeseriesCreated uint64
slowRowInserts uint64
slowPerDayIndexInserts uint64
slowMetricNameLoads uint64
timeseriesRepopulated atomic.Uint64
timeseriesPreCreated atomic.Uint64
newTimeseriesCreated atomic.Uint64
slowRowInserts atomic.Uint64
slowPerDayIndexInserts atomic.Uint64
slowMetricNameLoads atomic.Uint64
hourlySeriesLimitRowsDropped uint64
dailySeriesLimitRowsDropped uint64
hourlySeriesLimitRowsDropped atomic.Uint64
dailySeriesLimitRowsDropped atomic.Uint64
// nextRotationTimestamp is a timestamp in seconds of the next indexdb rotation.
//
// It is used for gradual pre-population of the idbNext during the last hour before the indexdb rotation.
// in order to reduce spikes in CPU and disk IO usage just after the rotiation.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
nextRotationTimestamp int64
nextRotationTimestamp atomic.Int64
path string
cachePath string
@ -124,7 +122,7 @@ type Storage struct {
prefetchedMetricIDs *uint64set.Set
// prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series.
prefetchedMetricIDsDeadline uint64
prefetchedMetricIDsDeadline atomic.Uint64
stop chan struct{}
@ -245,7 +243,7 @@ func MustOpenStorage(path string, retention time.Duration, maxHourlySeries, maxD
nowSecs := int64(fasttime.UnixTimestamp())
retentionSecs := retention.Milliseconds() / 1000 // not .Seconds() because unnecessary float64 conversion
nextRotationTimestamp := nextRetentionDeadlineSeconds(nowSecs, retentionSecs, retentionTimezoneOffsetSecs)
atomic.StoreInt64(&s.nextRotationTimestamp, nextRotationTimestamp)
s.nextRotationTimestamp.Store(nextRotationTimestamp)
// Load nextDayMetricIDs cache
date := fasttime.UnixDate()
@ -545,34 +543,34 @@ func (m *Metrics) Reset() {
// UpdateMetrics updates m with metrics from s.
func (s *Storage) UpdateMetrics(m *Metrics) {
m.RowsAddedTotal = atomic.LoadUint64(&rowsAddedTotal)
m.RowsAddedTotal = rowsAddedTotal.Load()
m.DedupsDuringMerge = dedupsDuringMerge.Load()
m.SnapshotsCount += uint64(s.mustGetSnapshotsCount())
m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows)
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
m.TooSmallTimestampRows += s.tooSmallTimestampRows.Load()
m.TooBigTimestampRows += s.tooBigTimestampRows.Load()
m.TimeseriesRepopulated += atomic.LoadUint64(&s.timeseriesRepopulated)
m.TimeseriesPreCreated += atomic.LoadUint64(&s.timeseriesPreCreated)
m.NewTimeseriesCreated += atomic.LoadUint64(&s.newTimeseriesCreated)
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads)
m.TimeseriesRepopulated += s.timeseriesRepopulated.Load()
m.TimeseriesPreCreated += s.timeseriesPreCreated.Load()
m.NewTimeseriesCreated += s.newTimeseriesCreated.Load()
m.SlowRowInserts += s.slowRowInserts.Load()
m.SlowPerDayIndexInserts += s.slowPerDayIndexInserts.Load()
m.SlowMetricNameLoads += s.slowMetricNameLoads.Load()
if sl := s.hourlySeriesLimiter; sl != nil {
m.HourlySeriesLimitRowsDropped += atomic.LoadUint64(&s.hourlySeriesLimitRowsDropped)
m.HourlySeriesLimitRowsDropped += s.hourlySeriesLimitRowsDropped.Load()
m.HourlySeriesLimitMaxSeries += uint64(sl.MaxItems())
m.HourlySeriesLimitCurrentSeries += uint64(sl.CurrentItems())
}
if sl := s.dailySeriesLimiter; sl != nil {
m.DailySeriesLimitRowsDropped += atomic.LoadUint64(&s.dailySeriesLimitRowsDropped)
m.DailySeriesLimitRowsDropped += s.dailySeriesLimitRowsDropped.Load()
m.DailySeriesLimitMaxSeries += uint64(sl.MaxItems())
m.DailySeriesLimitCurrentSeries += uint64(sl.CurrentItems())
}
m.TimestampsBlocksMerged = atomic.LoadUint64(&timestampsBlocksMerged)
m.TimestampsBytesSaved = atomic.LoadUint64(&timestampsBytesSaved)
m.TimestampsBlocksMerged = timestampsBlocksMerged.Load()
m.TimestampsBytesSaved = timestampsBytesSaved.Load()
var cs fastcache.Stats
s.tsidCache.UpdateStats(&cs)
@ -603,8 +601,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount())
m.DateMetricIDCacheSizeBytes += uint64(s.dateMetricIDCache.SizeBytes())
m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount)
m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount)
m.DateMetricIDCacheSyncsCount += s.dateMetricIDCache.syncsCount.Load()
m.DateMetricIDCacheResetsCount += s.dateMetricIDCache.resetsCount.Load()
hmCurr := s.currHourMetricIDs.Load()
hmPrev := s.prevHourMetricIDs.Load()
@ -637,7 +635,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
}
func (s *Storage) nextRetentionSeconds() int64 {
return atomic.LoadInt64(&s.nextRotationTimestamp) - int64(fasttime.UnixTimestamp())
return s.nextRotationTimestamp.Load() - int64(fasttime.UnixTimestamp())
}
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
@ -783,7 +781,7 @@ func (s *Storage) mustRotateIndexDB(currentTime time.Time) {
// Update nextRotationTimestamp
nextRotationTimestamp := currentTime.Unix() + s.retentionMsecs/1000
atomic.StoreInt64(&s.nextRotationTimestamp, nextRotationTimestamp)
s.nextRotationTimestamp.Store(nextRotationTimestamp)
// Set idbNext to idbNew
idbNext := s.idbNext.Load()
@ -1191,7 +1189,7 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin
qt.Printf("skip pre-fetching metric names for low number of missing metric ids=%d", len(metricIDs))
return nil
}
atomic.AddUint64(&s.slowMetricNameLoads, uint64(len(metricIDs)))
s.slowMetricNameLoads.Add(uint64(len(metricIDs)))
// Pre-fetch metricIDs.
var missingMetricIDs []uint64
@ -1232,12 +1230,12 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin
// Store the pre-fetched metricIDs, so they aren't pre-fetched next time.
s.prefetchedMetricIDsLock.Lock()
if fasttime.UnixTimestamp() > atomic.LoadUint64(&s.prefetchedMetricIDsDeadline) {
if fasttime.UnixTimestamp() > s.prefetchedMetricIDsDeadline.Load() {
// Periodically reset the prefetchedMetricIDs in order to limit its size.
s.prefetchedMetricIDs = &uint64set.Set{}
d := timeutil.AddJitterToDuration(time.Second * 20 * 60)
metricIDsDeadline := fasttime.UnixTimestamp() + uint64(d.Seconds())
atomic.StoreUint64(&s.prefetchedMetricIDsDeadline, metricIDsDeadline)
s.prefetchedMetricIDsDeadline.Store(metricIDsDeadline)
}
s.prefetchedMetricIDs.AddMulti(metricIDs)
s.prefetchedMetricIDsLock.Unlock()
@ -1547,7 +1545,7 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error {
return s.tb.ForceMergePartitions(partitionNamePrefix)
}
var rowsAddedTotal uint64
var rowsAddedTotal atomic.Uint64
// AddRows adds the given mrs to s.
//
@ -1576,7 +1574,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
}
continue
}
atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))
rowsAddedTotal.Add(uint64(len(mrsBlock)))
}
putMetricRowsInsertCtx(ic)
@ -1709,7 +1707,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
}
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
s.timeseriesRepopulated.Add(seriesRepopulated)
// There is no need in pre-filling idbNext here, since RegisterMetricNames() is rarely called.
// So it is OK to register metric names in blocking manner after indexdb rotation.
@ -1766,7 +1764,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
"probably you need updating -retentionPeriod command-line flag; metricName: %s",
mr.Timestamp, minTimestamp, metricName)
}
atomic.AddUint64(&s.tooSmallTimestampRows, 1)
s.tooSmallTimestampRows.Add(1)
continue
}
if mr.Timestamp > maxTimestamp {
@ -1776,7 +1774,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
firstWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowed timestamp is %d; metricName: %s",
mr.Timestamp, maxTimestamp, metricName)
}
atomic.AddUint64(&s.tooBigTimestampRows, 1)
s.tooBigTimestampRows.Add(1)
continue
}
dstMrs[j] = mr
@ -1900,9 +1898,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
}
}
atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)
atomic.AddUint64(&s.newTimeseriesCreated, newSeriesCount)
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
s.slowRowInserts.Add(slowInsertsCount)
s.newTimeseriesCreated.Add(newSeriesCount)
s.timeseriesRepopulated.Add(seriesRepopulated)
dstMrs = dstMrs[:j]
rows = rows[:j]
@ -1956,12 +1954,12 @@ func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID
func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) bool {
if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1)
s.hourlySeriesLimitRowsDropped.Add(1)
logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems())
return false
}
if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) {
atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1)
s.dailySeriesLimitRowsDropped.Add(1)
logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems())
return false
}
@ -2056,7 +2054,7 @@ func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error {
s.putSeriesToCache(metricNameRaw, &genTSID, date)
timeseriesPreCreated++
}
atomic.AddUint64(&s.timeseriesPreCreated, timeseriesPreCreated)
s.timeseriesPreCreated.Add(timeseriesPreCreated)
return firstError
}
@ -2163,7 +2161,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
// Slow path - add new (date, metricID) entries to indexDB.
atomic.AddUint64(&s.slowPerDayIndexInserts, uint64(len(pendingDateMetricIDs)))
s.slowPerDayIndexInserts.Add(uint64(len(pendingDateMetricIDs)))
// Sort pendingDateMetricIDs by (date, metricID) in order to speed up `is` search in the loop below.
sort.Slice(pendingDateMetricIDs, func(i, j int) bool {
a := pendingDateMetricIDs[i]
@ -2218,9 +2216,8 @@ func fastHashUint64(x uint64) uint64 {
//
// It should be faster than map[date]*uint64set.Set on multicore systems.
type dateMetricIDCache struct {
// 64-bit counters must be at the top of the structure to be properly aligned on 32-bit arches.
syncsCount uint64
resetsCount uint64
syncsCount atomic.Uint64
resetsCount atomic.Uint64
// Contains immutable map
byDate atomic.Pointer[byDateMetricIDMap]
@ -2248,7 +2245,7 @@ func (dmc *dateMetricIDCache) resetLocked() {
dmc.byDateMutable = newByDateMetricIDMap()
dmc.slowHits = 0
atomic.AddUint64(&dmc.resetsCount, 1)
dmc.resetsCount.Add(1)
}
func (dmc *dateMetricIDCache) EntriesCount() int {
@ -2395,7 +2392,7 @@ func (dmc *dateMetricIDCache) syncLocked() {
dmc.byDate.Store(dmc.byDateMutable)
dmc.byDateMutable = newByDateMetricIDMap()
atomic.AddUint64(&dmc.syncsCount, 1)
dmc.syncsCount.Add(1)
if dmc.SizeBytes() > uint64(memory.Allowed())/256 {
dmc.resetLocked()

View file

@ -58,7 +58,7 @@ func convertToCompositeTagFilters(tfs *TagFilters) []*TagFilters {
// then it is impossible to construct composite tag filter.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2238
if len(names) == 0 || !hasPositiveFilter {
atomic.AddUint64(&compositeFilterMissingConversions, 1)
compositeFilterMissingConversions.Add(1)
return []*TagFilters{tfs}
}
@ -117,20 +117,20 @@ func convertToCompositeTagFilters(tfs *TagFilters) []*TagFilters {
if compositeFilters == 0 {
// Cannot use tfsNew, since it doesn't contain composite filters, e.g. it may match broader set of series.
// Fall back to the original tfs.
atomic.AddUint64(&compositeFilterMissingConversions, 1)
compositeFilterMissingConversions.Add(1)
return []*TagFilters{tfs}
}
tfsCompiled := NewTagFilters()
tfsCompiled.tfs = tfsNew
tfssCompiled = append(tfssCompiled, tfsCompiled)
}
atomic.AddUint64(&compositeFilterSuccessConversions, 1)
compositeFilterSuccessConversions.Add(1)
return tfssCompiled
}
var (
compositeFilterSuccessConversions uint64
compositeFilterMissingConversions uint64
compositeFilterSuccessConversions atomic.Uint64
compositeFilterMissingConversions atomic.Uint64
)
// TagFilters represents filters used for filtering tags.