mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
app/vmstorage: add metrics for determining whether background merges need additional disk space to complete
These metrics are: * vm_small_merge_need_free_disk_space * vm_big_merge_need_free_disk_space Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686
This commit is contained in:
parent
dbf9402329
commit
a9db81c4ab
3 changed files with 48 additions and 13 deletions
|
@ -398,6 +398,14 @@ func registerStorageMetrics() {
|
||||||
return float64(idbm().AssistedMerges)
|
return float64(idbm().AssistedMerges)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686
|
||||||
|
metrics.NewGauge(`vm_small_merge_need_free_disk_space`, func() float64 {
|
||||||
|
return float64(tm().SmallMergeNeedFreeDiskSpace)
|
||||||
|
})
|
||||||
|
metrics.NewGauge(`vm_big_merge_need_free_disk_space`, func() float64 {
|
||||||
|
return float64(tm().BigMergeNeedFreeDiskSpace)
|
||||||
|
})
|
||||||
|
|
||||||
metrics.NewGauge(`vm_pending_rows{type="storage"}`, func() float64 {
|
metrics.NewGauge(`vm_pending_rows{type="storage"}`, func() float64 {
|
||||||
return float64(tm().PendingRows)
|
return float64(tm().PendingRows)
|
||||||
})
|
})
|
||||||
|
|
|
@ -330,6 +330,9 @@ type partitionMetrics struct {
|
||||||
SmallPartsRefCount uint64
|
SmallPartsRefCount uint64
|
||||||
|
|
||||||
SmallAssistedMerges uint64
|
SmallAssistedMerges uint64
|
||||||
|
|
||||||
|
SmallMergeNeedFreeDiskSpace uint64
|
||||||
|
BigMergeNeedFreeDiskSpace uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateMetrics updates m with metrics from pt.
|
// UpdateMetrics updates m with metrics from pt.
|
||||||
|
@ -388,6 +391,9 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) {
|
||||||
m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted)
|
m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted)
|
||||||
|
|
||||||
m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges)
|
m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges)
|
||||||
|
|
||||||
|
m.SmallMergeNeedFreeDiskSpace = atomic.LoadUint64(&smallMergeNeedFreeDiskSpace)
|
||||||
|
m.BigMergeNeedFreeDiskSpace = atomic.LoadUint64(&bigMergeNeedFreeDiskSpace)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddRows adds the given rows to the partition pt.
|
// AddRows adds the given rows to the partition pt.
|
||||||
|
@ -990,9 +996,10 @@ func (pt *partition) mergeBigParts(isFinal bool) error {
|
||||||
maxRows := maxRowsByPath(pt.bigPartsPath)
|
maxRows := maxRowsByPath(pt.bigPartsPath)
|
||||||
|
|
||||||
pt.partsLock.Lock()
|
pt.partsLock.Lock()
|
||||||
pws := getPartsToMerge(pt.bigParts, maxRows, isFinal)
|
pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxRows, isFinal)
|
||||||
pt.partsLock.Unlock()
|
pt.partsLock.Unlock()
|
||||||
|
|
||||||
|
atomicSetBool(&bigMergeNeedFreeDiskSpace, needFreeSpace)
|
||||||
return pt.mergeParts(pws, pt.stopCh)
|
return pt.mergeParts(pws, pt.stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1008,14 +1015,28 @@ func (pt *partition) mergeSmallParts(isFinal bool) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
pt.partsLock.Lock()
|
pt.partsLock.Lock()
|
||||||
pws := getPartsToMerge(pt.smallParts, maxRows, isFinal)
|
pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxRows, isFinal)
|
||||||
pt.partsLock.Unlock()
|
pt.partsLock.Unlock()
|
||||||
|
|
||||||
|
atomicSetBool(&smallMergeNeedFreeDiskSpace, needFreeSpace)
|
||||||
return pt.mergeParts(pws, pt.stopCh)
|
return pt.mergeParts(pws, pt.stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNothingToMerge = fmt.Errorf("nothing to merge")
|
var errNothingToMerge = fmt.Errorf("nothing to merge")
|
||||||
|
|
||||||
|
func atomicSetBool(p *uint64, b bool) {
|
||||||
|
v := uint64(0)
|
||||||
|
if b {
|
||||||
|
v = 1
|
||||||
|
}
|
||||||
|
atomic.StoreUint64(p, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
smallMergeNeedFreeDiskSpace uint64
|
||||||
|
bigMergeNeedFreeDiskSpace uint64
|
||||||
|
)
|
||||||
|
|
||||||
// mergeParts merges pws.
|
// mergeParts merges pws.
|
||||||
//
|
//
|
||||||
// Merging is immediately stopped if stopCh is closed.
|
// Merging is immediately stopped if stopCh is closed.
|
||||||
|
@ -1242,7 +1263,8 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
|
||||||
// getPartsToMerge returns optimal parts to merge from pws.
|
// getPartsToMerge returns optimal parts to merge from pws.
|
||||||
//
|
//
|
||||||
// The returned rows will contain less than maxRows rows.
|
// The returned rows will contain less than maxRows rows.
|
||||||
func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWrapper {
|
// The function returns true if pws contains parts, which cannot be merged because of maxRows limit.
|
||||||
|
func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) {
|
||||||
pwsRemaining := make([]*partWrapper, 0, len(pws))
|
pwsRemaining := make([]*partWrapper, 0, len(pws))
|
||||||
for _, pw := range pws {
|
for _, pw := range pws {
|
||||||
if !pw.isInMerge {
|
if !pw.isInMerge {
|
||||||
|
@ -1251,13 +1273,14 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWr
|
||||||
}
|
}
|
||||||
maxPartsToMerge := defaultPartsToMerge
|
maxPartsToMerge := defaultPartsToMerge
|
||||||
var pms []*partWrapper
|
var pms []*partWrapper
|
||||||
|
needFreeSpace := false
|
||||||
if isFinal {
|
if isFinal {
|
||||||
for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge {
|
for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge {
|
||||||
pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
|
pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
|
||||||
maxPartsToMerge--
|
maxPartsToMerge--
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
|
pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows)
|
||||||
}
|
}
|
||||||
for _, pw := range pms {
|
for _, pw := range pms {
|
||||||
if pw.isInMerge {
|
if pw.isInMerge {
|
||||||
|
@ -1265,15 +1288,16 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWr
|
||||||
}
|
}
|
||||||
pw.isInMerge = true
|
pw.isInMerge = true
|
||||||
}
|
}
|
||||||
return pms
|
return pms, needFreeSpace
|
||||||
}
|
}
|
||||||
|
|
||||||
// appendPartsToMerge finds optimal parts to merge from src, appends
|
// appendPartsToMerge finds optimal parts to merge from src, appends
|
||||||
// them to dst and returns the result.
|
// them to dst and returns the result.
|
||||||
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) []*partWrapper {
|
// The function returns true if src contains parts, which cannot be merged because of maxRows limit.
|
||||||
|
func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) {
|
||||||
if len(src) < 2 {
|
if len(src) < 2 {
|
||||||
// There is no need in merging zero or one part :)
|
// There is no need in merging zero or one part :)
|
||||||
return dst
|
return dst, false
|
||||||
}
|
}
|
||||||
if maxPartsToMerge < 2 {
|
if maxPartsToMerge < 2 {
|
||||||
logger.Panicf("BUG: maxPartsToMerge cannot be smaller than 2; got %d", maxPartsToMerge)
|
logger.Panicf("BUG: maxPartsToMerge cannot be smaller than 2; got %d", maxPartsToMerge)
|
||||||
|
@ -1281,10 +1305,12 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
||||||
|
|
||||||
// Filter out too big parts.
|
// Filter out too big parts.
|
||||||
// This should reduce N for O(N^2) algorithm below.
|
// This should reduce N for O(N^2) algorithm below.
|
||||||
|
needFreeSpace := false
|
||||||
maxInPartRows := maxRows / 2
|
maxInPartRows := maxRows / 2
|
||||||
tmp := make([]*partWrapper, 0, len(src))
|
tmp := make([]*partWrapper, 0, len(src))
|
||||||
for _, pw := range src {
|
for _, pw := range src {
|
||||||
if pw.p.ph.RowsCount > maxInPartRows {
|
if pw.p.ph.RowsCount > maxInPartRows {
|
||||||
|
needFreeSpace = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tmp = append(tmp, pw)
|
tmp = append(tmp, pw)
|
||||||
|
@ -1320,6 +1346,7 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
||||||
}
|
}
|
||||||
if rowsSum > maxRows {
|
if rowsSum > maxRows {
|
||||||
// There is no need in verifying remaining parts with higher number of rows
|
// There is no need in verifying remaining parts with higher number of rows
|
||||||
|
needFreeSpace = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount)
|
m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount)
|
||||||
|
@ -1337,9 +1364,9 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
||||||
}
|
}
|
||||||
if maxM < minM {
|
if maxM < minM {
|
||||||
// There is no sense in merging parts with too small m.
|
// There is no sense in merging parts with too small m.
|
||||||
return dst
|
return dst, needFreeSpace
|
||||||
}
|
}
|
||||||
return append(dst, pws...)
|
return append(dst, pws...), needFreeSpace
|
||||||
}
|
}
|
||||||
|
|
||||||
func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
|
func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) {
|
||||||
|
|
|
@ -49,7 +49,7 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
|
||||||
iterationsCount := 0
|
iterationsCount := 0
|
||||||
rowsMerged := uint64(0)
|
rowsMerged := uint64(0)
|
||||||
for {
|
for {
|
||||||
pms := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows)
|
pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows)
|
||||||
if len(pms) == 0 {
|
if len(pms) == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
|
||||||
pws := newTestPartWrappersForRowsCount(initialRowsCount)
|
pws := newTestPartWrappersForRowsCount(initialRowsCount)
|
||||||
|
|
||||||
// Verify appending to nil.
|
// Verify appending to nil.
|
||||||
pms := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9)
|
pms, _ := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9)
|
||||||
rowsCount := newTestRowsCountFromPartWrappers(pms)
|
rowsCount := newTestRowsCountFromPartWrappers(pms)
|
||||||
if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
|
if !reflect.DeepEqual(rowsCount, expectedRowsCount) {
|
||||||
t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
|
t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d",
|
||||||
|
@ -118,7 +118,7 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
|
||||||
{},
|
{},
|
||||||
{},
|
{},
|
||||||
}
|
}
|
||||||
pms = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9)
|
pms, _ = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9)
|
||||||
if !reflect.DeepEqual(pms[:len(prefix)], prefix) {
|
if !reflect.DeepEqual(pms[:len(prefix)], prefix) {
|
||||||
t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v",
|
t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v",
|
||||||
maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix)
|
maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix)
|
||||||
|
|
Loading…
Reference in a new issue