diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index af505ac75..3462df34b 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -626,6 +626,9 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) { metrics.WriteCounterUint64(w, `vm_cache_collisions_total{type="storage/metricName"}`, m.MetricNameCacheCollisions) metrics.WriteGaugeUint64(w, `vm_next_retention_seconds`, m.NextRetentionSeconds) + + metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled`, tm.ScheduledDownsamplingPartitions) + metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled_size_bytes`, tm.ScheduledDownsamplingPartitionsSize) } func jsonResponseError(w http.ResponseWriter, err error) { diff --git a/lib/storage/partition.go b/lib/storage/partition.go index d098081e4..598042f3f 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -91,6 +91,8 @@ type partition struct { smallRowsDeleted atomic.Uint64 bigRowsDeleted atomic.Uint64 + isDedupScheduled atomic.Bool + mergeIdx atomic.Uint64 // the path to directory with smallParts. @@ -326,6 +328,9 @@ type partitionMetrics struct { InmemoryPartsRefCount uint64 SmallPartsRefCount uint64 BigPartsRefCount uint64 + + ScheduledDownsamplingPartitions uint64 + ScheduledDownsamplingPartitionsSize uint64 } // TotalRowsCount returns total number of rows in tm. @@ -339,12 +344,20 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { pt.partsLock.Lock() + isDedupScheduled := pt.isDedupScheduled.Load() + if isDedupScheduled { + m.ScheduledDownsamplingPartitions++ + } + for _, pw := range pt.inmemoryParts { p := pw.p m.InmemoryRowsCount += p.ph.RowsCount m.InmemoryBlocksCount += p.ph.BlocksCount m.InmemorySizeBytes += p.size m.InmemoryPartsRefCount += uint64(pw.refCount.Load()) + if isDedupScheduled { + m.ScheduledDownsamplingPartitionsSize += p.size + } } for _, pw := range pt.smallParts { p := pw.p @@ -352,6 +365,9 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.SmallBlocksCount += p.ph.BlocksCount m.SmallSizeBytes += p.size m.SmallPartsRefCount += uint64(pw.refCount.Load()) + if isDedupScheduled { + m.ScheduledDownsamplingPartitionsSize += p.size + } } for _, pw := range pt.bigParts { p := pw.p @@ -359,6 +375,9 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.BigBlocksCount += p.ph.BlocksCount m.BigSizeBytes += p.size m.BigPartsRefCount += uint64(pw.refCount.Load()) + if isDedupScheduled { + m.ScheduledDownsamplingPartitionsSize += p.size + } } m.InmemoryPartsCount += uint64(len(pt.inmemoryParts)) diff --git a/lib/storage/table.go b/lib/storage/table.go index 7f1234df4..34d92fcab 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -442,15 +442,21 @@ func (tb *table) finalDedupWatcher() { defer tb.PutPartitions(ptws) timestamp := timestampFromTime(time.Now()) currentPartitionName := timestampToPartitionName(timestamp) + var ptwsToDedup []*partitionWrapper for _, ptw := range ptws { if ptw.pt.name == currentPartitionName || !ptw.pt.isFinalDedupNeeded() { // Do not run final dedup for the current month. continue } + // mark partition with final deduplication marker + ptw.pt.isDedupScheduled.Store(true) + ptwsToDedup = append(ptwsToDedup, ptw) + } + for _, ptw := range ptwsToDedup { if err := ptw.pt.runFinalDedup(); err != nil { logger.Errorf("cannot run final dedup for partition %s: %s", ptw.pt.name, err) - continue } + ptw.pt.isDedupScheduled.Store(false) } } d := timeutil.AddJitterToDuration(time.Hour)