lib/storage: adds metrics for downsampling (#382)

* lib/storage: adds metrics for downsampling
vm_downsampling_partitions_scheduled - shows the number of parts, that must be downsampled
vm_downsampling_partitions_scheduled_size_bytes - shows total size in bytes for parts, the must be donwsampled

These two metrics answer the questions - is downsampling running? how many parts scheduled for downsampling and how many of them currently downsampled? Storage space that it occupies.

https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2612

* wip

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2022-06-15 17:37:52 +02:00 committed by Aliaksandr Valialkin
parent e79b05b4ab
commit a05303eaa0
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 29 additions and 1 deletions

View file

@ -626,6 +626,9 @@ func writeStorageMetrics(w io.Writer, strg *storage.Storage) {
metrics.WriteCounterUint64(w, `vm_cache_collisions_total{type="storage/metricName"}`, m.MetricNameCacheCollisions) metrics.WriteCounterUint64(w, `vm_cache_collisions_total{type="storage/metricName"}`, m.MetricNameCacheCollisions)
metrics.WriteGaugeUint64(w, `vm_next_retention_seconds`, m.NextRetentionSeconds) metrics.WriteGaugeUint64(w, `vm_next_retention_seconds`, m.NextRetentionSeconds)
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled`, tm.ScheduledDownsamplingPartitions)
metrics.WriteGaugeUint64(w, `vm_downsampling_partitions_scheduled_size_bytes`, tm.ScheduledDownsamplingPartitionsSize)
} }
func jsonResponseError(w http.ResponseWriter, err error) { func jsonResponseError(w http.ResponseWriter, err error) {

View file

@ -91,6 +91,8 @@ type partition struct {
smallRowsDeleted atomic.Uint64 smallRowsDeleted atomic.Uint64
bigRowsDeleted atomic.Uint64 bigRowsDeleted atomic.Uint64
isDedupScheduled atomic.Bool
mergeIdx atomic.Uint64 mergeIdx atomic.Uint64
// the path to directory with smallParts. // the path to directory with smallParts.
@ -326,6 +328,9 @@ type partitionMetrics struct {
InmemoryPartsRefCount uint64 InmemoryPartsRefCount uint64
SmallPartsRefCount uint64 SmallPartsRefCount uint64
BigPartsRefCount uint64 BigPartsRefCount uint64
ScheduledDownsamplingPartitions uint64
ScheduledDownsamplingPartitionsSize uint64
} }
// TotalRowsCount returns total number of rows in tm. // TotalRowsCount returns total number of rows in tm.
@ -339,12 +344,20 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
pt.partsLock.Lock() pt.partsLock.Lock()
isDedupScheduled := pt.isDedupScheduled.Load()
if isDedupScheduled {
m.ScheduledDownsamplingPartitions++
}
for _, pw := range pt.inmemoryParts { for _, pw := range pt.inmemoryParts {
p := pw.p p := pw.p
m.InmemoryRowsCount += p.ph.RowsCount m.InmemoryRowsCount += p.ph.RowsCount
m.InmemoryBlocksCount += p.ph.BlocksCount m.InmemoryBlocksCount += p.ph.BlocksCount
m.InmemorySizeBytes += p.size m.InmemorySizeBytes += p.size
m.InmemoryPartsRefCount += uint64(pw.refCount.Load()) m.InmemoryPartsRefCount += uint64(pw.refCount.Load())
if isDedupScheduled {
m.ScheduledDownsamplingPartitionsSize += p.size
}
} }
for _, pw := range pt.smallParts { for _, pw := range pt.smallParts {
p := pw.p p := pw.p
@ -352,6 +365,9 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.SmallBlocksCount += p.ph.BlocksCount m.SmallBlocksCount += p.ph.BlocksCount
m.SmallSizeBytes += p.size m.SmallSizeBytes += p.size
m.SmallPartsRefCount += uint64(pw.refCount.Load()) m.SmallPartsRefCount += uint64(pw.refCount.Load())
if isDedupScheduled {
m.ScheduledDownsamplingPartitionsSize += p.size
}
} }
for _, pw := range pt.bigParts { for _, pw := range pt.bigParts {
p := pw.p p := pw.p
@ -359,6 +375,9 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
m.BigBlocksCount += p.ph.BlocksCount m.BigBlocksCount += p.ph.BlocksCount
m.BigSizeBytes += p.size m.BigSizeBytes += p.size
m.BigPartsRefCount += uint64(pw.refCount.Load()) m.BigPartsRefCount += uint64(pw.refCount.Load())
if isDedupScheduled {
m.ScheduledDownsamplingPartitionsSize += p.size
}
} }
m.InmemoryPartsCount += uint64(len(pt.inmemoryParts)) m.InmemoryPartsCount += uint64(len(pt.inmemoryParts))

View file

@ -442,15 +442,21 @@ func (tb *table) finalDedupWatcher() {
defer tb.PutPartitions(ptws) defer tb.PutPartitions(ptws)
timestamp := timestampFromTime(time.Now()) timestamp := timestampFromTime(time.Now())
currentPartitionName := timestampToPartitionName(timestamp) currentPartitionName := timestampToPartitionName(timestamp)
var ptwsToDedup []*partitionWrapper
for _, ptw := range ptws { for _, ptw := range ptws {
if ptw.pt.name == currentPartitionName || !ptw.pt.isFinalDedupNeeded() { if ptw.pt.name == currentPartitionName || !ptw.pt.isFinalDedupNeeded() {
// Do not run final dedup for the current month. // Do not run final dedup for the current month.
continue continue
} }
// mark partition with final deduplication marker
ptw.pt.isDedupScheduled.Store(true)
ptwsToDedup = append(ptwsToDedup, ptw)
}
for _, ptw := range ptwsToDedup {
if err := ptw.pt.runFinalDedup(); err != nil { if err := ptw.pt.runFinalDedup(); err != nil {
logger.Errorf("cannot run final dedup for partition %s: %s", ptw.pt.name, err) logger.Errorf("cannot run final dedup for partition %s: %s", ptw.pt.name, err)
continue
} }
ptw.pt.isDedupScheduled.Store(false)
} }
} }
d := timeutil.AddJitterToDuration(time.Hour) d := timeutil.AddJitterToDuration(time.Hour)