lib/{storage,mergeset}: tune background merge process in order to reduce CPU usage and disk IO usage

This commit is contained in:
Aliaksandr Valialkin 2020-12-18 20:00:06 +02:00
parent e6666da4e7
commit ebf8da3730
4 changed files with 51 additions and 19 deletions

View file

@@ -2,6 +2,7 @@
# tip
* FEATURE: reduce CPU usage and disk IO usage during background merge process.
* FEATURE: do not publish duplicate Docker images with `-cluster` tag suffix for [vmagent](https://victoriametrics.github.io/vmagent.html), [vmalert](https://victoriametrics.github.io/vmalert.html), [vmauth](https://victoriametrics.github.io/vmauth.html), [vmbackup](https://victoriametrics.github.io/vmbackup.html) and [vmrestore](https://victoriametrics.github.io/vmrestore.html), since they are identical to images without `-cluster` tag suffix.
* BUGFIX: properly parse lines in [Prometheus exposition format](https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md) and in [OpenMetrics format](https://github.com/OpenObservability/OpenMetrics/blob/master/specification/OpenMetrics.md) with whitespace after the timestamp. For example, `foo 123 456 # some comment here`. See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/970

View file

@@ -1305,22 +1305,36 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxItems u
// Sort src parts by itemsCount.
sort.Slice(src, func(i, j int) bool { return src[i].p.ph.itemsCount < src[j].p.ph.itemsCount })
n := maxPartsToMerge
if len(src) < n {
n = len(src)
minSrcParts := (maxPartsToMerge + 1) / 2
if minSrcParts < 2 {
minSrcParts = 2
}
maxSrcParts := maxPartsToMerge
if len(src) < maxSrcParts {
maxSrcParts = len(src)
}
// Exhaustive search for parts giving the lowest write amplification
// when merged.
// Exhaustive search for parts giving the lowest write amplification when merged.
var pws []*partWrapper
maxM := float64(0)
for i := 2; i <= n; i++ {
for i := minSrcParts; i <= maxSrcParts; i++ {
for j := 0; j <= len(src)-i; j++ {
itemsSum := uint64(0)
a := src[j : j+i]
if a[0].p.ph.itemsCount*uint64(len(a)) < a[len(a)-1].p.ph.itemsCount {
// Do not merge parts with too big difference in items count,
// since this results in unbalanced merges.
continue
}
itemsSum := uint64(0)
for _, pw := range a {
itemsSum += pw.p.ph.itemsCount
}
if itemsSum < 1e6 && len(a) < maxPartsToMerge {
// Do not merge parts with too small number of items if the number of source parts
// isn't equal to maxPartsToMerge. This should reduce CPU usage and disk IO usage
// for small parts merge.
continue
}
if itemsSum > maxItems {
// There is no sense in checking the remaining bigger parts.
break

View file

@@ -1279,7 +1279,7 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
// getPartsToMerge returns optimal parts to merge from pws.
//
// The returned rows will contain less than maxRows rows.
// The returned parts will contain less than maxRows rows.
// The function returns true if pws contains parts, which cannot be merged because of maxRows limit.
func getPartsToMerge(pws []*partWrapper, maxRows uint64, isFinal bool) ([]*partWrapper, bool) {
pwsRemaining := make([]*partWrapper, 0, len(pws))
@@ -1345,22 +1345,36 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
return a.RowsCount < b.RowsCount
})
n := maxPartsToMerge
if len(src) < n {
n = len(src)
minSrcParts := (maxPartsToMerge + 1) / 2
if minSrcParts < 2 {
minSrcParts = 2
}
maxSrcParts := maxPartsToMerge
if len(src) < maxSrcParts {
maxSrcParts = len(src)
}
// Exhaustive search for parts giving the lowest write amplification
// when merged.
// Exhaustive search for parts giving the lowest write amplification when merged.
var pws []*partWrapper
maxM := float64(0)
for i := 2; i <= n; i++ {
for i := minSrcParts; i <= maxSrcParts; i++ {
for j := 0; j <= len(src)-i; j++ {
a := src[j : j+i]
if a[0].p.ph.RowsCount*uint64(len(a)) < a[len(a)-1].p.ph.RowsCount {
// Do not merge parts with too big difference in rows count,
// since this results in unbalanced merges.
continue
}
rowsSum := uint64(0)
for _, pw := range a {
rowsSum += pw.p.ph.RowsCount
}
if rowsSum < 1e6 && len(a) < maxPartsToMerge {
// Do not merge parts with too small number of rows if the number of source parts
// isn't equal to maxPartsToMerge. This should reduce CPU usage and disk IO usage
// for small parts merge.
continue
}
if rowsSum > maxRows {
// There is no need in verifying remaining parts with higher number of rows
needFreeSpace = true

View file

@@ -18,7 +18,7 @@ func TestAppendPartsToMerge(t *testing.T) {
testAppendPartsToMerge(t, 2, []uint64{123}, nil)
testAppendPartsToMerge(t, 2, []uint64{4, 2}, nil)
testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 1}, nil)
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 10, 9, 7, 2, 1}, []uint64{2, 7, 9, 10})
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 10, 9, 7, 3, 1}, []uint64{3, 7, 9, 10})
testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2})
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2, 4, 8})
testAppendPartsToMerge(t, 2, []uint64{1, 1}, []uint64{1, 1})
@@ -26,7 +26,9 @@ func TestAppendPartsToMerge(t *testing.T) {
testAppendPartsToMerge(t, 2, []uint64{4, 2, 4}, []uint64{4, 4})
testAppendPartsToMerge(t, 2, []uint64{1, 3, 7, 2}, nil)
testAppendPartsToMerge(t, 3, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
testAppendPartsToMerge(t, 4, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
testAppendPartsToMerge(t, 4, []uint64{1, 3, 7, 2}, nil)
testAppendPartsToMerge(t, 4, []uint64{1e6, 3e6, 7e6, 2e6}, []uint64{1e6, 2e6, 3e6})
testAppendPartsToMerge(t, 4, []uint64{2, 3, 7, 2}, []uint64{2, 2, 3, 7})
testAppendPartsToMerge(t, 3, []uint64{11, 1, 10, 100, 10}, []uint64{10, 10, 11})
}
@@ -35,8 +37,9 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
// using minimum merges.
var a []uint64
maxOutPartRows := uint64(0)
r := rand.New(rand.NewSource(1))
for i := 0; i < 1024; i++ {
n := uint64(uint32(rand.NormFloat64() * 1e9))
n := uint64(uint32(r.NormFloat64() * 1e9))
if n < 0 {
n = -n
}
@@ -83,11 +86,11 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
rowsTotal += uint64(rc)
}
overhead := float64(rowsMerged) / float64(rowsTotal)
if overhead > 2.96 {
if overhead > 2.1 {
t.Fatalf("too big overhead; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
}
if len(rowsCount) > 40 {
if len(rowsCount) > 18 {
t.Fatalf("too many rowsCount %d; rowsCount=%d, iterationsCount=%d, rowsTotal=%d, rowsMerged=%d, overhead=%f",
len(rowsCount), rowsCount, iterationsCount, rowsTotal, rowsMerged, overhead)
}