diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 9a19ffd28..71633748e 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -192,8 +192,8 @@ func (riss *rawItemsShards) Len() int {
 }
 
 type rawItemsShardNopad struct {
-	// Put lastFlushTime to the top in order to avoid unaligned memory access on 32-bit architectures
-	lastFlushTime uint64
+	// Put lastFlushTimeMs to the top in order to avoid unaligned memory access on 32-bit architectures
+	lastFlushTimeMs int64
 
 	mu  sync.Mutex
 	ibs []*inmemoryBlock
@@ -236,7 +236,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 			ibsToFlush = append(ibsToFlush, ibs...)
 			ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
 			tailItems = items[i:]
-			atomic.StoreUint64(&ris.lastFlushTime, fasttime.UnixTimestamp())
+			atomic.StoreInt64(&ris.lastFlushTimeMs, time.Now().UnixMilli())
 			break
 		}
 		ib = &inmemoryBlock{}
@@ -696,7 +696,8 @@ func (tb *Table) DebugFlush() {
 }
 
 func (tb *Table) pendingItemsFlusher() {
-	d := timeutil.AddJitterToDuration(pendingItemsFlushInterval)
+	// do not add jitter in order to guarantee flush interval
+	d := pendingItemsFlushInterval
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
 	for {
@@ -710,7 +711,8 @@ func (tb *Table) pendingItemsFlusher() {
 }
 
 func (tb *Table) inmemoryPartsFlusher() {
-	d := timeutil.AddJitterToDuration(dataFlushInterval)
+	// do not add jitter in order to guarantee flush interval
+	d := dataFlushInterval
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
 	for {
@@ -761,13 +763,9 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
 }
 
 func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
-	currentTime := fasttime.UnixTimestamp()
-	flushSeconds := int64(pendingItemsFlushInterval.Seconds())
-	if flushSeconds <= 0 {
-		flushSeconds = 1
-	}
-	lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime)
-	if !isFinal && currentTime < lastFlushTime+uint64(flushSeconds) {
+	currentTime := time.Now().UnixMilli()
+	lastFlushTime := atomic.LoadInt64(&ris.lastFlushTimeMs)
+	if !isFinal && currentTime < lastFlushTime+pendingItemsFlushInterval.Milliseconds() {
 		// Fast path - nothing to flush
 		return dst
 	}
@@ -779,7 +777,7 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool
 		ibs[i] = nil
 	}
 	ris.ibs = ibs[:0]
-	atomic.StoreUint64(&ris.lastFlushTime, currentTime)
+	atomic.StoreInt64(&ris.lastFlushTimeMs, currentTime)
 	ris.mu.Unlock()
 	return dst
 }
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index e680885ca..de8295370 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -15,7 +15,6 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
@@ -51,7 +50,7 @@ const defaultPartsToMerge = 15
 var rawRowsShardsPerPartition = cgroup.AvailableCPUs()
 
 // The interval for flushing buffered rows into parts, so they become visible to search.
-const pendingRowsFlushInterval = time.Second
+const pendingRowsFlushInterval = 2 * time.Second
 
 // The interval for guaranteed flush of recently ingested data from memory to on-disk parts,
 // so they survive process crash.
@@ -69,25 +68,10 @@ func SetDataFlushInterval(d time.Duration) {
 	}
 }
 
-// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet per earh rawRowsShard.
-func getMaxRawRowsPerShard() int {
-	maxRawRowsPerPartitionOnce.Do(func() {
-		n := memory.Allowed() / rawRowsShardsPerPartition / (100 * int(unsafe.Sizeof(rawRow{})))
-		if n < 1e4 {
-			n = 1e4
-		}
-		if n > 500e3 {
-			n = 500e3
-		}
-		maxRawRowsPerPartition = n
-	})
-	return maxRawRowsPerPartition
-}
-
-var (
-	maxRawRowsPerPartition     int
-	maxRawRowsPerPartitionOnce sync.Once
-)
+// The maximum number of rawRow items in rawRowsShard.
+//
+// Limit the maximum shard size to 8Mb, since this gives the lowest CPU usage under high ingestion rate.
+const maxRawRowsPerShard = (8 << 20) / int(unsafe.Sizeof(rawRow{}))
 
 // partition represents a partition.
 type partition struct {
@@ -466,8 +450,8 @@ func (rrss *rawRowsShards) Len() int {
 }
 
 type rawRowsShardNopad struct {
-	// Put lastFlushTime to the top in order to avoid unaligned memory access on 32-bit architectures
-	lastFlushTime uint64
+	// Put lastFlushTimeMs to the top in order to avoid unaligned memory access on 32-bit architectures
+	lastFlushTimeMs int64
 
 	mu   sync.Mutex
 	rows []rawRow
@@ -504,7 +488,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
 		n = copy(rrs.rows[:cap(rrs.rows)], rows)
 		rrs.rows = rrs.rows[:n]
 		rows = rows[n:]
-		atomic.StoreUint64(&rrs.lastFlushTime, fasttime.UnixTimestamp())
+		atomic.StoreInt64(&rrs.lastFlushTimeMs, time.Now().UnixMilli())
 	}
 	rrs.mu.Unlock()
 
@@ -514,8 +498,7 @@
 }
 
 func newRawRows() []rawRow {
-	n := getMaxRawRowsPerShard()
-	return make([]rawRow, 0, n)
+	return make([]rawRow, 0, maxRawRowsPerShard)
 }
 
 func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) {
@@ -524,7 +507,7 @@
 	}
 
 	// Merge rows into in-memory parts.
-	maxRows := getMaxRawRowsPerShard()
+	maxRows := maxRawRowsPerShard
 	var pwsLock sync.Mutex
 	pws := make([]*partWrapper, 0, (len(rows)+maxRows-1)/maxRows)
 	wg := getWaitGroup()
@@ -1017,7 +1000,8 @@ func getBigPartsConcurrency() int {
 }
 
 func (pt *partition) inmemoryPartsFlusher() {
-	d := timeutil.AddJitterToDuration(dataFlushInterval)
+	// Do not add jitter to d in order to guarantee the flush interval
+	d := dataFlushInterval
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
 	for {
@@ -1031,7 +1015,8 @@ func (pt *partition) inmemoryPartsFlusher() {
 }
 
 func (pt *partition) pendingRowsFlusher() {
-	d := timeutil.AddJitterToDuration(pendingRowsFlushInterval)
+	// Do not add jitter to d in order to guarantee the flush interval
+	d := pendingRowsFlushInterval
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
 	for {
@@ -1080,13 +1065,9 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
 }
 
 func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow {
-	currentTime := fasttime.UnixTimestamp()
-	flushSeconds := int64(pendingRowsFlushInterval.Seconds())
-	if flushSeconds <= 0 {
-		flushSeconds = 1
-	}
-	lastFlushTime := atomic.LoadUint64(&rrs.lastFlushTime)
-	if !isFinal && currentTime < lastFlushTime+uint64(flushSeconds) {
+	currentTime := time.Now().UnixMilli()
+	lastFlushTime := atomic.LoadInt64(&rrs.lastFlushTimeMs)
+	if !isFinal && currentTime < lastFlushTime+pendingRowsFlushInterval.Milliseconds() {
 		// Fast path - nothing to flush
 		return dst
 	}
@@ -1094,7 +1075,7 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawR
 	rrs.mu.Lock()
 	dst = append(dst, rrs.rows...)
 	rrs.rows = rrs.rows[:0]
-	atomic.StoreUint64(&rrs.lastFlushTime, currentTime)
+	atomic.StoreInt64(&rrs.lastFlushTimeMs, currentTime)
 	rrs.mu.Unlock()
 	return dst
 }
diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go
index a0c8fd376..246cf9c60 100644
--- a/lib/storage/table_search_timing_test.go
+++ b/lib/storage/table_search_timing_test.go
@@ -103,7 +103,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
 
 func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) {
 	startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
-	rowsPerInsert := getMaxRawRowsPerShard()
+	rowsPerInsert := maxRawRowsPerShard
 	tb, strg := openBenchTable(b, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
 
 	tr := TimeRange{
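
For reference, below is a minimal standalone sketch (not part of the patch itself) of the flush-deadline pattern these hunks converge on: an atomically updated lastFlushTimeMs int64 compared against time.Now().UnixMilli() plus the flush interval in milliseconds. The shard type, its field names and the flushInterval constant are simplified stand-ins for rawItemsShard/rawRowsShard and pendingItemsFlushInterval/pendingRowsFlushInterval, and the update-on-add behavior is simplified relative to the real code.

// Illustrative sketch only; names are hypothetical stand-ins.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// Stand-in for pendingItemsFlushInterval / pendingRowsFlushInterval.
const flushInterval = 2 * time.Second

type shard struct {
	// Keep the int64 first so atomic access stays aligned on 32-bit platforms,
	// mirroring the comment on lastFlushTimeMs in the patch.
	lastFlushTimeMs int64

	mu    sync.Mutex
	items []string
}

func (s *shard) add(item string) {
	s.mu.Lock()
	s.items = append(s.items, item)
	// Record the time with millisecond resolution, as the patch does with
	// time.Now().UnixMilli() instead of fasttime.UnixTimestamp().
	atomic.StoreInt64(&s.lastFlushTimeMs, time.Now().UnixMilli())
	s.mu.Unlock()
}

// appendToFlush mirrors the shape of appendBlocksToFlush / appendRawRowsToFlush:
// unless the flush is final, it returns early while the shard is younger than flushInterval.
func (s *shard) appendToFlush(dst []string, isFinal bool) []string {
	currentTime := time.Now().UnixMilli()
	lastFlushTime := atomic.LoadInt64(&s.lastFlushTimeMs)
	if !isFinal && currentTime < lastFlushTime+flushInterval.Milliseconds() {
		// Fast path - nothing to flush yet.
		return dst
	}

	s.mu.Lock()
	dst = append(dst, s.items...)
	s.items = s.items[:0]
	atomic.StoreInt64(&s.lastFlushTimeMs, currentTime)
	s.mu.Unlock()
	return dst
}

func main() {
	var s shard
	s.add("a")
	fmt.Println(s.appendToFlush(nil, false)) // [] - deadline not reached yet
	fmt.Println(s.appendToFlush(nil, true))  // [a] - final flush ignores the deadline
}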