lib/{mergeset,storage}: reduce the number of fsync calls on data ingestion path on systems with many cpu cores

VictoriaMetrics maintains a per-CPU-core buffer for ingested data, and these buffers are flushed to disk every second.
Since commit 56b6b893ce the buffers have been flushed to disk in parallel. This increased write disk IO usage on systems
with many CPU cores, as described at https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1338#issuecomment-863046999 .
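For a rough sense of scale: the number of these buffers matches the number of available CPU cores, so a 32-core host can go
from up to 32 independent flush operations per second (each potentially producing its own small part and its own fsync)
down to a single batched flush per cycle. These figures are only an illustration, not measurements from the linked issue.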

This commit merges the per-CPU buffers into bigger in-memory buffers before flushing them to disk.
This should reduce the rate of fsync syscalls and, consequently, the write disk IO on systems with many CPU cores.
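
As a rough illustration of the pattern (not the actual VictoriaMetrics code; the shard, shardedBuffer, appendItemsToFlush
and flushBatch names are invented here and only mirror the appendBlocksToFlush / appendRawRowsToFlush helpers in the diff
below): each shard hands its pending data to a shared batch under its own lock, and the batch is then flushed once.

package main

import (
	"fmt"
	"sync"
)

// shard stands in for a per-CPU buffer (rawItemsShard / rawRowsShard in the diff below).
type shard struct {
	mu    sync.Mutex
	items []string
}

// appendItemsToFlush moves the shard's pending items into dst under the shard's
// lock and leaves the shard empty, similar to appendBlocksToFlush in the diff below.
func (s *shard) appendItemsToFlush(dst []string) []string {
	s.mu.Lock()
	dst = append(dst, s.items...)
	s.items = s.items[:0]
	s.mu.Unlock()
	return dst
}

type shardedBuffer struct {
	shards []shard
}

// flush gathers the contents of every shard into one batch and flushes that batch
// once, so the number of flush (and fsync) calls no longer grows with the number
// of CPU cores.
func (b *shardedBuffer) flush() {
	var itemsToFlush []string
	for i := range b.shards {
		itemsToFlush = b.shards[i].appendItemsToFlush(itemsToFlush)
	}
	flushBatch(itemsToFlush)
}

// flushBatch stands in for converting the batch into parts and persisting them.
func flushBatch(items []string) {
	if len(items) == 0 {
		return
	}
	fmt.Printf("flushing %d items in a single batch\n", len(items))
}

func main() {
	b := &shardedBuffer{shards: make([]shard, 4)}
	for i := range b.shards {
		b.shards[i].items = append(b.shards[i].items, fmt.Sprintf("item from shard %d", i))
	}
	b.flush() // one batched flush instead of one flush per shard
}

The pre-commit behavior corresponds to calling flushBatch once per shard; collecting everything first is what cuts the
number of flush (and fsync) calls.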

This should help https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1338
See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1244
Author: Aliaksandr Valialkin
Date:   2021-06-17 13:42:32 +03:00
Commit: aa9b56a046 (parent 12a83d25bf)
2 changed files with 75 additions and 109 deletions


@@ -177,7 +177,7 @@ func (ris *rawItemsShard) Len() int {
 func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
 	var err error
-	var blocksToMerge []*inmemoryBlock
+	var blocksToFlush []*inmemoryBlock
 	ris.mu.Lock()
 	ibs := ris.ibs
@@ -200,19 +200,16 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
 		}
 	}
 	if len(ibs) >= maxBlocksPerShard {
-		blocksToMerge = ibs
-		ris.ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
+		blocksToFlush = append(blocksToFlush, ibs...)
+		for i := range ibs {
+			ibs[i] = nil
+		}
+		ris.ibs = ibs[:0]
 		ris.lastFlushTime = fasttime.UnixTimestamp()
 	}
 	ris.mu.Unlock()
-	if blocksToMerge == nil {
-		// Fast path.
-		return err
-	}
-	// Slow path: merge blocksToMerge.
-	tb.mergeRawItemsBlocks(blocksToMerge)
+	tb.mergeRawItemsBlocks(blocksToFlush)
 	return err
 }
@@ -586,58 +583,65 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
 	tb.rawItemsPendingFlushesWG.Add(1)
 	defer tb.rawItemsPendingFlushesWG.Done()
-	var wg sync.WaitGroup
-	wg.Add(len(riss.shards))
+	var blocksToFlush []*inmemoryBlock
 	for i := range riss.shards {
-		go func(ris *rawItemsShard) {
-			ris.flush(tb, isFinal)
-			wg.Done()
-		}(&riss.shards[i])
+		blocksToFlush = riss.shards[i].appendBlocksToFlush(blocksToFlush, tb, isFinal)
 	}
-	wg.Wait()
+	tb.mergeRawItemsBlocks(blocksToFlush)
 }
-func (ris *rawItemsShard) flush(tb *Table, isFinal bool) {
-	mustFlush := false
+func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, isFinal bool) []*inmemoryBlock {
 	currentTime := fasttime.UnixTimestamp()
 	flushSeconds := int64(rawItemsFlushInterval.Seconds())
 	if flushSeconds <= 0 {
 		flushSeconds = 1
 	}
-	var blocksToMerge []*inmemoryBlock
 	ris.mu.Lock()
 	if isFinal || currentTime-ris.lastFlushTime > uint64(flushSeconds) {
-		mustFlush = true
-		blocksToMerge = ris.ibs
-		ris.ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
+		ibs := ris.ibs
+		dst = append(dst, ibs...)
+		for i := range ibs {
+			ibs[i] = nil
+		}
+		ris.ibs = ibs[:0]
 		ris.lastFlushTime = currentTime
 	}
 	ris.mu.Unlock()
-	if mustFlush {
-		tb.mergeRawItemsBlocks(blocksToMerge)
-	}
+	return dst
 }
-func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
+func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock) {
+	if len(ibs) == 0 {
+		return
+	}
 	tb.partMergersWG.Add(1)
 	defer tb.partMergersWG.Done()
-	pws := make([]*partWrapper, 0, (len(blocksToMerge)+defaultPartsToMerge-1)/defaultPartsToMerge)
-	for len(blocksToMerge) > 0 {
+	pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
+	var pwsLock sync.Mutex
+	var wg sync.WaitGroup
+	for len(ibs) > 0 {
 		n := defaultPartsToMerge
-		if n > len(blocksToMerge) {
-			n = len(blocksToMerge)
+		if n > len(ibs) {
+			n = len(ibs)
 		}
-		pw := tb.mergeInmemoryBlocks(blocksToMerge[:n])
-		blocksToMerge = blocksToMerge[n:]
-		if pw == nil {
-			continue
-		}
-		pw.isInMerge = true
-		pws = append(pws, pw)
+		wg.Add(1)
+		go func(ibsPart []*inmemoryBlock) {
+			defer wg.Done()
+			pw := tb.mergeInmemoryBlocks(ibsPart)
+			if pw == nil {
+				return
+			}
+			pw.isInMerge = true
+			pwsLock.Lock()
+			pws = append(pws, pw)
+			pwsLock.Unlock()
+		}(ibs[:n])
+		ibs = ibs[n:]
 	}
+	wg.Wait()
 	if len(pws) > 0 {
 		if err := tb.mergeParts(pws, nil, true); err != nil {
 			logger.Panicf("FATAL: cannot merge raw parts: %s", err)
@@ -672,10 +676,10 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
 	}
 }
-func (tb *Table) mergeInmemoryBlocks(blocksToMerge []*inmemoryBlock) *partWrapper {
-	// Convert blocksToMerge into inmemoryPart's
-	mps := make([]*inmemoryPart, 0, len(blocksToMerge))
-	for _, ib := range blocksToMerge {
+func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {
+	// Convert ibs into inmemoryPart's
+	mps := make([]*inmemoryPart, 0, len(ibs))
+	for _, ib := range ibs {
 		if len(ib.items) == 0 {
 			continue
 		}


@@ -4,7 +4,6 @@ import (
 	"errors"
 	"fmt"
 	"io/ioutil"
-	"math/bits"
 	"os"
 	"path/filepath"
 	"sort"
@@ -478,11 +477,12 @@ func (rrs *rawRowsShard) Len() int {
 }
 func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
-	var rrss []*rawRows
+	var rowsToFlush []rawRow
 	rrs.mu.Lock()
 	if cap(rrs.rows) == 0 {
-		rrs.rows = getRawRowsMaxSize().rows
+		n := getMaxRawRowsPerShard()
+		rrs.rows = make([]rawRow, 0, n)
 	}
 	maxRowsCount := cap(rrs.rows)
 	for {
@@ -494,65 +494,35 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
 		}
 		// Slow path - rows don't fit capacity.
-		// Fill rawRows to capacity and convert it to a part.
-		rrs.rows = append(rrs.rows, rows[:capacity]...)
-		rows = rows[capacity:]
-		rr := getRawRowsMaxSize()
-		rrs.rows, rr.rows = rr.rows, rrs.rows
-		rrss = append(rrss, rr)
+		// Put rrs.rows and rows to rowsToFlush and convert it to a part.
+		rowsToFlush = append(rowsToFlush, rrs.rows...)
+		rowsToFlush = append(rowsToFlush, rows...)
+		rrs.rows = rrs.rows[:0]
 		rrs.lastFlushTime = fasttime.UnixTimestamp()
 	}
 	rrs.mu.Unlock()
-	for _, rr := range rrss {
-		pt.addRowsPart(rr.rows)
-		putRawRows(rr)
-	}
+	pt.flushRowsToParts(rowsToFlush)
 }
-type rawRows struct {
-	rows []rawRow
-}
-func getRawRowsMaxSize() *rawRows {
-	size := getMaxRawRowsPerShard()
-	return getRawRowsWithSize(size)
-}
-func getRawRowsWithSize(size int) *rawRows {
-	p, sizeRounded := getRawRowsPool(size)
-	v := p.Get()
-	if v == nil {
-		return &rawRows{
-			rows: make([]rawRow, 0, sizeRounded),
-		}
-	}
-	return v.(*rawRows)
-}
-func putRawRows(rr *rawRows) {
-	rr.rows = rr.rows[:0]
-	size := cap(rr.rows)
-	p, _ := getRawRowsPool(size)
-	p.Put(rr)
-}
-func getRawRowsPool(size int) (*sync.Pool, int) {
-	size--
-	if size < 0 {
-		size = 0
-	}
-	bucketIdx := 64 - bits.LeadingZeros64(uint64(size))
-	if bucketIdx >= len(rawRowsPools) {
-		bucketIdx = len(rawRowsPools) - 1
-	}
-	p := &rawRowsPools[bucketIdx]
-	sizeRounded := 1 << uint(bucketIdx)
-	return p, sizeRounded
-}
-var rawRowsPools [19]sync.Pool
+func (pt *partition) flushRowsToParts(rows []rawRow) {
+	maxRows := getMaxRawRowsPerShard()
+	var wg sync.WaitGroup
+	for len(rows) > 0 {
+		n := maxRows
+		if n > len(rows) {
+			n = len(rows)
+		}
+		wg.Add(1)
+		go func(rowsPart []rawRow) {
+			defer wg.Done()
+			pt.addRowsPart(rowsPart)
+		}(rows[:n])
+		rows = rows[n:]
+	}
+	wg.Wait()
+}
 func (pt *partition) addRowsPart(rows []rawRow) {
 	if len(rows) == 0 {
 		return
@@ -749,19 +719,14 @@ func (pt *partition) flushRawRows(isFinal bool) {
 }
 func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
-	var wg sync.WaitGroup
-	wg.Add(len(rrss.shards))
+	var rowsToFlush []rawRow
 	for i := range rrss.shards {
-		go func(rrs *rawRowsShard) {
-			rrs.flush(pt, isFinal)
-			wg.Done()
-		}(&rrss.shards[i])
+		rowsToFlush = rrss.shards[i].appendRawRowsToFlush(rowsToFlush, pt, isFinal)
 	}
-	wg.Wait()
+	pt.flushRowsToParts(rowsToFlush)
 }
-func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) {
-	var rr *rawRows
+func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFinal bool) []rawRow {
 	currentTime := fasttime.UnixTimestamp()
 	flushSeconds := int64(rawRowsFlushInterval.Seconds())
 	if flushSeconds <= 0 {
@@ -770,15 +735,12 @@ func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) {
 	rrs.mu.Lock()
 	if isFinal || currentTime-rrs.lastFlushTime > uint64(flushSeconds) {
-		rr = getRawRowsMaxSize()
-		rrs.rows, rr.rows = rr.rows, rrs.rows
+		dst = append(dst, rrs.rows...)
+		rrs.rows = rrs.rows[:0]
 	}
 	rrs.mu.Unlock()
-	if rr != nil {
-		pt.addRowsPart(rr.rows)
-		putRawRows(rr)
-	}
+	return dst
 }
 func (pt *partition) startInmemoryPartsFlusher() {