mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: properly detect free disk space shortage during data merge
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1373
This commit is contained in:
parent
44855f0c9b
commit
8055439fe4
3 changed files with 32 additions and 13 deletions
|
@ -1392,14 +1392,14 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u
|
||||||
// Sort src parts by itemsCount.
|
// Sort src parts by itemsCount.
|
||||||
sort.Slice(src, func(i, j int) bool { return src[i].p.ph.itemsCount < src[j].p.ph.itemsCount })
|
sort.Slice(src, func(i, j int) bool { return src[i].p.ph.itemsCount < src[j].p.ph.itemsCount })
|
||||||
|
|
||||||
minSrcParts := (maxPartsToMerge + 1) / 2
|
maxSrcParts := maxPartsToMerge
|
||||||
|
if maxSrcParts > len(src) {
|
||||||
|
maxSrcParts = len(src)
|
||||||
|
}
|
||||||
|
minSrcParts := (maxSrcParts + 1) / 2
|
||||||
if minSrcParts < 2 {
|
if minSrcParts < 2 {
|
||||||
minSrcParts = 2
|
minSrcParts = 2
|
||||||
}
|
}
|
||||||
maxSrcParts := maxPartsToMerge
|
|
||||||
if len(src) < maxSrcParts {
|
|
||||||
maxSrcParts = len(src)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exhaustive search for parts giving the lowest write amplification when merged.
|
// Exhaustive search for parts giving the lowest write amplification when merged.
|
||||||
var pws []*partWrapper
|
var pws []*partWrapper
|
||||||
|
|
|
@ -1386,17 +1386,18 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
||||||
|
|
||||||
// Filter out too big parts.
|
// Filter out too big parts.
|
||||||
// This should reduce N for O(N^2) algorithm below.
|
// This should reduce N for O(N^2) algorithm below.
|
||||||
needFreeSpace := false
|
skippedBigParts := 0
|
||||||
maxInPartRows := maxRows / 2
|
maxInPartRows := maxRows / 2
|
||||||
tmp := make([]*partWrapper, 0, len(src))
|
tmp := make([]*partWrapper, 0, len(src))
|
||||||
for _, pw := range src {
|
for _, pw := range src {
|
||||||
if pw.p.ph.RowsCount > maxInPartRows {
|
if pw.p.ph.RowsCount > maxInPartRows {
|
||||||
needFreeSpace = true
|
skippedBigParts++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
tmp = append(tmp, pw)
|
tmp = append(tmp, pw)
|
||||||
}
|
}
|
||||||
src = tmp
|
src = tmp
|
||||||
|
needFreeSpace := skippedBigParts > 1
|
||||||
|
|
||||||
// Sort src parts by rows count and backwards timestamp.
|
// Sort src parts by rows count and backwards timestamp.
|
||||||
// This should improve adjanced points' locality in the merged parts.
|
// This should improve adjanced points' locality in the merged parts.
|
||||||
|
@ -1409,14 +1410,14 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
||||||
return a.RowsCount < b.RowsCount
|
return a.RowsCount < b.RowsCount
|
||||||
})
|
})
|
||||||
|
|
||||||
minSrcParts := (maxPartsToMerge + 1) / 2
|
maxSrcParts := maxPartsToMerge
|
||||||
|
if maxSrcParts > len(src) {
|
||||||
|
maxSrcParts = len(src)
|
||||||
|
}
|
||||||
|
minSrcParts := (maxSrcParts + 1) / 2
|
||||||
if minSrcParts < 2 {
|
if minSrcParts < 2 {
|
||||||
minSrcParts = 2
|
minSrcParts = 2
|
||||||
}
|
}
|
||||||
maxSrcParts := maxPartsToMerge
|
|
||||||
if len(src) < maxSrcParts {
|
|
||||||
maxSrcParts = len(src)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exhaustive search for parts giving the lowest write amplification when merged.
|
// Exhaustive search for parts giving the lowest write amplification when merged.
|
||||||
var pws []*partWrapper
|
var pws []*partWrapper
|
||||||
|
@ -1450,7 +1451,7 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
|
||||||
}
|
}
|
||||||
if maxM < minM {
|
if maxM < minM {
|
||||||
// There is no sense in merging parts with too small m.
|
// There is no sense in merging parts with too small m.
|
||||||
return dst, false
|
return dst, needFreeSpace
|
||||||
}
|
}
|
||||||
return append(dst, pws...), needFreeSpace
|
return append(dst, pws...), needFreeSpace
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,24 @@ func TestAppendPartsToMerge(t *testing.T) {
|
||||||
testAppendPartsToMerge(t, 3, []uint64{11, 1, 10, 100, 10}, []uint64{10, 10, 11})
|
testAppendPartsToMerge(t, 3, []uint64{11, 1, 10, 100, 10}, []uint64{10, 10, 11})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAppendPartsToMergeNeedFreeSpace(t *testing.T) {
|
||||||
|
f := func(a []uint64, maxItems int, expectedNeedFreeSpace bool) {
|
||||||
|
t.Helper()
|
||||||
|
pws := newTestPartWrappersForRowsCount(a)
|
||||||
|
_, needFreeSpace := appendPartsToMerge(nil, pws, defaultPartsToMerge, uint64(maxItems))
|
||||||
|
if needFreeSpace != expectedNeedFreeSpace {
|
||||||
|
t.Fatalf("unexpected needFreeSpace; got %v; want %v", needFreeSpace, expectedNeedFreeSpace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f(nil, 1000, false)
|
||||||
|
f([]uint64{1000}, 100, false)
|
||||||
|
f([]uint64{1000}, 1100, false)
|
||||||
|
f([]uint64{100, 200}, 180, true)
|
||||||
|
f([]uint64{100, 200}, 310, false)
|
||||||
|
f([]uint64{100, 110, 109, 1}, 300, true)
|
||||||
|
f([]uint64{100, 110, 109, 1}, 330, false)
|
||||||
|
}
|
||||||
|
|
||||||
func TestAppendPartsToMergeManyParts(t *testing.T) {
|
func TestAppendPartsToMergeManyParts(t *testing.T) {
|
||||||
// Verify that big number of parts are merged into minimal number of parts
|
// Verify that big number of parts are merged into minimal number of parts
|
||||||
// using minimum merges.
|
// using minimum merges.
|
||||||
|
|
Loading…
Reference in a new issue