diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index 6f9f9ef48..8dab7ce1c 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -108,6 +108,19 @@ func initStorageMetrics(strg *logstorage.Storage) *metrics.Set { return float64(fs.MustGetFreeSpace(*storageDataPath)) }) + ms.NewGauge(`vl_active_merges{type="inmemory"}`, func() float64 { + return float64(m().InmemoryActiveMerges) + }) + ms.NewGauge(`vl_merges_total{type="inmemory"}`, func() float64 { + return float64(m().InmemoryMergesTotal) + }) + ms.NewGauge(`vl_active_merges{type="file"}`, func() float64 { + return float64(m().FileActiveMerges) + }) + ms.NewGauge(`vl_merges_total{type="file"}`, func() float64 { + return float64(m().FileMergesTotal) + }) + ms.NewGauge(`vl_rows{type="inmemory"}`, func() float64 { return float64(m().InmemoryRowsCount) }) diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go index e605e2f24..9bf9ce26f 100644 --- a/lib/logstorage/datadb.go +++ b/lib/logstorage/datadb.go @@ -37,6 +37,11 @@ const maxInmemoryPartsPerPartition = 20 // datadb represents a database with log data type datadb struct { + inmemoryMergesTotal uint64 + inmemoryActiveMerges uint64 + fileMergesTotal uint64 + fileActiveMerges uint64 + // pt is the partition the datadb belongs to pt *partition @@ -330,8 +335,18 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { startTime := time.Now() - // Initialize destination paths. dstPartType := ddb.getDstPartType(pws, isFinal) + if dstPartType == partInmemory { + atomic.AddUint64(&ddb.inmemoryMergesTotal, 1) + atomic.AddUint64(&ddb.inmemoryActiveMerges, 1) + defer atomic.AddUint64(&ddb.inmemoryActiveMerges, ^uint64(0)) + } else { + atomic.AddUint64(&ddb.fileMergesTotal, 1) + atomic.AddUint64(&ddb.fileActiveMerges, 1) + defer atomic.AddUint64(&ddb.fileActiveMerges, ^uint64(0)) + } + + // Initialize destination paths. mergeIdx := ddb.nextMergeIdx() dstPartPath := ddb.getDstPartPath(dstPartType, mergeIdx) @@ -509,6 +524,18 @@ func (ddb *datadb) mustAddRows(lr *LogRows) { // DatadbStats contains various stats for datadb. type DatadbStats struct { + // InmemoryMergesTotal is the number of inmemory merges performed in the given datadb. + InmemoryMergesTotal uint64 + + // InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb. + InmemoryActiveMerges uint64 + + // FileMergesTotal is the number of file merges performed in the given datadb. + FileMergesTotal uint64 + + // FileActiveMerges is the number of currently active file merges performed by the given datadb. + FileActiveMerges uint64 + // InmemoryRowsCount is the number of rows, which weren't flushed to disk yet. InmemoryRowsCount uint64 @@ -551,6 +578,11 @@ func (s *DatadbStats) RowsCount() uint64 { // updateStats updates s with ddb stats func (ddb *datadb) updateStats(s *DatadbStats) { + s.InmemoryMergesTotal += atomic.LoadUint64(&ddb.inmemoryMergesTotal) + s.InmemoryActiveMerges += atomic.LoadUint64(&ddb.inmemoryActiveMerges) + s.FileMergesTotal += atomic.LoadUint64(&ddb.fileMergesTotal) + s.FileActiveMerges += atomic.LoadUint64(&ddb.fileActiveMerges) + ddb.partsLock.Lock() s.InmemoryRowsCount += getRowsCount(ddb.inmemoryParts)