diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 2725f9e0d..8244c8a61 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -398,6 +398,14 @@ func registerStorageMetrics() { return float64(idbm().AssistedMerges) }) + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686 + metrics.NewGauge(`vm_small_merge_need_free_disk_space`, func() float64 { + return float64(tm().SmallMergeNeedFreeDiskSpace) + }) + metrics.NewGauge(`vm_big_merge_need_free_disk_space`, func() float64 { + return float64(tm().BigMergeNeedFreeDiskSpace) + }) + metrics.NewGauge(`vm_pending_rows{type="storage"}`, func() float64 { return float64(tm().PendingRows) }) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 9bc347a8b..73d7aba48 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -330,6 +330,9 @@ type partitionMetrics struct { SmallPartsRefCount uint64 SmallAssistedMerges uint64 + + SmallMergeNeedFreeDiskSpace uint64 + BigMergeNeedFreeDiskSpace uint64 } // UpdateMetrics updates m with metrics from pt. @@ -388,6 +391,9 @@ func (pt *partition) UpdateMetrics(m *partitionMetrics) { m.SmallRowsDeleted += atomic.LoadUint64(&pt.smallRowsDeleted) m.SmallAssistedMerges += atomic.LoadUint64(&pt.smallAssistedMerges) + + m.SmallMergeNeedFreeDiskSpace = atomic.LoadUint64(&smallMergeNeedFreeDiskSpace) + m.BigMergeNeedFreeDiskSpace = atomic.LoadUint64(&bigMergeNeedFreeDiskSpace) } // AddRows adds the given rows to the partition pt. @@ -990,9 +996,10 @@ func (pt *partition) mergeBigParts(isFinal bool) error { maxRows := maxRowsByPath(pt.bigPartsPath) pt.partsLock.Lock() - pws := getPartsToMerge(pt.bigParts, maxRows, isFinal) + pws, needFreeSpace := getPartsToMerge(pt.bigParts, maxRows, isFinal) pt.partsLock.Unlock() + atomicSetBool(&bigMergeNeedFreeDiskSpace, needFreeSpace) return pt.mergeParts(pws, pt.stopCh) } @@ -1008,14 +1015,28 @@ func (pt *partition) mergeSmallParts(isFinal bool) error { } pt.partsLock.Lock() - pws := getPartsToMerge(pt.smallParts, maxRows, isFinal) + pws, needFreeSpace := getPartsToMerge(pt.smallParts, maxRows, isFinal) pt.partsLock.Unlock() + atomicSetBool(&smallMergeNeedFreeDiskSpace, needFreeSpace) return pt.mergeParts(pws, pt.stopCh) } var errNothingToMerge = fmt.Errorf("nothing to merge") +func atomicSetBool(p *uint64, b bool) { + v := uint64(0) + if b { + v = 1 + } + atomic.StoreUint64(p, v) +} + +var ( + smallMergeNeedFreeDiskSpace uint64 + bigMergeNeedFreeDiskSpace uint64 +) + // mergeParts merges pws. // // Merging is immediately stopped if stopCh is closed. @@ -1242,7 +1263,8 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig // getPartsToMerge returns optimal parts to merge from pws. // // The returned rows will contain less than maxRows rows. -func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWrapper { +// The function returns true if pws contains parts, which cannot be merged because of maxRows limit. +func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) { pwsRemaining := make([]*partWrapper, 0, len(pws)) for _, pw := range pws { if !pw.isInMerge { @@ -1251,13 +1273,14 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWr } maxPartsToMerge := defaultPartsToMerge var pms []*partWrapper + needFreeSpace := false if isFinal { for len(pms) == 0 && maxPartsToMerge >= finalPartsToMerge { - pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows) + pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows) maxPartsToMerge-- } } else { - pms = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows) + pms, needFreeSpace = appendPartsToMerge(pms[:0], pwsRemaining, maxPartsToMerge, maxRows) } for _, pw := range pms { if pw.isInMerge { @@ -1265,15 +1288,16 @@ func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) []*partWr } pw.isInMerge = true } - return pms + return pms, needFreeSpace } // appendPartsToMerge finds optimal parts to merge from src, appends // them to dst and returns the result. -func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) []*partWrapper { +// The function returns true if src contains parts, which cannot be merged because of maxRows limit. +func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows uint64) ([]*partWrapper, bool) { if len(src) < 2 { // There is no need in merging zero or one part :) - return dst + return dst, false } if maxPartsToMerge < 2 { logger.Panicf("BUG: maxPartsToMerge cannot be smaller than 2; got %d", maxPartsToMerge) @@ -1281,10 +1305,12 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui // Filter out too big parts. // This should reduce N for O(N^2) algorithm below. + needFreeSpace := false maxInPartRows := maxRows / 2 tmp := make([]*partWrapper, 0, len(src)) for _, pw := range src { if pw.p.ph.RowsCount > maxInPartRows { + needFreeSpace = true continue } tmp = append(tmp, pw) @@ -1320,6 +1346,7 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui } if rowsSum > maxRows { // There is no need in verifying remaining parts with higher number of rows + needFreeSpace = true break } m := float64(rowsSum) / float64(a[len(a)-1].p.ph.RowsCount) @@ -1337,9 +1364,9 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui } if maxM < minM { // There is no sense in merging parts with too small m. - return dst + return dst, needFreeSpace } - return append(dst, pws...) + return append(dst, pws...), needFreeSpace } func openParts(pathPrefix1, pathPrefix2, path string) ([]*partWrapper, error) { diff --git a/lib/storage/partition_test.go b/lib/storage/partition_test.go index 457d9b9d0..bfe5e3da4 100644 --- a/lib/storage/partition_test.go +++ b/lib/storage/partition_test.go @@ -49,7 +49,7 @@ func TestAppendPartsToMergeManyParts(t *testing.T) { iterationsCount := 0 rowsMerged := uint64(0) for { - pms := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows) + pms, _ := appendPartsToMerge(nil, pws, defaultPartsToMerge, maxOutPartRows) if len(pms) == 0 { break } @@ -99,7 +99,7 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, pws := newTestPartWrappersForRowsCount(initialRowsCount) // Verify appending to nil. - pms := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9) + pms, _ := appendPartsToMerge(nil, pws, maxPartsToMerge, 1e9) rowsCount := newTestRowsCountFromPartWrappers(pms) if !reflect.DeepEqual(rowsCount, expectedRowsCount) { t.Fatalf("unexpected rowsCount for maxPartsToMerge=%d, initialRowsCount=%d; got\n%d; want\n%d", @@ -118,7 +118,7 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, {}, {}, } - pms = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9) + pms, _ = appendPartsToMerge(prefix, pws, maxPartsToMerge, 1e9) if !reflect.DeepEqual(pms[:len(prefix)], prefix) { t.Fatalf("unexpected prefix for maxPartsToMerge=%d, initialRowsCount=%d; got\n%+v; want\n%+v", maxPartsToMerge, initialRowsCount, pms[:len(prefix)], prefix)