diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 0fc04cafb..78fee4ead 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -435,12 +435,12 @@ var ( promscrapeConfigReloadRequests = metrics.NewCounter(`vm_http_requests_total{path="/-/reload"}`) _ = metrics.NewGauge(`vm_metrics_with_dropped_labels_total`, func() float64 { - return float64(atomic.LoadUint64(&storage.MetricsWithDroppedLabels)) + return float64(storage.MetricsWithDroppedLabels.Load()) }) _ = metrics.NewGauge(`vm_too_long_label_names_total`, func() float64 { - return float64(atomic.LoadUint64(&storage.TooLongLabelNames)) + return float64(storage.TooLongLabelNames.Load()) }) _ = metrics.NewGauge(`vm_too_long_label_values_total`, func() float64 { - return float64(atomic.LoadUint64(&storage.TooLongLabelValues)) + return float64(storage.TooLongLabelValues.Load()) }) ) diff --git a/lib/storage/block_stream_writer.go b/lib/storage/block_stream_writer.go index f674b13c3..3e305287d 100644 --- a/lib/storage/block_stream_writer.go +++ b/lib/storage/block_stream_writer.go @@ -131,8 +131,8 @@ func (bsw *blockStreamWriter) MustClose() { } // WriteExternalBlock writes b to bsw and updates ph and rowsMerged. -func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *uint64) { - atomic.AddUint64(rowsMerged, uint64(b.rowsCount())) +func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsMerged *atomic.Uint64) { + rowsMerged.Add(uint64(b.rowsCount())) b.deduplicateSamplesDuringMerge() headerData, timestampsData, valuesData := b.MarshalData(bsw.timestampsBlockOffset, bsw.valuesBlockOffset) @@ -141,8 +141,8 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM // The current timestamps block equals to the previous timestamps block. // Update headerData so it points to the previous timestamps block. This saves disk space. headerData, timestampsData, valuesData = b.MarshalData(bsw.prevTimestampsBlockOffset, bsw.valuesBlockOffset) - atomic.AddUint64(×tampsBlocksMerged, 1) - atomic.AddUint64(×tampsBytesSaved, uint64(len(timestampsData))) + timestampsBlocksMerged.Add(1) + timestampsBytesSaved.Add(uint64(len(timestampsData))) } if len(bsw.indexData)+len(headerData) > maxBlockSize { @@ -163,8 +163,8 @@ func (bsw *blockStreamWriter) WriteExternalBlock(b *Block, ph *partHeader, rowsM } var ( - timestampsBlocksMerged uint64 - timestampsBytesSaved uint64 + timestampsBlocksMerged atomic.Uint64 + timestampsBytesSaved atomic.Uint64 ) func updatePartHeader(b *Block, ph *partHeader) { diff --git a/lib/storage/block_stream_writer_timing_test.go b/lib/storage/block_stream_writer_timing_test.go index 80df38cd6..df74c68de 100644 --- a/lib/storage/block_stream_writer_timing_test.go +++ b/lib/storage/block_stream_writer_timing_test.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "sync/atomic" "testing" ) @@ -22,7 +23,7 @@ func BenchmarkBlockStreamWriterRowsBestCase(b *testing.B) { } func benchmarkBlockStreamWriter(b *testing.B, ebs []Block, rowsCount int, writeRows bool) { - var rowsMerged uint64 + var rowsMerged atomic.Uint64 b.ReportAllocs() b.SetBytes(int64(rowsCount)) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index 884d0c26d..c81983d9f 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -75,21 +75,21 @@ type indexDB struct { // The number of missing MetricID -> TSID entries. // High rate for this value means corrupted indexDB. - missingTSIDsForMetricID uint64 + missingTSIDsForMetricID atomic.Uint64 // The number of calls for date range searches. - dateRangeSearchCalls uint64 + dateRangeSearchCalls atomic.Uint64 // The number of hits for date range searches. - dateRangeSearchHits uint64 + dateRangeSearchHits atomic.Uint64 // The number of calls for global search. - globalSearchCalls uint64 + globalSearchCalls atomic.Uint64 // missingMetricNamesForMetricID is a counter of missing MetricID -> MetricName entries. // High rate may mean corrupted indexDB due to unclean shutdown. // The db must be automatically recovered after that. - missingMetricNamesForMetricID uint64 + missingMetricNamesForMetricID atomic.Uint64 // generation identifies the index generation ID // and is used for syncing items from different indexDBs @@ -216,20 +216,20 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) m.IndexDBRefCount += uint64(db.refCount.Load()) - m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID) + m.MissingTSIDsForMetricID += db.missingTSIDsForMetricID.Load() - m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls) - m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits) - m.GlobalSearchCalls += atomic.LoadUint64(&db.globalSearchCalls) + m.DateRangeSearchCalls += db.dateRangeSearchCalls.Load() + m.DateRangeSearchHits += db.dateRangeSearchHits.Load() + m.GlobalSearchCalls += db.globalSearchCalls.Load() - m.MissingMetricNamesForMetricID += atomic.LoadUint64(&db.missingMetricNamesForMetricID) + m.MissingMetricNamesForMetricID += db.missingMetricNamesForMetricID.Load() - m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed) - m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder) + m.IndexBlocksWithMetricIDsProcessed = indexBlocksWithMetricIDsProcessed.Load() + m.IndexBlocksWithMetricIDsIncorrectOrder = indexBlocksWithMetricIDsIncorrectOrder.Load() m.MinTimestampForCompositeIndex = uint64(db.s.minTimestampForCompositeIndex) - m.CompositeFilterSuccessConversions = atomic.LoadUint64(&compositeFilterSuccessConversions) - m.CompositeFilterMissingConversions = atomic.LoadUint64(&compositeFilterMissingConversions) + m.CompositeFilterSuccessConversions = compositeFilterSuccessConversions.Load() + m.CompositeFilterMissingConversions = compositeFilterMissingConversions.Load() db.tb.UpdateMetrics(&m.TableMetrics) db.doExtDB(func(extDB *indexDB) { @@ -374,7 +374,7 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione // isn't persisted to disk (it is very volatile because of tagFiltersKeyGen). prefix := ^uint64(0) if versioned { - prefix = atomic.LoadUint64(&tagFiltersKeyGen) + prefix = tagFiltersKeyGen.Load() } // Round start and end times to per-day granularity according to per-day inverted index. startDate := uint64(tr.MinTimestamp) / msecPerDay @@ -393,10 +393,10 @@ func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versione func invalidateTagFiltersCache() { // This function must be fast, since it is called each time new timeseries is added. - atomic.AddUint64(&tagFiltersKeyGen, 1) + tagFiltersKeyGen.Add(1) } -var tagFiltersKeyGen uint64 +var tagFiltersKeyGen atomic.Uint64 func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte { // Compress metricIDs, so they occupy less space in the cache. @@ -1474,7 +1474,7 @@ func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byt // Cannot find MetricName for the given metricID. This may be the case // when indexDB contains incomplete set of metricID -> metricName entries // after a snapshot or due to unflushed entries. - atomic.AddUint64(&db.missingMetricNamesForMetricID, 1) + db.missingMetricNamesForMetricID.Add(1) // Mark the metricID as deleted, so it will be created again when new data point // for the given time series will arrive. @@ -1754,7 +1754,7 @@ func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uin // This may be the case on incomplete indexDB // due to snapshot or due to unflushed entries. // Just increment errors counter and skip it for now. - atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1) + is.db.missingTSIDsForMetricID.Add(1) continue } is.db.putToMetricIDCache(metricID, tsid) @@ -2205,7 +2205,7 @@ func (is *indexSearch) updateMetricIDsForTagFilters(qt *querytracer.Tracer, metr // Slow path - fall back to search in the global inverted index. qt.Printf("cannot find metric ids in per-day index; fall back to global index") - atomic.AddUint64(&is.db.globalSearchCalls, 1) + is.db.globalSearchCalls.Add(1) m, err := is.getMetricIDsForDateAndFilters(qt, 0, tfs, maxMetrics) if err != nil { if errors.Is(err, errFallbackToGlobalSearch) { @@ -2395,7 +2395,7 @@ var errFallbackToGlobalSearch = errors.New("fall back from per-day index search const maxDaysForPerDaySearch = 40 func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer, metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error { - atomic.AddUint64(&is.db.dateRangeSearchCalls, 1) + is.db.dateRangeSearchCalls.Add(1) minDate := uint64(tr.MinTimestamp) / msecPerDay maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch { @@ -2409,7 +2409,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer, return err } metricIDs.UnionMayOwn(m) - atomic.AddUint64(&is.db.dateRangeSearchHits, 1) + is.db.dateRangeSearchHits.Add(1) return nil } @@ -2451,7 +2451,7 @@ func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer, if errGlobal != nil { return errGlobal } - atomic.AddUint64(&is.db.dateRangeSearchHits, 1) + is.db.dateRangeSearchHits.Add(1) return nil } @@ -2956,13 +2956,17 @@ func generateUniqueMetricID() uint64 { // It is expected that metricIDs returned from this function must be dense. // If they will be sparse, then this may hurt metric_ids intersection // performance with uint64set.Set. - return atomic.AddUint64(&nextUniqueMetricID, 1) + return nextUniqueMetricID.Add(1) } // This number mustn't go backwards on restarts, otherwise metricID // collisions are possible. So don't change time on the server // between VictoriaMetrics restarts. -var nextUniqueMetricID = uint64(time.Now().UnixNano()) +var nextUniqueMetricID = func() *atomic.Uint64 { + var n atomic.Uint64 + n.Store(uint64(time.Now().UnixNano())) + return &n +}() func marshalCommonPrefix(dst []byte, nsPrefix byte) []byte { dst = append(dst, nsPrefix) @@ -3226,7 +3230,7 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefi // Leave the original items unmerged, so they can be merged next time. // This case should be quite rare - if multiple data points are simultaneously inserted // into the same new time series from multiple concurrent goroutines. - atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1) + indexBlocksWithMetricIDsIncorrectOrder.Add(1) dstData = append(dstData[:0], tmm.dataCopy...) dstItems = append(dstItems[:0], tmm.itemsCopy...) if !checkItemsSorted(dstData, dstItems) { @@ -3234,12 +3238,12 @@ func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefi } } putTagToMetricIDsRowsMerger(tmm) - atomic.AddUint64(&indexBlocksWithMetricIDsProcessed, 1) + indexBlocksWithMetricIDsProcessed.Add(1) return dstData, dstItems } -var indexBlocksWithMetricIDsIncorrectOrder uint64 -var indexBlocksWithMetricIDsProcessed uint64 +var indexBlocksWithMetricIDsIncorrectOrder atomic.Uint64 +var indexBlocksWithMetricIDsProcessed atomic.Uint64 func checkItemsSorted(data []byte, items []mergeset.Item) bool { if len(items) == 0 { diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 788afa4ab..5bee81a12 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -1480,7 +1480,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) { } // check new series were registered in indexDB - added := atomic.LoadUint64(&db.s.newTimeseriesCreated) + added := db.s.newTimeseriesCreated.Load() if added != metricRowsN { t.Fatalf("expected indexDB to contain %d rows; got %d", metricRowsN, added) } diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 8759c0992..4eb0e10cd 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -15,7 +15,7 @@ import ( // // rowsMerged is atomically updated with the number of merged rows during the merge. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, s *Storage, retentionDeadline int64, - rowsMerged, rowsDeleted *uint64) error { + rowsMerged, rowsDeleted *atomic.Uint64) error { ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) @@ -38,7 +38,7 @@ var bsmPool = &sync.Pool{ var errForciblyStopped = fmt.Errorf("forcibly stopped") -func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, s *Storage, rowsMerged, rowsDeleted *uint64) error { +func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, s *Storage, rowsMerged, rowsDeleted *atomic.Uint64) error { dmis := s.getDeletedMetricIDs() pendingBlockIsEmpty := true pendingBlock := getBlock() @@ -54,13 +54,13 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc b := bsm.Block if dmis.Has(b.bh.TSID.MetricID) { // Skip blocks for deleted metrics. - atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) + rowsDeleted.Add(uint64(b.bh.RowsCount)) continue } retentionDeadline := bsm.getRetentionDeadline(&b.bh) if b.bh.MaxTimestamp < retentionDeadline { // Skip blocks out of the given retention. - atomic.AddUint64(rowsDeleted, uint64(b.bh.RowsCount)) + rowsDeleted.Add(uint64(b.bh.RowsCount)) continue } if pendingBlockIsEmpty { @@ -132,7 +132,7 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc } // mergeBlocks merges ib1 and ib2 to ob. -func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint64) { +func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *atomic.Uint64) { ib1.assertMergeable(ib2) ib1.assertUnmarshaled() ib2.assertUnmarshaled() @@ -177,7 +177,7 @@ func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint } } -func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) { +func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *atomic.Uint64) { if b.bh.MinTimestamp >= retentionDeadline { // Fast path - the block contains only samples with timestamps bigger than retentionDeadline. return @@ -189,7 +189,7 @@ func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted nextIdx++ } if n := nextIdx - nextIdxOrig; n > 0 { - atomic.AddUint64(rowsDeleted, uint64(n)) + rowsDeleted.Add(uint64(n)) b.nextIdx = nextIdx } } diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index d34e5c0f0..8ca3bdec7 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -3,6 +3,7 @@ package storage import ( "errors" "math/rand" + "sync/atomic" "testing" ) @@ -371,18 +372,18 @@ func TestMergeForciblyStop(t *testing.T) { var bsw blockStreamWriter bsw.MustInitFromInmemoryPart(&mp, -5) ch := make(chan struct{}) - var rowsMerged, rowsDeleted uint64 + var rowsMerged, rowsDeleted atomic.Uint64 close(ch) strg := newTestStorage() if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, strg, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) { t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped) } - if rowsMerged != 0 { - t.Fatalf("unexpected rowsMerged; got %d; want %d", rowsMerged, 0) + if n := rowsMerged.Load(); n != 0 { + t.Fatalf("unexpected rowsMerged; got %d; want %d", n, 0) } - if rowsDeleted != 0 { - t.Fatalf("unexpected rowsDeleted; got %d; want %d", rowsDeleted, 0) + if n := rowsDeleted.Load(); n != 0 { + t.Fatalf("unexpected rowsDeleted; got %d; want %d", n, 0) } stopTestStorage(strg) } @@ -396,7 +397,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc bsw.MustInitFromInmemoryPart(&mp, -5) strg := newTestStorage() - var rowsMerged, rowsDeleted uint64 + var rowsMerged, rowsDeleted atomic.Uint64 if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, strg, 0, &rowsMerged, &rowsDeleted); err != nil { t.Fatalf("unexpected error in mergeBlockStreams: %s", err) } @@ -406,11 +407,11 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc if mp.ph.RowsCount != uint64(expectedRowsCount) { t.Fatalf("unexpected rows count in partHeader; got %d; want %d", mp.ph.RowsCount, expectedRowsCount) } - if rowsMerged != mp.ph.RowsCount { - t.Fatalf("unexpected rowsMerged; got %d; want %d", rowsMerged, mp.ph.RowsCount) + if n := rowsMerged.Load(); n != mp.ph.RowsCount { + t.Fatalf("unexpected rowsMerged; got %d; want %d", n, mp.ph.RowsCount) } - if rowsDeleted != 0 { - t.Fatalf("unexpected rowsDeleted; got %d; want %d", rowsDeleted, 0) + if n := rowsDeleted.Load(); n != 0 { + t.Fatalf("unexpected rowsDeleted; got %d; want %d", n, 0) } if mp.ph.MinTimestamp != expectedMinTimestamp { t.Fatalf("unexpected MinTimestamp in partHeader; got %d; want %d", mp.ph.MinTimestamp, expectedMinTimestamp) diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index 324812d9f..1ece9ab14 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "math/rand" + "sync/atomic" "testing" ) @@ -23,7 +24,7 @@ func BenchmarkMergeBlockStreamsFourSourcesBestCase(b *testing.B) { } func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop int64) { - var rowsMerged, rowsDeleted uint64 + var rowsMerged, rowsDeleted atomic.Uint64 strg := newTestStorage() b.ReportAllocs() diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go index c44291322..903dbd4ce 100644 --- a/lib/storage/metric_name.go +++ b/lib/storage/metric_name.go @@ -516,7 +516,7 @@ func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte { } label := &labels[i] if len(label.Name) > maxLabelNameLen { - atomic.AddUint64(&TooLongLabelNames, 1) + TooLongLabelNames.Add(1) label.Name = label.Name[:maxLabelNameLen] } if len(label.Value) > maxLabelValueLen { @@ -554,17 +554,17 @@ func MarshalMetricNameRaw(dst []byte, labels []prompb.Label) []byte { var ( // MetricsWithDroppedLabels is the number of metrics with at least a single dropped label - MetricsWithDroppedLabels uint64 + MetricsWithDroppedLabels atomic.Uint64 // TooLongLabelNames is the number of too long label names - TooLongLabelNames uint64 + TooLongLabelNames atomic.Uint64 // TooLongLabelValues is the number of too long label values - TooLongLabelValues uint64 + TooLongLabelValues atomic.Uint64 ) func trackDroppedLabels(labels, droppedLabels []prompb.Label) { - atomic.AddUint64(&MetricsWithDroppedLabels, 1) + MetricsWithDroppedLabels.Add(1) select { case <-droppedLabelsLogTicker.C: // Do not call logger.WithThrottler() here, since this will result in increased CPU usage @@ -577,7 +577,7 @@ func trackDroppedLabels(labels, droppedLabels []prompb.Label) { } func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) { - atomic.AddUint64(&TooLongLabelValues, 1) + TooLongLabelValues.Add(1) select { case <-truncatedLabelsLogTicker.C: // Do not call logger.WithThrottler() here, since this will result in increased CPU usage diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 32d6fa988..e1f0c9c34 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -75,26 +75,23 @@ const maxRawRowsPerShard = (8 << 20) / int(unsafe.Sizeof(rawRow{})) // partition represents a partition. type partition struct { - // Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 + activeInmemoryMerges atomic.Int64 + activeSmallMerges atomic.Int64 + activeBigMerges atomic.Int64 - activeInmemoryMerges uint64 - activeSmallMerges uint64 - activeBigMerges uint64 + inmemoryMergesCount atomic.Uint64 + smallMergesCount atomic.Uint64 + bigMergesCount atomic.Uint64 - inmemoryMergesCount uint64 - smallMergesCount uint64 - bigMergesCount uint64 + inmemoryRowsMerged atomic.Uint64 + smallRowsMerged atomic.Uint64 + bigRowsMerged atomic.Uint64 - inmemoryRowsMerged uint64 - smallRowsMerged uint64 - bigRowsMerged uint64 + inmemoryRowsDeleted atomic.Uint64 + smallRowsDeleted atomic.Uint64 + bigRowsDeleted atomic.Uint64 - inmemoryRowsDeleted uint64 - smallRowsDeleted uint64 - bigRowsDeleted uint64 - - mergeIdx uint64 + mergeIdx atomic.Uint64 // the path to directory with smallParts. smallPartsPath string @@ -274,12 +271,12 @@ func mustOpenPartition(smallPartsPath, bigPartsPath string, s *Storage) *partiti func newPartition(name, smallPartsPath, bigPartsPath string, s *Storage) *partition { p := &partition{ name: name, - mergeIdx: uint64(time.Now().UnixNano()), smallPartsPath: smallPartsPath, bigPartsPath: bigPartsPath, s: s, stopCh: make(chan struct{}), } + p.mergeIdx.Store(uint64(time.Now().UnixNano())) p.rawRows.init() return p } @@ -376,21 +373,21 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.IndexBlocksCacheRequests = ibCache.Requests() m.IndexBlocksCacheMisses = ibCache.Misses() - m.ActiveInmemoryMerges += atomic.LoadUint64(&pt.activeInmemoryMerges) - m.ActiveSmallMerges += atomic.LoadUint64(&pt.activeSmallMerges) - m.ActiveBigMerges += atomic.LoadUint64(&pt.activeBigMerges) + m.ActiveInmemoryMerges += uint64(pt.activeInmemoryMerges.Load()) + m.ActiveSmallMerges += uint64(pt.activeSmallMerges.Load()) + m.ActiveBigMerges += uint64(pt.activeBigMerges.Load()) - m.InmemoryMergesCount += atomic.LoadUint64(&pt.inmemoryMergesCount) - m.SmallMergesCount += atomic.LoadUint64(&pt.smallMergesCount) - m.BigMergesCount += atomic.LoadUint64(&pt.bigMergesCount) + m.InmemoryMergesCount += pt.inmemoryMergesCount.Load() + m.SmallMergesCount += pt.smallMergesCount.Load() + m.BigMergesCount += pt.bigMergesCount.Load() - m.InmemoryRowsMerged += atomic.LoadUint64(&pt.inmemoryRowsMerged) - m.SmallRowsMerged += atomic.LoadUint64(&pt.smallRowsMerged) - m.BigRowsMerged += atomic.LoadUint64(&pt.bigRowsMerged) + m.InmemoryRowsMerged += pt.inmemoryRowsMerged.Load() + m.SmallRowsMerged += pt.smallRowsMerged.Load() + m.BigRowsMerged += pt.bigRowsMerged.Load() - m.InmemoryRowsDeleted += atomic.LoadUint64(&pt.inmemoryRowsDeleted) - m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted) - m.BigRowsDeleted += atomic.LoadUint64(&pt.bigRowsDeleted) + m.InmemoryRowsDeleted += pt.inmemoryRowsDeleted.Load() + m.SmallRowsDeleted += pt.smallRowsDeleted.Load() + m.BigRowsDeleted += pt.bigRowsDeleted.Load() } // AddRows adds the given rows to the partition pt. @@ -1531,10 +1528,10 @@ func mustOpenBlockStreamReaders(pws []*partWrapper) []*blockStreamReader { func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWriter, bsrs []*blockStreamReader, dstPartType partType, stopCh <-chan struct{}) (*partHeader, error) { var ph partHeader - var rowsMerged *uint64 - var rowsDeleted *uint64 - var mergesCount *uint64 - var activeMerges *uint64 + var rowsMerged *atomic.Uint64 + var rowsDeleted *atomic.Uint64 + var mergesCount *atomic.Uint64 + var activeMerges *atomic.Int64 switch dstPartType { case partInmemory: rowsMerged = &pt.inmemoryRowsMerged @@ -1555,10 +1552,10 @@ func (pt *partition) mergePartsInternal(dstPartPath string, bsw *blockStreamWrit logger.Panicf("BUG: unknown partType=%d", dstPartType) } retentionDeadline := timestampFromTime(time.Now()) - pt.s.retentionMsecs - atomic.AddUint64(activeMerges, 1) + activeMerges.Add(1) err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, pt.s, retentionDeadline, rowsMerged, rowsDeleted) - atomic.AddUint64(activeMerges, ^uint64(0)) - atomic.AddUint64(mergesCount, 1) + activeMerges.Add(-1) + mergesCount.Add(1) if err != nil { return nil, fmt.Errorf("cannot merge %d parts to %s: %w", len(bsrs), dstPartPath, err) } @@ -1674,7 +1671,7 @@ func getCompressLevel(rowsPerBlock float64) int { } func (pt *partition) nextMergeIdx() uint64 { - return atomic.AddUint64(&pt.mergeIdx, 1) + return pt.mergeIdx.Add(1) } func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) { @@ -1712,21 +1709,21 @@ func (pt *partition) removeStaleParts() { pt.partsLock.Lock() for _, pw := range pt.inmemoryParts { if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline { - atomic.AddUint64(&pt.inmemoryRowsDeleted, pw.p.ph.RowsCount) + pt.inmemoryRowsDeleted.Add(pw.p.ph.RowsCount) pw.isInMerge = true pws = append(pws, pw) } } for _, pw := range pt.smallParts { if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline { - atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount) + pt.smallRowsDeleted.Add(pw.p.ph.RowsCount) pw.isInMerge = true pws = append(pws, pw) } } for _, pw := range pt.bigParts { if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline { - atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount) + pt.bigRowsDeleted.Add(pw.p.ph.RowsCount) pw.isInMerge = true pws = append(pws, pw) } diff --git a/lib/storage/raw_row.go b/lib/storage/raw_row.go index 0f267da03..f59a8c8f7 100644 --- a/lib/storage/raw_row.go +++ b/lib/storage/raw_row.go @@ -3,6 +3,7 @@ package storage import ( "sort" "sync" + "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -102,7 +103,7 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR // Group rows into blocks. var scale int16 - var rowsMerged uint64 + var rowsMerged atomic.Uint64 r := &rows[0] tsid := &r.TSID precisionBits := r.PrecisionBits @@ -129,8 +130,8 @@ func (rrm *rawRowsMarshaler) marshalToInmemoryPart(mp *inmemoryPart, rows []rawR rrm.auxValues, scale = decimal.AppendFloatToDecimal(rrm.auxValues[:0], rrm.auxFloatValues) tmpBlock.Init(tsid, rrm.auxTimestamps, rrm.auxValues, scale, precisionBits) rrm.bsw.WriteExternalBlock(tmpBlock, ph, &rowsMerged) - if rowsMerged != uint64(len(rows)) { - logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", rowsMerged, len(rows)) + if n := rowsMerged.Load(); n != uint64(len(rows)) { + logger.Panicf("BUG: unexpected rowsMerged; got %d; want %d", n, len(rows)) } rrm.bsw.MustClose() } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 1d571d80a..6023031d7 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -40,27 +40,25 @@ const ( // Storage represents TSDB storage. type Storage struct { - // Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 . - tooSmallTimestampRows uint64 - tooBigTimestampRows uint64 + tooSmallTimestampRows atomic.Uint64 + tooBigTimestampRows atomic.Uint64 - timeseriesRepopulated uint64 - timeseriesPreCreated uint64 - newTimeseriesCreated uint64 - slowRowInserts uint64 - slowPerDayIndexInserts uint64 - slowMetricNameLoads uint64 + timeseriesRepopulated atomic.Uint64 + timeseriesPreCreated atomic.Uint64 + newTimeseriesCreated atomic.Uint64 + slowRowInserts atomic.Uint64 + slowPerDayIndexInserts atomic.Uint64 + slowMetricNameLoads atomic.Uint64 - hourlySeriesLimitRowsDropped uint64 - dailySeriesLimitRowsDropped uint64 + hourlySeriesLimitRowsDropped atomic.Uint64 + dailySeriesLimitRowsDropped atomic.Uint64 // nextRotationTimestamp is a timestamp in seconds of the next indexdb rotation. // // It is used for gradual pre-population of the idbNext during the last hour before the indexdb rotation. // in order to reduce spikes in CPU and disk IO usage just after the rotiation. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401 - nextRotationTimestamp int64 + nextRotationTimestamp atomic.Int64 path string cachePath string @@ -124,7 +122,7 @@ type Storage struct { prefetchedMetricIDs *uint64set.Set // prefetchedMetricIDsDeadline is used for periodic reset of prefetchedMetricIDs in order to limit its size under high rate of creating new series. - prefetchedMetricIDsDeadline uint64 + prefetchedMetricIDsDeadline atomic.Uint64 stop chan struct{} @@ -245,7 +243,7 @@ func MustOpenStorage(path string, retention time.Duration, maxHourlySeries, maxD nowSecs := int64(fasttime.UnixTimestamp()) retentionSecs := retention.Milliseconds() / 1000 // not .Seconds() because unnecessary float64 conversion nextRotationTimestamp := nextRetentionDeadlineSeconds(nowSecs, retentionSecs, retentionTimezoneOffsetSecs) - atomic.StoreInt64(&s.nextRotationTimestamp, nextRotationTimestamp) + s.nextRotationTimestamp.Store(nextRotationTimestamp) // Load nextDayMetricIDs cache date := fasttime.UnixDate() @@ -545,34 +543,34 @@ func (m *Metrics) Reset() { // UpdateMetrics updates m with metrics from s. func (s *Storage) UpdateMetrics(m *Metrics) { - m.RowsAddedTotal = atomic.LoadUint64(&rowsAddedTotal) + m.RowsAddedTotal = rowsAddedTotal.Load() m.DedupsDuringMerge = dedupsDuringMerge.Load() m.SnapshotsCount += uint64(s.mustGetSnapshotsCount()) - m.TooSmallTimestampRows += atomic.LoadUint64(&s.tooSmallTimestampRows) - m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows) + m.TooSmallTimestampRows += s.tooSmallTimestampRows.Load() + m.TooBigTimestampRows += s.tooBigTimestampRows.Load() - m.TimeseriesRepopulated += atomic.LoadUint64(&s.timeseriesRepopulated) - m.TimeseriesPreCreated += atomic.LoadUint64(&s.timeseriesPreCreated) - m.NewTimeseriesCreated += atomic.LoadUint64(&s.newTimeseriesCreated) - m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) - m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts) - m.SlowMetricNameLoads += atomic.LoadUint64(&s.slowMetricNameLoads) + m.TimeseriesRepopulated += s.timeseriesRepopulated.Load() + m.TimeseriesPreCreated += s.timeseriesPreCreated.Load() + m.NewTimeseriesCreated += s.newTimeseriesCreated.Load() + m.SlowRowInserts += s.slowRowInserts.Load() + m.SlowPerDayIndexInserts += s.slowPerDayIndexInserts.Load() + m.SlowMetricNameLoads += s.slowMetricNameLoads.Load() if sl := s.hourlySeriesLimiter; sl != nil { - m.HourlySeriesLimitRowsDropped += atomic.LoadUint64(&s.hourlySeriesLimitRowsDropped) + m.HourlySeriesLimitRowsDropped += s.hourlySeriesLimitRowsDropped.Load() m.HourlySeriesLimitMaxSeries += uint64(sl.MaxItems()) m.HourlySeriesLimitCurrentSeries += uint64(sl.CurrentItems()) } if sl := s.dailySeriesLimiter; sl != nil { - m.DailySeriesLimitRowsDropped += atomic.LoadUint64(&s.dailySeriesLimitRowsDropped) + m.DailySeriesLimitRowsDropped += s.dailySeriesLimitRowsDropped.Load() m.DailySeriesLimitMaxSeries += uint64(sl.MaxItems()) m.DailySeriesLimitCurrentSeries += uint64(sl.CurrentItems()) } - m.TimestampsBlocksMerged = atomic.LoadUint64(×tampsBlocksMerged) - m.TimestampsBytesSaved = atomic.LoadUint64(×tampsBytesSaved) + m.TimestampsBlocksMerged = timestampsBlocksMerged.Load() + m.TimestampsBytesSaved = timestampsBytesSaved.Load() var cs fastcache.Stats s.tsidCache.UpdateStats(&cs) @@ -603,8 +601,8 @@ func (s *Storage) UpdateMetrics(m *Metrics) { m.DateMetricIDCacheSize += uint64(s.dateMetricIDCache.EntriesCount()) m.DateMetricIDCacheSizeBytes += uint64(s.dateMetricIDCache.SizeBytes()) - m.DateMetricIDCacheSyncsCount += atomic.LoadUint64(&s.dateMetricIDCache.syncsCount) - m.DateMetricIDCacheResetsCount += atomic.LoadUint64(&s.dateMetricIDCache.resetsCount) + m.DateMetricIDCacheSyncsCount += s.dateMetricIDCache.syncsCount.Load() + m.DateMetricIDCacheResetsCount += s.dateMetricIDCache.resetsCount.Load() hmCurr := s.currHourMetricIDs.Load() hmPrev := s.prevHourMetricIDs.Load() @@ -637,7 +635,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) { } func (s *Storage) nextRetentionSeconds() int64 { - return atomic.LoadInt64(&s.nextRotationTimestamp) - int64(fasttime.UnixTimestamp()) + return s.nextRotationTimestamp.Load() - int64(fasttime.UnixTimestamp()) } // SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path @@ -783,7 +781,7 @@ func (s *Storage) mustRotateIndexDB(currentTime time.Time) { // Update nextRotationTimestamp nextRotationTimestamp := currentTime.Unix() + s.retentionMsecs/1000 - atomic.StoreInt64(&s.nextRotationTimestamp, nextRotationTimestamp) + s.nextRotationTimestamp.Store(nextRotationTimestamp) // Set idbNext to idbNew idbNext := s.idbNext.Load() @@ -1191,7 +1189,7 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin qt.Printf("skip pre-fetching metric names for low number of missing metric ids=%d", len(metricIDs)) return nil } - atomic.AddUint64(&s.slowMetricNameLoads, uint64(len(metricIDs))) + s.slowMetricNameLoads.Add(uint64(len(metricIDs))) // Pre-fetch metricIDs. var missingMetricIDs []uint64 @@ -1232,12 +1230,12 @@ func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uin // Store the pre-fetched metricIDs, so they aren't pre-fetched next time. s.prefetchedMetricIDsLock.Lock() - if fasttime.UnixTimestamp() > atomic.LoadUint64(&s.prefetchedMetricIDsDeadline) { + if fasttime.UnixTimestamp() > s.prefetchedMetricIDsDeadline.Load() { // Periodically reset the prefetchedMetricIDs in order to limit its size. s.prefetchedMetricIDs = &uint64set.Set{} d := timeutil.AddJitterToDuration(time.Second * 20 * 60) metricIDsDeadline := fasttime.UnixTimestamp() + uint64(d.Seconds()) - atomic.StoreUint64(&s.prefetchedMetricIDsDeadline, metricIDsDeadline) + s.prefetchedMetricIDsDeadline.Store(metricIDsDeadline) } s.prefetchedMetricIDs.AddMulti(metricIDs) s.prefetchedMetricIDsLock.Unlock() @@ -1547,7 +1545,7 @@ func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error { return s.tb.ForceMergePartitions(partitionNamePrefix) } -var rowsAddedTotal uint64 +var rowsAddedTotal atomic.Uint64 // AddRows adds the given mrs to s. // @@ -1576,7 +1574,7 @@ func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error { } continue } - atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock))) + rowsAddedTotal.Add(uint64(len(mrsBlock))) } putMetricRowsInsertCtx(ic) @@ -1709,7 +1707,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) { s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date) } - atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated) + s.timeseriesRepopulated.Add(seriesRepopulated) // There is no need in pre-filling idbNext here, since RegisterMetricNames() is rarely called. // So it is OK to register metric names in blocking manner after indexdb rotation. @@ -1766,7 +1764,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci "probably you need updating -retentionPeriod command-line flag; metricName: %s", mr.Timestamp, minTimestamp, metricName) } - atomic.AddUint64(&s.tooSmallTimestampRows, 1) + s.tooSmallTimestampRows.Add(1) continue } if mr.Timestamp > maxTimestamp { @@ -1776,7 +1774,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci firstWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowed timestamp is %d; metricName: %s", mr.Timestamp, maxTimestamp, metricName) } - atomic.AddUint64(&s.tooBigTimestampRows, 1) + s.tooBigTimestampRows.Add(1) continue } dstMrs[j] = mr @@ -1900,9 +1898,9 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci } } - atomic.AddUint64(&s.slowRowInserts, slowInsertsCount) - atomic.AddUint64(&s.newTimeseriesCreated, newSeriesCount) - atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated) + s.slowRowInserts.Add(slowInsertsCount) + s.newTimeseriesCreated.Add(newSeriesCount) + s.timeseriesRepopulated.Add(seriesRepopulated) dstMrs = dstMrs[:j] rows = rows[:j] @@ -1956,12 +1954,12 @@ func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) bool { if sl := s.hourlySeriesLimiter; sl != nil && !sl.Add(metricID) { - atomic.AddUint64(&s.hourlySeriesLimitRowsDropped, 1) + s.hourlySeriesLimitRowsDropped.Add(1) logSkippedSeries(metricNameRaw, "-storage.maxHourlySeries", sl.MaxItems()) return false } if sl := s.dailySeriesLimiter; sl != nil && !sl.Add(metricID) { - atomic.AddUint64(&s.dailySeriesLimitRowsDropped, 1) + s.dailySeriesLimitRowsDropped.Add(1) logSkippedSeries(metricNameRaw, "-storage.maxDailySeries", sl.MaxItems()) return false } @@ -2056,7 +2054,7 @@ func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error { s.putSeriesToCache(metricNameRaw, &genTSID, date) timeseriesPreCreated++ } - atomic.AddUint64(&s.timeseriesPreCreated, timeseriesPreCreated) + s.timeseriesPreCreated.Add(timeseriesPreCreated) return firstError } @@ -2163,7 +2161,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error { // Slow path - add new (date, metricID) entries to indexDB. - atomic.AddUint64(&s.slowPerDayIndexInserts, uint64(len(pendingDateMetricIDs))) + s.slowPerDayIndexInserts.Add(uint64(len(pendingDateMetricIDs))) // Sort pendingDateMetricIDs by (date, metricID) in order to speed up `is` search in the loop below. sort.Slice(pendingDateMetricIDs, func(i, j int) bool { a := pendingDateMetricIDs[i] @@ -2218,9 +2216,8 @@ func fastHashUint64(x uint64) uint64 { // // It should be faster than map[date]*uint64set.Set on multicore systems. type dateMetricIDCache struct { - // 64-bit counters must be at the top of the structure to be properly aligned on 32-bit arches. - syncsCount uint64 - resetsCount uint64 + syncsCount atomic.Uint64 + resetsCount atomic.Uint64 // Contains immutable map byDate atomic.Pointer[byDateMetricIDMap] @@ -2248,7 +2245,7 @@ func (dmc *dateMetricIDCache) resetLocked() { dmc.byDateMutable = newByDateMetricIDMap() dmc.slowHits = 0 - atomic.AddUint64(&dmc.resetsCount, 1) + dmc.resetsCount.Add(1) } func (dmc *dateMetricIDCache) EntriesCount() int { @@ -2395,7 +2392,7 @@ func (dmc *dateMetricIDCache) syncLocked() { dmc.byDate.Store(dmc.byDateMutable) dmc.byDateMutable = newByDateMetricIDMap() - atomic.AddUint64(&dmc.syncsCount, 1) + dmc.syncsCount.Add(1) if dmc.SizeBytes() > uint64(memory.Allowed())/256 { dmc.resetLocked() diff --git a/lib/storage/tag_filters.go b/lib/storage/tag_filters.go index 3ef0acc61..b7b70958c 100644 --- a/lib/storage/tag_filters.go +++ b/lib/storage/tag_filters.go @@ -58,7 +58,7 @@ func convertToCompositeTagFilters(tfs *TagFilters) []*TagFilters { // then it is impossible to construct composite tag filter. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2238 if len(names) == 0 || !hasPositiveFilter { - atomic.AddUint64(&compositeFilterMissingConversions, 1) + compositeFilterMissingConversions.Add(1) return []*TagFilters{tfs} } @@ -117,20 +117,20 @@ func convertToCompositeTagFilters(tfs *TagFilters) []*TagFilters { if compositeFilters == 0 { // Cannot use tfsNew, since it doesn't contain composite filters, e.g. it may match broader set of series. // Fall back to the original tfs. - atomic.AddUint64(&compositeFilterMissingConversions, 1) + compositeFilterMissingConversions.Add(1) return []*TagFilters{tfs} } tfsCompiled := NewTagFilters() tfsCompiled.tfs = tfsNew tfssCompiled = append(tfssCompiled, tfsCompiled) } - atomic.AddUint64(&compositeFilterSuccessConversions, 1) + compositeFilterSuccessConversions.Add(1) return tfssCompiled } var ( - compositeFilterSuccessConversions uint64 - compositeFilterMissingConversions uint64 + compositeFilterSuccessConversions atomic.Uint64 + compositeFilterMissingConversions atomic.Uint64 ) // TagFilters represents filters used for filtering tags.