lib/mergeset: split rows ingestion among multiple shards

This improves rows ingestion performance on systems with many CPU cores by reducing lock contention.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1244

Thanks to @waldoweng for the original idea and draft implementation at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1243
Aliaksandr Valialkin 2021-04-27 15:36:31 +03:00
parent 2ec7d8b384
commit 56b6b893ce
3 changed files with 150 additions and 75 deletions
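
As a rough illustration of the approach (not the actual lib/mergeset code; the type and function names below are made up for this sketch), each shard owns its own mutex and buffer, and an atomic counter picks the next shard in round-robin fashion, so concurrent writers mostly lock different shards:

package main

import (
	"runtime"
	"sync"
	"sync/atomic"
)

// itemShard buffers recently added items behind its own lock.
type itemShard struct {
	mu    sync.Mutex
	items [][]byte
}

// itemShards spreads inserts across multiple shards to reduce lock contention.
type itemShards struct {
	shardIdx uint64 // round-robin counter, bumped atomically on every insert
	shards   []itemShard
}

func newItemShards() *itemShards {
	return &itemShards{shards: make([]itemShard, runtime.NumCPU())}
}

// addItems picks the next shard and appends items under that shard's lock only,
// so concurrent callers rarely block on the same mutex.
func (s *itemShards) addItems(items [][]byte) {
	n := atomic.AddUint64(&s.shardIdx, 1)
	shard := &s.shards[n%uint64(len(s.shards))]
	shard.mu.Lock()
	shard.items = append(shard.items, items...)
	shard.mu.Unlock()
}

func main() {
	s := newItemShards()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.addItems([][]byte{[]byte("row")})
		}()
	}
	wg.Wait()
}

The commit below applies this pattern in two places: rawItemsShards in lib/mergeset/table.go (one shard per available CPU, see rawItemsShardsPerTable) and rawRowsShards in lib/storage/partition.go.
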

docs/CHANGELOG.md

@@ -9,6 +9,7 @@ sort: 15
* FEATURE: vmauth: add ability to set mandatory query args in `url_prefix`. For example, `url_prefix: http://vm:8428/?extra_label=team=dev` would add `extra_label=team=dev` query arg to all the incoming requests. See [the example](https://docs.victoriametrics.com/vmauth.html#auth-config) for more details.
* FEATURE: add OpenTSDB migration option to vmctl. See more details [here](https://docs.victoriametrics.com/vmctl#migrating-data-from-opentsdb).
Thanks to @johnseekins!
* FEATURE: improved new time series registration speed on systems with many CPU cores. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1244). Thanks to @waldoweng for the idea and [draft implementation](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/1243).
* BUGFIX: vmagent: properly update `role: endpoints` and `role: endpointslices` scrape targets if the underlying service changes in `kubernetes_sd_config`. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240).
* BUGFIX: vmagent: apply `scrape_timeout` on receiving the first response byte from `stream_parse: true` scrape targets. Previously it was applied to receiving and *processing* the full response stream. This could result in false timeout errors when a scrape target exposes millions of metrics, as described [here](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1017#issuecomment-767235047).

lib/mergeset/table.go

@@ -103,9 +103,10 @@ type Table struct {
partsLock sync.Mutex
parts []*partWrapper
rawItemsBlocks []*inmemoryBlock
rawItemsLock sync.Mutex
rawItemsLastFlushTime uint64
// rawItems contains recently added items that haven't been converted to parts yet.
//
// rawItems aren't used in search for performance reasons
rawItems rawItemsShards
snapshotLock sync.RWMutex
@@ -124,6 +125,97 @@ type Table struct {
rawItemsPendingFlushesWG syncwg.WaitGroup
}
type rawItemsShards struct {
shardIdx uint64
// shards reduce lock contention when adding rows on multi-CPU systems.
shards []rawItemsShard
}
// The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawItemsShardsPerTable = cgroup.AvailableCPUs()
const maxBlocksPerShard = 512
func (riss *rawItemsShards) init() {
riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
}
func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error {
n := atomic.AddUint64(&riss.shardIdx, 1)
shards := riss.shards
idx := n % uint64(len(shards))
shard := &shards[idx]
return shard.addItems(tb, items)
}
func (riss *rawItemsShards) Len() int {
n := 0
for i := range riss.shards {
n += riss.shards[i].Len()
}
return n
}
type rawItemsShard struct {
mu sync.Mutex
ibs []*inmemoryBlock
lastFlushTime uint64
}
func (ris *rawItemsShard) Len() int {
ris.mu.Lock()
n := 0
for _, ib := range ris.ibs {
n += len(ib.items)
}
ris.mu.Unlock()
return n
}
func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {
var err error
var blocksToMerge []*inmemoryBlock
ris.mu.Lock()
ibs := ris.ibs
if len(ibs) == 0 {
ib := getInmemoryBlock()
ibs = append(ibs, ib)
ris.ibs = ibs
}
ib := ibs[len(ibs)-1]
for _, item := range items {
if !ib.Add(item) {
ib = getInmemoryBlock()
if !ib.Add(item) {
putInmemoryBlock(ib)
err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))
break
}
ibs = append(ibs, ib)
ris.ibs = ibs
}
}
if len(ibs) >= maxBlocksPerShard {
blocksToMerge = ibs
ris.ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
ris.lastFlushTime = fasttime.UnixTimestamp()
}
ris.mu.Unlock()
if blocksToMerge == nil {
// Fast path.
return err
}
// Slow path: merge blocksToMerge.
tb.mergeRawItemsBlocks(blocksToMerge)
return err
}
type partWrapper struct {
p *part
@@ -195,6 +287,7 @@ func OpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallb
flockF: flockF,
stopCh: make(chan struct{}),
}
tb.rawItems.init()
tb.startPartMergers()
tb.startRawItemsFlusher()
@@ -314,11 +407,7 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
m.ItemsMerged += atomic.LoadUint64(&tb.itemsMerged)
m.AssistedMerges += atomic.LoadUint64(&tb.assistedMerges)
tb.rawItemsLock.Lock()
for _, ib := range tb.rawItemsBlocks {
m.PendingItems += uint64(len(ib.items))
}
tb.rawItemsLock.Unlock()
m.PendingItems += uint64(tb.rawItems.Len())
tb.partsLock.Lock()
m.PartsCount += uint64(len(tb.parts))
@@ -352,42 +441,10 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
// AddItems adds the given items to the tb.
func (tb *Table) AddItems(items [][]byte) error {
var err error
var blocksToMerge []*inmemoryBlock
tb.rawItemsLock.Lock()
if len(tb.rawItemsBlocks) == 0 {
ib := getInmemoryBlock()
tb.rawItemsBlocks = append(tb.rawItemsBlocks, ib)
if err := tb.rawItems.addItems(tb, items); err != nil {
return fmt.Errorf("cannot insert data into %q: %w", tb.path, err)
}
ib := tb.rawItemsBlocks[len(tb.rawItemsBlocks)-1]
for _, item := range items {
if !ib.Add(item) {
ib = getInmemoryBlock()
if !ib.Add(item) {
putInmemoryBlock(ib)
err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock on %q; it looks like the item is too large? len(item)=%d",
item, tb.path, len(item))
break
}
tb.rawItemsBlocks = append(tb.rawItemsBlocks, ib)
}
}
if len(tb.rawItemsBlocks) >= 512 {
blocksToMerge = tb.rawItemsBlocks
tb.rawItemsBlocks = nil
tb.rawItemsLastFlushTime = fasttime.UnixTimestamp()
}
tb.rawItemsLock.Unlock()
if blocksToMerge == nil {
// Fast path.
return err
}
// Slow path: merge blocksToMerge.
tb.mergeRawItemsBlocks(blocksToMerge)
return err
return nil
}
// getParts appends parts snapshot to dst and returns it.
@@ -522,9 +579,25 @@ func (tb *Table) DebugFlush() {
}
func (tb *Table) flushRawItems(isFinal bool) {
tb.rawItems.flush(tb, isFinal)
}
func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
tb.rawItemsPendingFlushesWG.Add(1)
defer tb.rawItemsPendingFlushesWG.Done()
var wg sync.WaitGroup
wg.Add(len(riss.shards))
for i := range riss.shards {
go func(ris *rawItemsShard) {
ris.flush(tb, isFinal)
wg.Done()
}(&riss.shards[i])
}
wg.Wait()
}
func (ris *rawItemsShard) flush(tb *Table, isFinal bool) {
mustFlush := false
currentTime := fasttime.UnixTimestamp()
flushSeconds := int64(rawItemsFlushInterval.Seconds())
@@ -533,14 +606,14 @@ func (tb *Table) flushRawItems(isFinal bool) {
}
var blocksToMerge []*inmemoryBlock
tb.rawItemsLock.Lock()
if isFinal || currentTime-tb.rawItemsLastFlushTime > uint64(flushSeconds) {
ris.mu.Lock()
if isFinal || currentTime-ris.lastFlushTime > uint64(flushSeconds) {
mustFlush = true
blocksToMerge = tb.rawItemsBlocks
tb.rawItemsBlocks = nil
tb.rawItemsLastFlushTime = currentTime
blocksToMerge = ris.ibs
ris.ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
ris.lastFlushTime = currentTime
}
tb.rawItemsLock.Unlock()
ris.mu.Unlock()
if mustFlush {
tb.mergeRawItemsBlocks(blocksToMerge)

lib/storage/partition.go

@@ -437,54 +437,49 @@ func (pt *partition) AddRows(rows []rawRow) {
}
type rawRowsShards struct {
lock sync.Mutex
shardIdx int
shardIdx uint64
// Shards reduce lock contention when adding rows on multi-CPU systems.
shards []rawRowsShard
}
func (rrs *rawRowsShards) init() {
rrs.shards = make([]rawRowsShard, rawRowsShardsPerPartition)
func (rrss *rawRowsShards) init() {
rrss.shards = make([]rawRowsShard, rawRowsShardsPerPartition)
}
func (rrs *rawRowsShards) addRows(pt *partition, rows []rawRow) {
rrs.lock.Lock()
rrs.shardIdx++
if rrs.shardIdx >= len(rrs.shards) {
rrs.shardIdx = 0
}
shard := &rrs.shards[rrs.shardIdx]
rrs.lock.Unlock()
func (rrss *rawRowsShards) addRows(pt *partition, rows []rawRow) {
n := atomic.AddUint64(&rrss.shardIdx, 1)
shards := rrss.shards
idx := n % uint64(len(shards))
shard := &shards[idx]
shard.addRows(pt, rows)
}
func (rrs *rawRowsShards) Len() int {
func (rrss *rawRowsShards) Len() int {
n := 0
for i := range rrs.shards[:] {
n += rrs.shards[i].Len()
for i := range rrss.shards[:] {
n += rrss.shards[i].Len()
}
return n
}
type rawRowsShard struct {
lock sync.Mutex
mu sync.Mutex
rows []rawRow
lastFlushTime uint64
}
func (rrs *rawRowsShard) Len() int {
rrs.lock.Lock()
rrs.mu.Lock()
n := len(rrs.rows)
rrs.lock.Unlock()
rrs.mu.Unlock()
return n
}
func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
var rrss []*rawRows
rrs.lock.Lock()
rrs.mu.Lock()
if cap(rrs.rows) == 0 {
rrs.rows = getRawRowsMaxSize().rows
}
@@ -506,7 +501,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
rrss = append(rrss, rr)
rrs.lastFlushTime = fasttime.UnixTimestamp()
}
rrs.lock.Unlock()
rrs.mu.Unlock()
for _, rr := range rrss {
pt.addRowsPart(rr.rows)
@@ -752,10 +747,16 @@ func (pt *partition) flushRawRows(isFinal bool) {
pt.rawRows.flush(pt, isFinal)
}
func (rrs *rawRowsShards) flush(pt *partition, isFinal bool) {
for i := range rrs.shards[:] {
rrs.shards[i].flush(pt, isFinal)
func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
var wg sync.WaitGroup
wg.Add(len(rrss.shards))
for i := range rrss.shards {
go func(rrs *rawRowsShard) {
rrs.flush(pt, isFinal)
wg.Done()
}(&rrss.shards[i])
}
wg.Wait()
}
func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) {
@@ -766,12 +767,12 @@ func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) {
flushSeconds = 1
}
rrs.lock.Lock()
rrs.mu.Lock()
if isFinal || currentTime-rrs.lastFlushTime > uint64(flushSeconds) {
rr = getRawRowsMaxSize()
rrs.rows, rr.rows = rr.rows, rrs.rows
}
rrs.lock.Unlock()
rrs.mu.Unlock()
if rr != nil {
pt.addRowsPart(rr.rows)