app/vlstorage: export vl_active_merges and vl_merges_total metrics

This commit is contained in:
Aliaksandr Valialkin 2023-06-21 20:58:57 -07:00
parent 352429486a
commit a9eb2409ea
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 46 additions and 1 deletions

View file

@ -108,6 +108,19 @@ func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {
return float64(fs.MustGetFreeSpace(*storageDataPath)) return float64(fs.MustGetFreeSpace(*storageDataPath))
}) })
ms.NewGauge(`vl_active_merges{type="inmemory"}`, func() float64 {
return float64(m().InmemoryActiveMerges)
})
ms.NewGauge(`vl_merges_total{type="inmemory"}`, func() float64 {
return float64(m().InmemoryMergesTotal)
})
ms.NewGauge(`vl_active_merges{type="file"}`, func() float64 {
return float64(m().FileActiveMerges)
})
ms.NewGauge(`vl_merges_total{type="file"}`, func() float64 {
return float64(m().FileMergesTotal)
})
ms.NewGauge(`vl_rows{type="inmemory"}`, func() float64 { ms.NewGauge(`vl_rows{type="inmemory"}`, func() float64 {
return float64(m().InmemoryRowsCount) return float64(m().InmemoryRowsCount)
}) })

View file

@ -37,6 +37,11 @@ const maxInmemoryPartsPerPartition = 20
// datadb represents a database with log data // datadb represents a database with log data
type datadb struct { type datadb struct {
inmemoryMergesTotal uint64
inmemoryActiveMerges uint64
fileMergesTotal uint64
fileActiveMerges uint64
// pt is the partition the datadb belongs to // pt is the partition the datadb belongs to
pt *partition pt *partition
@ -330,8 +335,18 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
startTime := time.Now() startTime := time.Now()
// Initialize destination paths.
dstPartType := ddb.getDstPartType(pws, isFinal) dstPartType := ddb.getDstPartType(pws, isFinal)
if dstPartType == partInmemory {
atomic.AddUint64(&ddb.inmemoryMergesTotal, 1)
atomic.AddUint64(&ddb.inmemoryActiveMerges, 1)
defer atomic.AddUint64(&ddb.inmemoryActiveMerges, ^uint64(0))
} else {
atomic.AddUint64(&ddb.fileMergesTotal, 1)
atomic.AddUint64(&ddb.fileActiveMerges, 1)
defer atomic.AddUint64(&ddb.fileActiveMerges, ^uint64(0))
}
// Initialize destination paths.
mergeIdx := ddb.nextMergeIdx() mergeIdx := ddb.nextMergeIdx()
dstPartPath := ddb.getDstPartPath(dstPartType, mergeIdx) dstPartPath := ddb.getDstPartPath(dstPartType, mergeIdx)
@ -509,6 +524,18 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
// DatadbStats contains various stats for datadb. // DatadbStats contains various stats for datadb.
type DatadbStats struct { type DatadbStats struct {
// InmemoryMergesTotal is the number of inmemory merges performed in the given datadb.
InmemoryMergesTotal uint64
// InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb.
InmemoryActiveMerges uint64
// FileMergesTotal is the number of file merges performed in the given datadb.
FileMergesTotal uint64
// FileActiveMerges is the number of currently active file merges performed by the given datadb.
FileActiveMerges uint64
// InmemoryRowsCount is the number of rows, which weren't flushed to disk yet. // InmemoryRowsCount is the number of rows, which weren't flushed to disk yet.
InmemoryRowsCount uint64 InmemoryRowsCount uint64
@ -551,6 +578,11 @@ func (s *DatadbStats) RowsCount() uint64 {
// updateStats updates s with ddb stats // updateStats updates s with ddb stats
func (ddb *datadb) updateStats(s *DatadbStats) { func (ddb *datadb) updateStats(s *DatadbStats) {
s.InmemoryMergesTotal += atomic.LoadUint64(&ddb.inmemoryMergesTotal)
s.InmemoryActiveMerges += atomic.LoadUint64(&ddb.inmemoryActiveMerges)
s.FileMergesTotal += atomic.LoadUint64(&ddb.fileMergesTotal)
s.FileActiveMerges += atomic.LoadUint64(&ddb.fileActiveMerges)
ddb.partsLock.Lock() ddb.partsLock.Lock()
s.InmemoryRowsCount += getRowsCount(ddb.inmemoryParts) s.InmemoryRowsCount += getRowsCount(ddb.inmemoryParts)