diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 9cc00d9913..74bd902fe3 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -61,6 +61,11 @@ const defaultPartsToMerge = 15 // write amplification. const finalPartsToMerge = 3 +// The number of shards for rawRow entries per partition. +// +// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems. +var rawRowsShardsPerPartition = (runtime.GOMAXPROCS(-1) + 7) / 8 + // getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet. func getMaxRawRowsPerPartition() int { maxRawRowsPerPartitionOnce.Do(func() { @@ -71,7 +76,7 @@ func getMaxRawRowsPerPartition() int { if n > 500e3 { n = 500e3 } - maxRawRowsPerPartition = n + maxRawRowsPerPartition = n / rawRowsShardsPerPartition }) return maxRawRowsPerPartition } @@ -129,16 +134,10 @@ type partition struct { // Contains file-based parts with big number of items. bigParts []*partWrapper - // rawRowsLock protects rawRows. - rawRowsLock sync.Mutex - // rawRows contains recently added rows that haven't been converted into parts yet. // // rawRows aren't used in search for performance reasons. - rawRows []rawRow - - // rawRowsLastFlushTime is the last time rawRows are flushed. - rawRowsLastFlushTime time.Time + rawRows rawRowsShards snapshotLock sync.RWMutex @@ -264,18 +263,18 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func } func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition { - return &partition{ + p := &partition{ name: name, smallPartsPath: smallPartsPath, bigPartsPath: bigPartsPath, getDeletedMetricIDs: getDeletedMetricIDs, - rawRows: getRawRowsMaxSize().rows, - mergeIdx: uint64(time.Now().UnixNano()), stopCh: make(chan struct{}), } + p.rawRows.init() + return p } // partitionMetrics contains essential metrics for the partition. @@ -322,10 +321,9 @@ type partitionMetrics struct { // UpdateMetrics updates m with metrics from pt. func (pt *partition) UpdateMetrics(m *partitionMetrics) { - pt.rawRowsLock.Lock() - m.PendingRows += uint64(len(pt.rawRows)) - m.SmallRowsCount += uint64(len(pt.rawRows)) - pt.rawRowsLock.Unlock() + rawRowsLen := uint64(pt.rawRows.Len()) + m.PendingRows += rawRowsLen + m.SmallRowsCount += rawRowsLen pt.partsLock.Lock() @@ -399,29 +397,82 @@ func (pt *partition) AddRows(rows []rawRow) { } } - // Try adding rows. - var rrs []*rawRows - pt.rawRowsLock.Lock() + pt.rawRows.addRows(pt, rows) +} + +type rawRowsShards struct { + lock sync.Mutex + shardIdx int + + // Shards reduce lock contention when adding rows on multi-CPU systems. + shards []rawRowsShard +} + +func (rrs *rawRowsShards) init() { + rrs.shards = make([]rawRowsShard, rawRowsShardsPerPartition) +} + +func (rrs *rawRowsShards) addRows(pt *partition, rows []rawRow) { + rrs.lock.Lock() + rrs.shardIdx++ + if rrs.shardIdx >= len(rrs.shards) { + rrs.shardIdx = 0 + } + shard := &rrs.shards[rrs.shardIdx] + rrs.lock.Unlock() + + shard.addRows(pt, rows) +} + +func (rrs *rawRowsShards) Len() int { + n := 0 + for i := range rrs.shards[:] { + n += rrs.shards[i].Len() + } + return n +} + +type rawRowsShard struct { + lock sync.Mutex + rows []rawRow + lastFlushTime time.Time +} + +func (rrs *rawRowsShard) Len() int { + rrs.lock.Lock() + n := len(rrs.rows) + rrs.lock.Unlock() + return n +} + +func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) { + var rrss []*rawRows + + rrs.lock.Lock() + if cap(rrs.rows) == 0 { + rrs.rows = getRawRowsMaxSize().rows + } + maxRowsCount := getMaxRawRowsPerPartition() for { - capacity := cap(pt.rawRows) - len(pt.rawRows) + capacity := maxRowsCount - len(rrs.rows) if capacity >= len(rows) { // Fast path - rows fit capacity. - pt.rawRows = append(pt.rawRows, rows...) + rrs.rows = append(rrs.rows, rows...) break } // Slow path - rows don't fit capacity. // Fill rawRows to capacity and convert it to a part. - pt.rawRows = append(pt.rawRows, rows[:capacity]...) + rrs.rows = append(rrs.rows, rows[:capacity]...) rows = rows[capacity:] rr := getRawRowsMaxSize() - pt.rawRows, rr.rows = rr.rows, pt.rawRows - rrs = append(rrs, rr) - pt.rawRowsLastFlushTime = time.Now() + rrs.rows, rr.rows = rr.rows, rrs.rows + rrss = append(rrss, rr) + rrs.lastFlushTime = time.Now() } - pt.rawRowsLock.Unlock() + rrs.lock.Unlock() - for _, rr := range rrs { + for _, rr := range rrss { pt.addRowsPart(rr.rows) putRawRows(rr) } @@ -583,7 +634,7 @@ func (pt *partition) MustClose() { startTime = time.Now() // Flush raw rows the last time before exit. - pt.flushRawRows(nil, true) + pt.flushRawRows(true) // Flush inmemory parts to disk. var pws []*partWrapper @@ -637,7 +688,6 @@ func (pt *partition) startRawRowsFlusher() { } func (pt *partition) rawRowsFlusher() { - var rawRows []rawRow t := time.NewTimer(rawRowsFlushInterval) for { select { @@ -646,29 +696,35 @@ func (pt *partition) rawRowsFlusher() { case <-t.C: t.Reset(rawRowsFlushInterval) } - - rawRows = pt.flushRawRows(rawRows[:0], false) + pt.flushRawRows(false) } } -func (pt *partition) flushRawRows(newRawRows []rawRow, isFinal bool) []rawRow { - oldRawRows := newRawRows[:0] - mustFlush := false +func (pt *partition) flushRawRows(isFinal bool) { + pt.rawRows.flush(pt, isFinal) +} + +func (rrs *rawRowsShards) flush(pt *partition, isFinal bool) { + for i := range rrs.shards[:] { + rrs.shards[i].flush(pt, isFinal) + } +} + +func (rrs *rawRowsShard) flush(pt *partition, isFinal bool) { + var rr *rawRows currentTime := time.Now() - pt.rawRowsLock.Lock() - if isFinal || currentTime.Sub(pt.rawRowsLastFlushTime) > rawRowsFlushInterval { - mustFlush = true - oldRawRows = pt.rawRows - pt.rawRows = newRawRows[:0] - pt.rawRowsLastFlushTime = currentTime + rrs.lock.Lock() + if isFinal || currentTime.Sub(rrs.lastFlushTime) > rawRowsFlushInterval { + rr = getRawRowsMaxSize() + rrs.rows, rr.rows = rr.rows, rrs.rows } - pt.rawRowsLock.Unlock() + rrs.lock.Unlock() - if mustFlush { - pt.addRowsPart(oldRawRows) + if rr != nil { + pt.addRowsPart(rr.rows) + putRawRows(rr) } - return oldRawRows } func (pt *partition) startInmemoryPartsFlusher() { @@ -1334,7 +1390,7 @@ func (pt *partition) CreateSnapshotAt(smallPath, bigPath string) error { startTime := time.Now() // Flush inmemory data to disk. - pt.flushRawRows(nil, true) + pt.flushRawRows(true) if _, err := pt.flushInmemoryParts(nil, true); err != nil { return fmt.Errorf("cannot flush inmemory parts: %s", err) } diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 7089653e7c..7f8b515b84 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -185,7 +185,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.AddRows(rows) // Flush just added rows to a separate partition. - pt.flushRawRows(nil, true) + pt.flushRawRows(true) } testPartitionSearch(t, pt, tsids, tr, rbsExpected, -1) pt.MustClose() diff --git a/lib/storage/table.go b/lib/storage/table.go index fa9ac2a085..3746a05473 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -220,7 +220,7 @@ func (tb *table) flushRawRows() { defer tb.PutPartitions(ptws) for _, ptw := range ptws { - ptw.pt.flushRawRows(nil, true) + ptw.pt.flushRawRows(true) } }