lib/storage: scale ingestion performance by sharding rawRows on systems with more than 8 CPU cores

Aliaksandr Valialkin 2019-12-19 18:12:02 +02:00
parent 8d79412b26
commit a37a006f11
3 changed files with 102 additions and 46 deletions
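
The core idea of the change, as a rough standalone sketch (not code from the commit): writers pick a shard round-robin under a tiny shared lock and then append under that shard's own lock, so concurrent AddRows calls on machines with more than 8 cores rarely contend on the same mutex. The type and function names below (shardedBuffer, rowBuffer, newShardedBuffer) are hypothetical stand-ins for the rawRowsShards/rawRowsShard types added in this commit; only the (GOMAXPROCS+7)/8 shard-count heuristic is taken from the diff.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// One shard per 8 CPU cores; systems with <= 8 cores keep a single shard.
var shardsPerPartition = (runtime.GOMAXPROCS(-1) + 7) / 8

// rowBuffer is a hypothetical stand-in for rawRowsShard:
// a mutex-protected buffer of pending rows.
type rowBuffer struct {
	mu   sync.Mutex
	rows []int
}

func (b *rowBuffer) add(rows []int) {
	b.mu.Lock()
	b.rows = append(b.rows, rows...)
	b.mu.Unlock()
}

// shardedBuffer is a hypothetical stand-in for rawRowsShards:
// writers are spread round-robin across the shards.
type shardedBuffer struct {
	mu       sync.Mutex
	shardIdx int
	shards   []rowBuffer
}

func newShardedBuffer() *shardedBuffer {
	return &shardedBuffer{shards: make([]rowBuffer, shardsPerPartition)}
}

func (sb *shardedBuffer) add(rows []int) {
	// The shared lock only guards the round-robin counter;
	// the append itself happens under the chosen shard's lock.
	sb.mu.Lock()
	sb.shardIdx = (sb.shardIdx + 1) % len(sb.shards)
	shard := &sb.shards[sb.shardIdx]
	sb.mu.Unlock()
	shard.add(rows)
}

func main() {
	sb := newShardedBuffer()
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				sb.add([]int{i})
			}
		}()
	}
	wg.Wait()
	total := 0
	for i := range sb.shards {
		total += len(sb.shards[i].rows)
	}
	fmt.Println("shards:", shardsPerPartition, "rows buffered:", total)
}

The actual diff goes further than this sketch: each shard also keeps its own lastFlushTime and per-shard row cap, so flushing and the getMaxRawRowsPerPartition budget are enforced per shard.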

lib/storage/partition.go

@@ -61,6 +61,11 @@ const defaultPartsToMerge = 15
 // write amplification.
 const finalPartsToMerge = 3

+// The number of shards for rawRow entries per partition.
+//
+// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
+var rawRowsShardsPerPartition = (runtime.GOMAXPROCS(-1) + 7) / 8
+
 // getMaxRawRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet.
 func getMaxRawRowsPerPartition() int {
 	maxRawRowsPerPartitionOnce.Do(func() {
@@ -71,7 +76,7 @@ func getMaxRawRowsPerPartition() int {
 		if n > 500e3 {
 			n = 500e3
 		}
-		maxRawRowsPerPartition = n
+		maxRawRowsPerPartition = n / rawRowsShardsPerPartition
 	})
 	return maxRawRowsPerPartition
 }
@@ -129,16 +134,10 @@ type partition struct {
 	// Contains file-based parts with big number of items.
 	bigParts []*partWrapper

-	// rawRowsLock protects rawRows.
-	rawRowsLock sync.Mutex
-
 	// rawRows contains recently added rows that haven't been converted into parts yet.
 	//
 	// rawRows aren't used in search for performance reasons.
-	rawRows []rawRow
-
-	// rawRowsLastFlushTime is the last time rawRows are flushed.
-	rawRowsLastFlushTime time.Time
+	rawRows rawRowsShards

 	snapshotLock sync.RWMutex
@@ -264,18 +263,18 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
 }

 func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition {
-	return &partition{
+	p := &partition{
 		name:                name,
 		smallPartsPath:      smallPartsPath,
 		bigPartsPath:        bigPartsPath,
 		getDeletedMetricIDs: getDeletedMetricIDs,

-		rawRows: getRawRowsMaxSize().rows,
-
 		mergeIdx: uint64(time.Now().UnixNano()),
 		stopCh:   make(chan struct{}),
 	}
+	p.rawRows.init()
+	return p
 }

 // partitionMetrics contains essential metrics for the partition.
@@ -322,10 +321,9 @@ type partitionMetrics struct {
 // UpdateMetrics updates m with metrics from pt.
 func (pt *partition) UpdateMetrics(m *partitionMetrics) {
-	pt.rawRowsLock.Lock()
-	m.PendingRows += uint64(len(pt.rawRows))
-	m.SmallRowsCount += uint64(len(pt.rawRows))
-	pt.rawRowsLock.Unlock()
+	rawRowsLen := uint64(pt.rawRows.Len())
+	m.PendingRows += rawRowsLen
+	m.SmallRowsCount += rawRowsLen

 	pt.partsLock.Lock()
@@ -399,29 +397,82 @@ func (pt *partition) AddRows(rows []rawRow) {
 		}
 	}

-	// Try adding rows.
-	var rrs []*rawRows
-	pt.rawRowsLock.Lock()
+	pt.rawRows.addRows(pt, rows)
+}
+
+type rawRowsShards struct {
+	lock     sync.Mutex
+	shardIdx int
+
+	// Shards reduce lock contention when adding rows on multi-CPU systems.
+	shards []rawRowsShard
+}
+
+func (rrs *rawRowsShards) init() {
+	rrs.shards = make([]rawRowsShard, rawRowsShardsPerPartition)
+}
+
+func (rrs *rawRowsShards) addRows(pt *partition, rows []rawRow) {
+	rrs.lock.Lock()
+	rrs.shardIdx++
+	if rrs.shardIdx >= len(rrs.shards) {
+		rrs.shardIdx = 0
+	}
+	shard := &rrs.shards[rrs.shardIdx]
+	rrs.lock.Unlock()
+
+	shard.addRows(pt, rows)
+}
+
+func (rrs *rawRowsShards) Len() int {
+	n := 0
+	for i := range rrs.shards[:] {
+		n += rrs.shards[i].Len()
+	}
+	return n
+}
+
+type rawRowsShard struct {
+	lock          sync.Mutex
+	rows          []rawRow
+	lastFlushTime time.Time
+}
+
+func (rrs *rawRowsShard) Len() int {
+	rrs.lock.Lock()
+	n := len(rrs.rows)
+	rrs.lock.Unlock()
+	return n
+}
+
+func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
+	var rrss []*rawRows
+
+	rrs.lock.Lock()
+	if cap(rrs.rows) == 0 {
+		rrs.rows = getRawRowsMaxSize().rows
+	}
+	maxRowsCount := getMaxRawRowsPerPartition()
 	for {
-		capacity := cap(pt.rawRows) - len(pt.rawRows)
+		capacity := maxRowsCount - len(rrs.rows)
 		if capacity >= len(rows) {
 			// Fast path - rows fit capacity.
-			pt.rawRows = append(pt.rawRows, rows...)
+			rrs.rows = append(rrs.rows, rows...)
 			break
 		}

 		// Slow path - rows don't fit capacity.
 		// Fill rawRows to capacity and convert it to a part.
-		pt.rawRows = append(pt.rawRows, rows[:capacity]...)
+		rrs.rows = append(rrs.rows, rows[:capacity]...)
 		rows = rows[capacity:]
 		rr := getRawRowsMaxSize()
-		pt.rawRows, rr.rows = rr.rows, pt.rawRows
-		rrs = append(rrs, rr)
-		pt.rawRowsLastFlushTime = time.Now()
+		rrs.rows, rr.rows = rr.rows, rrs.rows
+		rrss = append(rrss, rr)
+		rrs.lastFlushTime = time.Now()
 	}
-	pt.rawRowsLock.Unlock()
+	rrs.lock.Unlock()

-	for _, rr := range rrs {
+	for _, rr := range rrss {
 		pt.addRowsPart(rr.rows)
 		putRawRows(rr)
 	}
@@ -583,7 +634,7 @@ func (pt *partition) MustClose() {
 	startTime = time.Now()

 	// Flush raw rows the last time before exit.
-	pt.flushRawRows(nil, true)
+	pt.flushRawRows(true)

 	// Flush inmemory parts to disk.
 	var pws []*partWrapper
@@ -637,7 +688,6 @@ func (pt *partition) startRawRowsFlusher() {
 }

 func (pt *partition) rawRowsFlusher() {
-	var rawRows []rawRow
 	t := time.NewTimer(rawRowsFlushInterval)
 	for {
 		select {
@@ -646,29 +696,35 @@ func (pt *partition) rawRowsFlusher() {
 		case <-t.C:
 			t.Reset(rawRowsFlushInterval)
 		}
-
-		rawRows = pt.flushRawRows(rawRows[:0], false)
+		pt.flushRawRows(false)
 	}
 }

-func (pt *partition) flushRawRows(newRawRows []rawRow, isFinal bool) []rawRow {
-	oldRawRows := newRawRows[:0]
-	mustFlush := false
+func (pt *partition) flushRawRows(isFinal bool) {
+	pt.rawRows.flush(pt, isFinal)
+}
+
+func (rrs *rawRowsShards) flush(pt *partition, isFinal bool) {
+	for i := range rrs.shards[:] {
+		rrs.shards[i].flush(pt, isFinal)
+	}
+}
+
+func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) {
+	var rr *rawRows
 	currentTime := time.Now()

-	pt.rawRowsLock.Lock()
-	if isFinal || currentTime.Sub(pt.rawRowsLastFlushTime) > rawRowsFlushInterval {
-		mustFlush = true
-		oldRawRows = pt.rawRows
-		pt.rawRows = newRawRows[:0]
-		pt.rawRowsLastFlushTime = currentTime
+	rrs.lock.Lock()
+	if isFinal || currentTime.Sub(rrs.lastFlushTime) > rawRowsFlushInterval {
+		rr = getRawRowsMaxSize()
+		rrs.rows, rr.rows = rr.rows, rrs.rows
 	}
-	pt.rawRowsLock.Unlock()
+	rrs.lock.Unlock()

-	if mustFlush {
-		pt.addRowsPart(oldRawRows)
+	if rr != nil {
+		pt.addRowsPart(rr.rows)
+		putRawRows(rr)
 	}
-	return oldRawRows
 }

 func (pt *partition) startInmemoryPartsFlusher() {
@@ -1334,7 +1390,7 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error {
 	startTime := time.Now()

 	// Flush inmemory data to disk.
-	pt.flushRawRows(nil, true)
+	pt.flushRawRows(true)
 	if _, err := pt.flushInmemoryParts(nil, true); err != nil {
 		return fmt.Errorf("cannot flush inmemory parts: %s", err)
 	}

lib/storage/partition_search_test.go

@@ -185,7 +185,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
 		pt.AddRows(rows)

 		// Flush just added rows to a separate partition.
-		pt.flushRawRows(nil, true)
+		pt.flushRawRows(true)
 	}
 	testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1)
 	pt.MustClose()

lib/storage/table.go

@@ -220,7 +220,7 @@ func (tb *table) flushRawRows() {
 	defer tb.PutPartitions(ptws)

 	for _, ptw := range ptws {
-		ptw.pt.flushRawRows(nil, true)
+		ptw.pt.flushRawRows(true)
 	}
 }
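
One detail worth noting from the getMaxRawRowsPerPartition hunk: dividing the cap by rawRowsShardsPerPartition keeps the partition-wide in-memory budget roughly unchanged, because the limit is now enforced per shard instead of per partition. A small illustrative calculation (the 32-core figure is an assumption; the 500e3 cap is the upper bound from the code above):

package main

import "fmt"

func main() {
	gomaxprocs := 32   // assumed machine size
	totalCap := 500000 // upper bound computed in getMaxRawRowsPerPartition

	shards := (gomaxprocs + 7) / 8 // same formula as rawRowsShardsPerPartition -> 4
	perShardCap := totalCap / shards

	fmt.Println(shards, perShardCap) // 4 125000
}

So a 32-core machine gets 4 shards of up to 125e3 buffered rows each, while a machine with 8 or fewer cores keeps a single shard with the original 500e3 limit.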