lib/storage: scale ingestion performance by sharding rawRows on systems with more than 8 CPU cores

This commit is contained in:
Aliaksandr Valialkin 2019-12-19 18:12:02 +02:00
parent 97f70ccda7
commit 1825893eef
3 changed files with 102 additions and 46 deletions

View file

@ -61,6 +61,11 @@ const defaultPartsToMerge = 15
// write amplification.
const finalPartsToMerge = 3
// The number of shards for rawRow entries per partition.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawRowsShardsPerPartition = (runtime.GOMAXPROCS(-1) + 7) / 8
// getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerPartition() int {
maxRawRowsPerPartitionOnce.Do(func() {
@ -71,7 +76,7 @@ func getMaxRawRowsPerPartition() int {
if n > 500e3 {
n = 500e3
}
maxRawRowsPerPartition = n
maxRawRowsPerPartition = n / rawRowsShardsPerPartition
})
return maxRawRowsPerPartition
}
@ -129,16 +134,10 @@ type partition struct {
// Contains file-based parts with big number of items.
bigParts []*partWrapper
// rawRowsLock protects rawRows.
rawRowsLock sync.Mutex
// rawRows contains recently added rows that haven't been converted into parts yet.
//
// rawRows aren't used in search for performance reasons.
rawRows []rawRow
// rawRowsLastFlushTime is the last time rawRows are flushed.
rawRowsLastFlushTime time.Time
rawRows rawRowsShards
snapshotLock sync.RWMutex
@ -264,18 +263,18 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
}
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition {
return &partition{
p := &partition{
name: name,
smallPartsPath: smallPartsPath,
bigPartsPath: bigPartsPath,
getDeletedMetricIDs: getDeletedMetricIDs,
rawRows: getRawRowsMaxSize().rows,
mergeIdx: uint64(time.Now().UnixNano()),
stopCh: make(chan struct{}),
}
p.rawRows.init()
return p
}
// partitionMetrics contains essential metrics for the partition.
@ -322,10 +321,9 @@ type partitionMetrics struct {
// UpdateMetrics updates m with metrics from pt.
func (pt *partition) UpdateMetrics(m *partitionMetrics) {
pt.rawRowsLock.Lock()
m.PendingRows += uint64(len(pt.rawRows))
m.SmallRowsCount += uint64(len(pt.rawRows))
pt.rawRowsLock.Unlock()
rawRowsLen := uint64(pt.rawRows.Len())
m.PendingRows += rawRowsLen
m.SmallRowsCount += rawRowsLen
pt.partsLock.Lock()
@ -399,29 +397,82 @@ func (pt *partition) AddRows(rows []rawRow) {
}
}
// Try adding rows.
var rrs []*rawRows
pt.rawRowsLock.Lock()
pt.rawRows.addRows(pt, rows)
}
type rawRowsShards struct {
lock sync.Mutex
shardIdx int
// Shards reduce lock contention when adding rows on multi-CPU systems.
shards []rawRowsShard
}
func (rrs *rawRowsShards) init() {
rrs.shards = make([]rawRowsShard, rawRowsShardsPerPartition)
}
func (rrs *rawRowsShards) addRows(pt *partition, rows []rawRow) {
rrs.lock.Lock()
rrs.shardIdx++
if rrs.shardIdx >= len(rrs.shards) {
rrs.shardIdx = 0
}
shard := &rrs.shards[rrs.shardIdx]
rrs.lock.Unlock()
shard.addRows(pt, rows)
}
func (rrs *rawRowsShards) Len() int {
n := 0
for i := range rrs.shards[:] {
n += rrs.shards[i].Len()
}
return n
}
type rawRowsShard struct {
lock sync.Mutex
rows []rawRow
lastFlushTime time.Time
}
func (rrs *rawRowsShard) Len() int {
rrs.lock.Lock()
n := len(rrs.rows)
rrs.lock.Unlock()
return n
}
func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
var rrss []*rawRows
rrs.lock.Lock()
if cap(rrs.rows) == 0 {
rrs.rows = getRawRowsMaxSize().rows
}
maxRowsCount := getMaxRawRowsPerPartition()
for {
capacity := cap(pt.rawRows) - len(pt.rawRows)
capacity := maxRowsCount - len(rrs.rows)
if capacity >= len(rows) {
// Fast path - rows fit capacity.
pt.rawRows = append(pt.rawRows, rows...)
rrs.rows = append(rrs.rows, rows...)
break
}
// Slow path - rows don't fit capacity.
// Fill rawRows to capacity and convert it to a part.
pt.rawRows = append(pt.rawRows, rows[:capacity]...)
rrs.rows = append(rrs.rows, rows[:capacity]...)
rows = rows[capacity:]
rr := getRawRowsMaxSize()
pt.rawRows, rr.rows = rr.rows, pt.rawRows
rrs = append(rrs, rr)
pt.rawRowsLastFlushTime = time.Now()
rrs.rows, rr.rows = rr.rows, rrs.rows
rrss = append(rrss, rr)
rrs.lastFlushTime = time.Now()
}
pt.rawRowsLock.Unlock()
rrs.lock.Unlock()
for _, rr := range rrs {
for _, rr := range rrss {
pt.addRowsPart(rr.rows)
putRawRows(rr)
}
@ -583,7 +634,7 @@ func (pt *partition) MustClose() {
startTime = time.Now()
// Flush raw rows the last time before exit.
pt.flushRawRows(nil, true)
pt.flushRawRows(true)
// Flush inmemory parts to disk.
var pws []*partWrapper
@ -637,7 +688,6 @@ func (pt *partition) startRawRowsFlusher() {
}
func (pt *partition) rawRowsFlusher() {
var rawRows []rawRow
t := time.NewTimer(rawRowsFlushInterval)
for {
select {
@ -646,29 +696,35 @@ func (pt *partition) rawRowsFlusher() {
case <-t.C:
t.Reset(rawRowsFlushInterval)
}
rawRows = pt.flushRawRows(rawRows[:0], false)
pt.flushRawRows(false)
}
}
func (pt *partition) flushRawRows(newRawRows []rawRow, isFinal bool) []rawRow {
oldRawRows := newRawRows[:0]
mustFlush := false
func (pt *partition) flushRawRows(isFinal bool) {
pt.rawRows.flush(pt, isFinal)
}
func (rrs *rawRowsShards) flush(pt *partition, isFinal bool) {
for i := range rrs.shards[:] {
rrs.shards[i].flush(pt, isFinal)
}
}
func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) {
var rr *rawRows
currentTime := time.Now()
pt.rawRowsLock.Lock()
if isFinal || currentTime.Sub(pt.rawRowsLastFlushTime) > rawRowsFlushInterval {
mustFlush = true
oldRawRows = pt.rawRows
pt.rawRows = newRawRows[:0]
pt.rawRowsLastFlushTime = currentTime
rrs.lock.Lock()
if isFinal || currentTime.Sub(rrs.lastFlushTime) > rawRowsFlushInterval {
rr = getRawRowsMaxSize()
rrs.rows, rr.rows = rr.rows, rrs.rows
}
pt.rawRowsLock.Unlock()
rrs.lock.Unlock()
if mustFlush {
pt.addRowsPart(oldRawRows)
if rr != nil {
pt.addRowsPart(rr.rows)
putRawRows(rr)
}
return oldRawRows
}
func (pt *partition) startInmemoryPartsFlusher() {
@ -1334,7 +1390,7 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
startTime := time.Now()
// Flush inmemory data to disk.
pt.flushRawRows(nil, true)
pt.flushRawRows(true)
if _, err := pt.flushInmemoryParts(nil, true); err != nil {
return fmt.Errorf("cannot flush inmemory parts: %s", err)
}

View file

@ -185,7 +185,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt.AddRows(rows)
// Flush just added rows to a separate partition.
pt.flushRawRows(nil, true)
pt.flushRawRows(true)
}
testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1)
pt.MustClose()

View file

@ -220,7 +220,7 @@ func (tb *table) flushRawRows() {
defer tb.PutPartitions(ptws)
for _, ptw := range ptws {
ptw.pt.flushRawRows(nil, true)
ptw.pt.flushRawRows(true)
}
}