From 8669584e9ffa5958ba03f3fb42fec20d4dffcad7 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Thu, 22 Feb 2024 20:06:37 +0200
Subject: [PATCH] lib/{storage,mergeset}: convert buffered items into
 searchable in-memory parts exactly once per the given flush interval

Previously the interval between item addition and its conversion to a
searchable in-memory part could vary significantly because of the too coarse
per-second precision. Switch from fasttime.UnixTimestamp() to
time.Now().UnixMilli() for millisecond precision. It is OK to use time.Now()
for tracking the time when buffered items must be converted to searchable
in-memory parts, since time.Now() calls aren't located in hot paths.

Increase the flush interval for converting buffered samples to searchable
in-memory parts from one second to two seconds. This should reduce the number
of blocks that need to be processed during high-frequency alerting queries.
This, in turn, should reduce CPU usage.

While at it, hardcode the maximum size of a rawRows shard to 8MB, since this
size gives the optimal data ingestion performance according to load tests.
This reduces memory usage and CPU usage on systems with large amounts of RAM
under a high data ingestion rate.
---
 lib/mergeset/table.go                   | 24 +++++------
 lib/storage/partition.go                | 55 ++++++++-----------
 lib/storage/table_search_timing_test.go |  2 +-
 3 files changed, 30 insertions(+), 51 deletions(-)

diff --git a/lib/mergeset/table.go b/lib/mergeset/table.go
index 9a19ffd284..71633748ec 100644
--- a/lib/mergeset/table.go
+++ b/lib/mergeset/table.go
@@ -192,8 +192,8 @@ func (riss *rawItemsShards) Len() int {
 }
 
 type rawItemsShardNopad struct {
-	// Put lastFlushTime to the top in order to avoid unaligned memory access on 32-bit architectures
-	lastFlushTime uint64
+	// Put lastFlushTimeMs to the top in order to avoid unaligned memory access on 32-bit architectures
+	lastFlushTimeMs int64
 
 	mu  sync.Mutex
 	ibs []*inmemoryBlock
@@ -236,7 +236,7 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 			ibsToFlush = append(ibsToFlush, ibs...)
 			ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
 			tailItems = items[i:]
-			atomic.StoreUint64(&ris.lastFlushTime, fasttime.UnixTimestamp())
+			atomic.StoreInt64(&ris.lastFlushTimeMs, time.Now().UnixMilli())
 			break
 		}
 		ib = &inmemoryBlock{}
@@ -696,7 +696,8 @@ func (tb *Table) DebugFlush() {
 }
 
 func (tb *Table) pendingItemsFlusher() {
-	d := timeutil.AddJitterToDuration(pendingItemsFlushInterval)
+	// do not add jitter in order to guarantee flush interval
+	d := pendingItemsFlushInterval
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
 	for {
@@ -710,7 +711,8 @@ func (tb *Table) pendingItemsFlusher() {
 }
 
 func (tb *Table) inmemoryPartsFlusher() {
-	d := timeutil.AddJitterToDuration(dataFlushInterval)
+	// do not add jitter in order to guarantee flush interval
+	d := dataFlushInterval
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
 	for {
@@ -761,13 +763,9 @@ func (riss *rawItemsShards) flush(tb *Table, isFinal bool) {
 }
 
 func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
-	currentTime := fasttime.UnixTimestamp()
-	flushSeconds := int64(pendingItemsFlushInterval.Seconds())
-	if flushSeconds <= 0 {
-		flushSeconds = 1
-	}
-	lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime)
-	if !isFinal && currentTime < lastFlushTime+uint64(flushSeconds) {
+	currentTime := time.Now().UnixMilli()
+	lastFlushTime := atomic.LoadInt64(&ris.lastFlushTimeMs)
+	if !isFinal && currentTime < lastFlushTime+pendingItemsFlushInterval.Milliseconds() {
 		// Fast path - nothing to flush
 		return dst
 	}
@@ -779,7 +777,7 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool
 		ibs[i] = nil
 	}
 	ris.ibs = ibs[:0]
-	atomic.StoreUint64(&ris.lastFlushTime, currentTime)
+	atomic.StoreInt64(&ris.lastFlushTimeMs, currentTime)
 	ris.mu.Unlock()
 	return dst
 }
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index e680885ca6..de82953705 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -15,7 +15,6 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
@@ -51,7 +50,7 @@ const defaultPartsToMerge = 15
 var rawRowsShardsPerPartition = cgroup.AvailableCPUs()
 
 // The interval for flushing buffered rows into parts, so they become visible to search.
-const pendingRowsFlushInterval = time.Second
+const pendingRowsFlushInterval = 2 * time.Second
 
 // The interval for guaranteed flush of recently ingested data from memory to on-disk parts,
 // so they survive process crash.
@@ -69,25 +68,10 @@ func SetDataFlushInterval(d time.Duration) {
 	}
 }
 
-// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet per earh rawRowsShard.
-func getMaxRawRowsPerShard() int {
-	maxRawRowsPerPartitionOnce.Do(func() {
-		n := memory.Allowed() / rawRowsShardsPerPartition / (100 * int(unsafe.Sizeof(rawRow{})))
-		if n < 1e4 {
-			n = 1e4
-		}
-		if n > 500e3 {
-			n = 500e3
-		}
-		maxRawRowsPerPartition = n
-	})
-	return maxRawRowsPerPartition
-}
-
-var (
-	maxRawRowsPerPartition     int
-	maxRawRowsPerPartitionOnce sync.Once
-)
+// The maximum number of rawRow items in rawRowsShard.
+//
+// Limit the maximum shard size to 8Mb, since this gives the lowest CPU usage under high ingestion rate.
+const maxRawRowsPerShard = (8 << 20) / int(unsafe.Sizeof(rawRow{}))
 
 // partition represents a partition.
 type partition struct {
@@ -466,8 +450,8 @@ func (rrss *rawRowsShards) Len() int {
 }
 
 type rawRowsShardNopad struct {
-	// Put lastFlushTime to the top in order to avoid unaligned memory access on 32-bit architectures
-	lastFlushTime uint64
+	// Put lastFlushTimeMs to the top in order to avoid unaligned memory access on 32-bit architectures
+	lastFlushTimeMs int64
 
 	mu   sync.Mutex
 	rows []rawRow
@@ -504,7 +488,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
 		n = copy(rrs.rows[:cap(rrs.rows)], rows)
 		rrs.rows = rrs.rows[:n]
 		rows = rows[n:]
-		atomic.StoreUint64(&rrs.lastFlushTime, fasttime.UnixTimestamp())
+		atomic.StoreInt64(&rrs.lastFlushTimeMs, time.Now().UnixMilli())
 	}
 	rrs.mu.Unlock()
 
@@ -514,8 +498,7 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) []rawRow {
 }
 
 func newRawRows() []rawRow {
-	n := getMaxRawRowsPerShard()
-	return make([]rawRow, 0, n)
+	return make([]rawRow, 0, maxRawRowsPerShard)
 }
 
 func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) {
@@ -524,7 +507,7 @@ func (pt *partition) flushRowsToInmemoryParts(rows []rawRow) {
 	}
 
 	// Merge rows into in-memory parts.
-	maxRows := getMaxRawRowsPerShard()
+	maxRows := maxRawRowsPerShard
 	var pwsLock sync.Mutex
 	pws := make([]*partWrapper, 0, (len(rows)+maxRows-1)/maxRows)
 	wg := getWaitGroup()
@@ -1017,7 +1000,8 @@ func getBigPartsConcurrency() int {
 }
 
 func (pt *partition) inmemoryPartsFlusher() {
-	d := timeutil.AddJitterToDuration(dataFlushInterval)
+	// Do not add jitter to d in order to guarantee the flush interval
+	d := dataFlushInterval
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
 	for {
@@ -1031,7 +1015,8 @@ func (pt *partition) inmemoryPartsFlusher() {
 }
 
 func (pt *partition) pendingRowsFlusher() {
-	d := timeutil.AddJitterToDuration(pendingRowsFlushInterval)
+	// Do not add jitter to d in order to guarantee the flush interval
+	d := pendingRowsFlushInterval
 	ticker := time.NewTicker(d)
 	defer ticker.Stop()
 	for {
@@ -1080,13 +1065,9 @@ func (rrss *rawRowsShards) flush(pt *partition, isFinal bool) {
 }
 
 func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawRow {
-	currentTime := fasttime.UnixTimestamp()
-	flushSeconds := int64(pendingRowsFlushInterval.Seconds())
-	if flushSeconds <= 0 {
-		flushSeconds = 1
-	}
-	lastFlushTime := atomic.LoadUint64(&rrs.lastFlushTime)
-	if !isFinal && currentTime < lastFlushTime+uint64(flushSeconds) {
+	currentTime := time.Now().UnixMilli()
+	lastFlushTime := atomic.LoadInt64(&rrs.lastFlushTimeMs)
+	if !isFinal && currentTime < lastFlushTime+pendingRowsFlushInterval.Milliseconds() {
 		// Fast path - nothing to flush
 		return dst
 	}
@@ -1094,7 +1075,7 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, isFinal bool) []rawR
 	rrs.mu.Lock()
 	dst = append(dst, rrs.rows...)
 	rrs.rows = rrs.rows[:0]
-	atomic.StoreUint64(&rrs.lastFlushTime, currentTime)
+	atomic.StoreInt64(&rrs.lastFlushTimeMs, currentTime)
 	rrs.mu.Unlock()
 	return dst
 }
diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go
index a0c8fd376d..246cf9c60e 100644
--- a/lib/storage/table_search_timing_test.go
+++ b/lib/storage/table_search_timing_test.go
@@ -103,7 +103,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
 
 func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int) {
 	startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
-	rowsPerInsert := getMaxRawRowsPerShard()
+	rowsPerInsert := maxRawRowsPerShard
 	tb, strg := openBenchTable(b, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
 
 	tr := TimeRange{
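
The standalone Go sketch below (not part of the patch) shows the millisecond-precision, interval-gated flush check that appendBlocksToFlush and appendRawRowsToFlush use after this change. The shard type, flushInterval constant and plain int rows are simplified stand-ins for rawRowsShard, pendingRowsFlushInterval and rawRow, and the per-shard mutex is omitted for brevity.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// flushInterval stands in for pendingRowsFlushInterval from the patch.
const flushInterval = 2 * time.Second

// shard is a simplified stand-in for rawRowsShard.
type shard struct {
	// Keep lastFlushTimeMs first so 64-bit atomic access stays aligned on 32-bit platforms.
	lastFlushTimeMs int64

	rows []int
}

// appendRowsToFlush moves buffered rows into dst only when the flush interval
// has elapsed since the previous flush (or when isFinal is set), using the
// same millisecond-precision comparison as the patch.
func (s *shard) appendRowsToFlush(dst []int, isFinal bool) []int {
	currentTime := time.Now().UnixMilli()
	lastFlushTime := atomic.LoadInt64(&s.lastFlushTimeMs)
	if !isFinal && currentTime < lastFlushTime+flushInterval.Milliseconds() {
		// Fast path - nothing to flush yet.
		return dst
	}
	dst = append(dst, s.rows...)
	s.rows = s.rows[:0]
	atomic.StoreInt64(&s.lastFlushTimeMs, currentTime)
	return dst
}

func main() {
	s := &shard{rows: []int{1, 2, 3}}
	fmt.Println(len(s.appendRowsToFlush(nil, false))) // 3: first call flushes (lastFlushTimeMs is zero)
	s.rows = append(s.rows, 4)
	fmt.Println(len(s.appendRowsToFlush(nil, false))) // 0: the 2s interval has not elapsed yet
	fmt.Println(len(s.appendRowsToFlush(nil, true)))  // 1: isFinal forces the flush
}

With the old per-second comparison the remaining wait could shrink to almost nothing right after a flush, so consecutive conversions could happen much closer together than the configured interval; comparing UnixMilli values keeps the spacing close to the intended two seconds, which is what the commit message means by converting buffered items exactly once per flush interval.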