2019-05-22 21:16:55 +00:00
package mergeset
import (
2023-03-19 08:36:05 +00:00
"encoding/json"
2020-09-17 00:02:35 +00:00
"errors"
2019-05-22 21:16:55 +00:00
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
2022-10-20 13:17:09 +00:00
"unsafe"
2019-05-22 21:16:55 +00:00
2020-12-08 18:49:32 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
2020-05-14 19:01:51 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
2019-09-09 08:41:30 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
2019-05-22 21:16:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/syncwg"
2024-01-22 16:12:37 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
2019-05-22 21:16:55 +00:00
)
2022-12-05 23:27:57 +00:00
// maxInmemoryParts is the maximum number of inmemory parts in the table.
//
// This limit allows reducing CPU usage under high ingestion rate.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
//
// This number may be reached when the insertion pace outreaches merger pace.
// If this number is reached, then assisted merges are performed
// during data ingestion.
const maxInmemoryParts = 15

// maxFileParts is the maximum number of file parts in the table.
//
// This number may be reached when the insertion pace outreaches merger pace.
// If this number is reached, then assisted merges are performed
// during data ingestion.
const maxFileParts = 30
2022-12-13 00:49:21 +00:00
2019-05-22 21:16:55 +00:00
// Default number of parts to merge at once.
//
// This number has been obtained empirically - it gives the lowest possible overhead.
// See appendPartsToMerge tests for details.
const defaultPartsToMerge = 15

// The final number of parts to merge at once.
//
// It must be smaller than defaultPartsToMerge.
// Lower value improves select performance at the cost of increased
// write amplification.
const finalPartsToMerge = 2

// maxPartSize is the maximum part size in bytes.
//
// This number should be limited by the amount of time required to merge parts of this summary size.
// The required time shouldn't exceed a day.
const maxPartSize = 400e9
2019-05-22 21:16:55 +00:00
2022-12-05 23:27:57 +00:00
// The interval for flushing buffered data to parts, so it becomes visible to search.
const pendingItemsFlushInterval = time . Second
// The interval for guaranteed flush of recently ingested data from memory to on-disk parts,
// so they survive process crash.
var dataFlushInterval = 5 * time . Second
// SetDataFlushInterval sets the interval for guaranteed flush of recently ingested data from memory to disk.
//
// The data can be flushed from memory to disk more frequently if it doesn't fit the memory limit.
//
// This function must be called before initializing the indexdb.
func SetDataFlushInterval ( d time . Duration ) {
if d > pendingItemsFlushInterval {
dataFlushInterval = d
}
}
2019-05-22 21:16:55 +00:00
// maxItemsPerCachedPart is the maximum items per created part by the merge,
// which must be cached in the OS page cache.
//
// Such parts are usually frequently accessed, so it is good to cache their
// contents in OS page cache.
2019-09-09 08:41:30 +00:00
func maxItemsPerCachedPart ( ) uint64 {
mem := memory . Remaining ( )
// Production data shows that each item occupies ~4 bytes in the compressed part.
// It is expected no more than defaultPartsToMerge/2 parts exist
// in the OS page cache before they are merged into bigger part.
// Halft of the remaining RAM must be left for lib/storage parts,
// so the maxItems is calculated using the below code:
maxItems := uint64 ( mem ) / ( 4 * defaultPartsToMerge )
if maxItems < 1e6 {
maxItems = 1e6
}
return maxItems
}
2019-05-22 21:16:55 +00:00
// Table represents mergeset table.
type Table struct {
	// Atomically updated counters must go first in the struct, so they are properly
	// aligned to 8 bytes on 32-bit architectures.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212

	// The number of currently running merges of inmemory and file parts.
	activeInmemoryMerges uint64
	activeFileMerges     uint64

	// Total numbers of performed merges for inmemory and file parts.
	inmemoryMergesCount uint64
	fileMergesCount     uint64

	// Total numbers of items processed by inmemory and file merges.
	inmemoryItemsMerged uint64
	fileItemsMerged     uint64

	// Total numbers of assisted merges performed from data ingestion paths.
	inmemoryAssistedMergesCount uint64
	fileAssistedMergesCount     uint64

	// Total number and summary byte size of items added via AddItems.
	itemsAdded          uint64
	itemsAddedSizeBytes uint64

	// mergeIdx is a monotonic counter seeded with time.Now().UnixNano() in MustOpenTable;
	// presumably used for generating unique part names - confirm at nextMergeIdx callers.
	mergeIdx uint64

	// path is the filesystem path to the table directory.
	path string

	// flushCallback is an optional callback called after flushed data becomes visible to search.
	flushCallback         func()
	flushCallbackWorkerWG sync.WaitGroup
	// needFlushCallbackCall is set to 1 when the background worker must invoke flushCallback.
	needFlushCallbackCall uint32

	// prepareBlock is an optional callback called during merge before flushing the prepared block.
	prepareBlock PrepareBlockCallback

	// isReadOnly presumably indicates read-only mode when non-zero - see errReadOnlyMode handling.
	isReadOnly *uint32

	// rawItems contains recently added items that haven't been converted to parts yet.
	//
	// rawItems aren't used in search for performance reasons
	rawItems rawItemsShards

	// partsLock protects inmemoryParts and fileParts.
	partsLock sync.Mutex

	// inmemoryParts contains inmemory parts.
	inmemoryParts []*partWrapper

	// inmemoryPartsLimitCh limits the number of inmemory parts
	// in order to prevent from data ingestion slowdown as described at https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212
	inmemoryPartsLimitCh chan struct{}

	// fileParts contains file-backed parts.
	fileParts []*partWrapper

	// This channel is used for signaling the background mergers that there are parts,
	// which may need to be merged.
	needMergeCh chan struct{}

	// stopCh is closed when the table is closed; background workers watch it.
	stopCh chan struct{}

	// wg tracks the background workers started via startBackgroundWorkers.
	wg sync.WaitGroup

	// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
	rawItemsPendingFlushesWG syncwg.WaitGroup
}
2021-04-27 12:36:31 +00:00
// rawItemsShards splits recently added raw items across shards
// in order to reduce lock contention on multi-CPU systems.
type rawItemsShards struct {
	// shardIdx is incremented atomically for round-robin shard selection in addItems.
	shardIdx uint32

	// shards reduce lock contention when adding rows on multi-CPU systems.
	shards []rawItemsShard
}
// The number of shards for rawItems per table.
//
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
2022-04-06 16:35:50 +00:00
var rawItemsShardsPerTable = func ( ) int {
cpus := cgroup . AvailableCPUs ( )
multiplier := cpus
if multiplier > 16 {
multiplier = 16
}
2022-04-07 12:21:17 +00:00
return ( cpus * multiplier + 1 ) / 2
2022-04-06 16:35:50 +00:00
} ( )
2021-04-27 12:36:31 +00:00
2022-04-06 16:35:50 +00:00
const maxBlocksPerShard = 256
2021-04-27 12:36:31 +00:00
// init allocates the configured number of shards.
func (riss *rawItemsShards) init() {
	riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
}
2022-12-04 07:30:31 +00:00
func ( riss * rawItemsShards ) addItems ( tb * Table , items [ ] [ ] byte ) {
2021-04-27 12:36:31 +00:00
shards := riss . shards
2022-12-05 23:27:57 +00:00
shardsLen := uint32 ( len ( shards ) )
for len ( items ) > 0 {
n := atomic . AddUint32 ( & riss . shardIdx , 1 )
idx := n % shardsLen
items = shards [ idx ] . addItems ( tb , items )
}
2021-04-27 12:36:31 +00:00
}
// Len returns the total number of pending raw items across all the shards.
func (riss *rawItemsShards) Len() int {
	total := 0
	for i := range riss.shards {
		total += riss.shards[i].Len()
	}
	return total
}
2022-10-20 13:17:09 +00:00
// rawItemsShardNopad holds the mutable state of a rawItemsShard without cache-line padding.
type rawItemsShardNopad struct {
	// Put lastFlushTime to the top in order to avoid unaligned memory access on 32-bit architectures
	lastFlushTime uint64

	// mu protects ibs.
	mu  sync.Mutex
	ibs []*inmemoryBlock
}

// rawItemsShard is a single shard of recently added raw items.
type rawItemsShard struct {
	rawItemsShardNopad

	// The padding prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	_ [128 - unsafe.Sizeof(rawItemsShardNopad{})%128]byte
}
// Len returns the number of pending raw items in the shard.
func (ris *rawItemsShard) Len() int {
	ris.mu.Lock()
	defer ris.mu.Unlock()

	itemsCount := 0
	for _, ib := range ris.ibs {
		itemsCount += len(ib.items)
	}
	return itemsCount
}
2022-12-05 23:27:57 +00:00
// addItems adds as many items as possible to the shard and returns the tail of items,
// which didn't fit because the shard overflowed; the caller must route the tail
// to another shard.
func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
	var ibsToFlush []*inmemoryBlock
	var tailItems [][]byte

	ris.mu.Lock()
	ibs := ris.ibs
	if len(ibs) == 0 {
		// Lazily create the first block in the shard.
		ib := getInmemoryBlock()
		ibs = append(ibs, ib)
		ris.ibs = ibs
	}
	ib := ibs[len(ibs)-1]
	for i, item := range items {
		if ib.Add(item) {
			continue
		}
		// The current block is full - either flush the whole shard or start a new block.
		if len(ibs) >= maxBlocksPerShard {
			// The shard is full. Schedule all its blocks for flushing
			// and hand the remaining items back to the caller.
			ibsToFlush = ibs
			ibs = make([]*inmemoryBlock, 0, maxBlocksPerShard)
			tailItems = items[i:]
			atomic.StoreUint64(&ris.lastFlushTime, fasttime.UnixTimestamp())
			break
		}
		// Start a fresh block and retry the item.
		ib = getInmemoryBlock()
		if ib.Add(item) {
			ibs = append(ibs, ib)
			continue
		}
		// The item doesn't fit even into an empty block - this is a caller bug.
		putInmemoryBlock(ib)
		logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock len(item)=%d; the caller should be responsible for avoiding too big items", len(item))
	}
	ris.ibs = ibs
	ris.mu.Unlock()

	// Convert the collected blocks to parts outside the shard lock
	// in order to reduce lock contention.
	tb.flushBlocksToParts(ibsToFlush, false)

	return tailItems
}
2019-05-22 21:16:55 +00:00
// partWrapper wraps a part with reference counting and merge bookkeeping.
type partWrapper struct {
	// p is the wrapped part.
	p *part

	// mp is non-nil only for in-memory parts.
	mp *inmemoryPart

	// refCount is the number of references to the part; updated atomically.
	refCount uint32

	// mustBeDeleted marks partWrapper for deletion.
	// This field should be updated only after partWrapper
	// was removed from the list of active parts.
	mustBeDeleted uint32

	// isInMerge is set when the part participates in a merge; updated under Table.partsLock.
	isInMerge bool

	// The deadline when the in-memory part must be flushed to disk.
	flushToDiskDeadline time.Time
}
// incRef increments the reference counter for pw.
func (pw *partWrapper) incRef() {
	atomic.AddUint32(&pw.refCount, 1)
}
// decRef decrements the reference counter for pw.
//
// When the counter reaches zero, the part resources are released and,
// if mustBeDeleted is set for a file-backed part, its directory is removed.
func (pw *partWrapper) decRef() {
	// Decrement by adding the two's complement of 1.
	n := atomic.AddUint32(&pw.refCount, ^uint32(0))
	if int32(n) < 0 {
		logger.Panicf("BUG: pw.refCount must be bigger than 0; got %d", int32(n))
	}
	if n > 0 {
		return
	}

	// Capture the directory to delete before pw.p is released below.
	deletePath := ""
	if pw.mp == nil && atomic.LoadUint32(&pw.mustBeDeleted) != 0 {
		deletePath = pw.p.path
	}
	if pw.mp != nil {
		// Do not return pw.mp to pool via putInmemoryPart(),
		// since pw.mp size may be too big compared to other entries stored in the pool.
		// This may result in increased memory usage because of high fragmentation.
		pw.mp = nil
	}
	pw.p.MustClose()
	pw.p = nil

	if deletePath != "" {
		fs.MustRemoveAll(deletePath)
	}
}
2023-04-15 05:08:43 +00:00
// MustOpenTable opens a table on the given path.
//
// Optional flushCallback is called every time new data batch is flushed
// to the underlying storage and becomes visible to search.
//
// Optional prepareBlock is called during merge before flushing the prepared block
// to persistent storage.
//
// The table is created if it doesn't exist yet.
func MustOpenTable(path string, flushCallback func(), prepareBlock PrepareBlockCallback, isReadOnly *uint32) *Table {
	path = filepath.Clean(path)

	// Create a directory for the table if it doesn't exist yet.
	fs.MustMkdirIfNotExist(path)

	// Open table parts.
	pws := mustOpenParts(path)

	tb := &Table{
		path:                 path,
		flushCallback:        flushCallback,
		prepareBlock:         prepareBlock,
		isReadOnly:           isReadOnly,
		inmemoryPartsLimitCh: make(chan struct{}, maxInmemoryParts),
		fileParts:            pws,
		mergeIdx:             uint64(time.Now().UnixNano()),
		needMergeCh:          make(chan struct{}, 1),
		stopCh:               make(chan struct{}),
	}
	tb.rawItems.init()
	tb.startBackgroundWorkers()

	// Wake up a single background merger, so it could start merging parts if needed.
	tb.notifyBackgroundMergers()

	if flushCallback != nil {
		tb.flushCallbackWorkerWG.Add(1)
		go func() {
			// call flushCallback once per 10 seconds in order to improve the effectiveness of caches,
			// which are reset by the flushCallback.
			d := timeutil.AddJitterToDuration(time.Second * 10)
			tc := time.NewTicker(d)
			for {
				select {
				case <-tb.stopCh:
					// Issue the final callback call before exiting.
					tb.flushCallback()
					tb.flushCallbackWorkerWG.Done()
					return
				case <-tc.C:
					// Call the callback only if new data was flushed since the previous tick.
					if atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 1, 0) {
						tb.flushCallback()
					}
				}
			}
		}()
	}

	return tb
}
2022-12-04 08:01:04 +00:00
// startBackgroundWorkers starts merge workers and the periodic flushers for the table.
func (tb *Table) startBackgroundWorkers() {
	tb.startMergeWorkers()
	tb.startInmemoryPartsFlusher()
	tb.startPendingItemsFlusher()
}
2019-05-22 21:16:55 +00:00
// MustClose closes the table.
func (tb *Table) MustClose() {
	// Signal the background workers to stop.
	close(tb.stopCh)

	// Waiting for background workers to stop
	tb.wg.Wait()

	// Flush the remaining in-memory data, so it isn't lost on shutdown.
	tb.flushInmemoryItems()
	tb.flushCallbackWorkerWG.Wait()

	// Remove references to parts from the tb, so they may be eventually closed after all the searches are done.
	tb.partsLock.Lock()
	inmemoryParts := tb.inmemoryParts
	fileParts := tb.fileParts
	tb.inmemoryParts = nil
	tb.fileParts = nil
	tb.partsLock.Unlock()

	for _, pw := range inmemoryParts {
		pw.decRef()
	}
	for _, pw := range fileParts {
		pw.decRef()
	}
}
// Path returns the path to tb on the filesystem.
func (tb *Table) Path() string {
	return tb.path
}
// TableMetrics contains essential metrics for the Table.
type TableMetrics struct {
	ActiveInmemoryMerges uint64
	ActiveFileMerges     uint64

	InmemoryMergesCount uint64
	FileMergesCount     uint64

	InmemoryItemsMerged uint64
	FileItemsMerged     uint64

	InmemoryAssistedMergesCount uint64
	FileAssistedMergesCount     uint64

	ItemsAdded          uint64
	ItemsAddedSizeBytes uint64

	// PendingItems is the number of items that weren't converted to searchable parts yet.
	PendingItems uint64

	InmemoryPartsCount uint64
	FilePartsCount     uint64

	InmemoryBlocksCount uint64
	FileBlocksCount     uint64

	InmemoryItemsCount uint64
	FileItemsCount     uint64

	InmemorySizeBytes uint64
	FileSizeBytes     uint64

	DataBlocksCacheSize         uint64
	DataBlocksCacheSizeBytes    uint64
	DataBlocksCacheSizeMaxBytes uint64
	DataBlocksCacheRequests     uint64
	DataBlocksCacheMisses       uint64

	IndexBlocksCacheSize         uint64
	IndexBlocksCacheSizeBytes    uint64
	IndexBlocksCacheSizeMaxBytes uint64
	IndexBlocksCacheRequests     uint64
	IndexBlocksCacheMisses       uint64

	// PartsRefCount is the summary reference count across all the table parts.
	PartsRefCount uint64
}
2022-12-05 23:27:57 +00:00
// TotalItemsCount returns the total number of items in the table.
func (tm *TableMetrics) TotalItemsCount() uint64 {
	total := tm.InmemoryItemsCount
	total += tm.FileItemsCount
	return total
}
2019-05-22 21:16:55 +00:00
// UpdateMetrics updates m with metrics from tb.
func (tb *Table) UpdateMetrics(m *TableMetrics) {
	m.ActiveInmemoryMerges += atomic.LoadUint64(&tb.activeInmemoryMerges)
	m.ActiveFileMerges += atomic.LoadUint64(&tb.activeFileMerges)

	m.InmemoryMergesCount += atomic.LoadUint64(&tb.inmemoryMergesCount)
	m.FileMergesCount += atomic.LoadUint64(&tb.fileMergesCount)

	m.InmemoryItemsMerged += atomic.LoadUint64(&tb.inmemoryItemsMerged)
	m.FileItemsMerged += atomic.LoadUint64(&tb.fileItemsMerged)

	m.InmemoryAssistedMergesCount += atomic.LoadUint64(&tb.inmemoryAssistedMergesCount)
	m.FileAssistedMergesCount += atomic.LoadUint64(&tb.fileAssistedMergesCount)

	m.ItemsAdded += atomic.LoadUint64(&tb.itemsAdded)
	m.ItemsAddedSizeBytes += atomic.LoadUint64(&tb.itemsAddedSizeBytes)

	m.PendingItems += uint64(tb.rawItems.Len())

	// Collect the per-part metrics under partsLock, so the part lists cannot change concurrently.
	tb.partsLock.Lock()

	m.InmemoryPartsCount += uint64(len(tb.inmemoryParts))
	for _, pw := range tb.inmemoryParts {
		p := pw.p
		m.InmemoryBlocksCount += p.ph.blocksCount
		m.InmemoryItemsCount += p.ph.itemsCount
		m.InmemorySizeBytes += p.size
		m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
	}

	m.FilePartsCount += uint64(len(tb.fileParts))
	for _, pw := range tb.fileParts {
		p := pw.p
		m.FileBlocksCount += p.ph.blocksCount
		m.FileItemsCount += p.ph.itemsCount
		m.FileSizeBytes += p.size
		m.PartsRefCount += uint64(atomic.LoadUint32(&pw.refCount))
	}

	tb.partsLock.Unlock()

	// Cache metrics are package-level (shared across tables), so they are assigned rather than accumulated.
	m.DataBlocksCacheSize = uint64(ibCache.Len())
	m.DataBlocksCacheSizeBytes = uint64(ibCache.SizeBytes())
	m.DataBlocksCacheSizeMaxBytes = uint64(ibCache.SizeMaxBytes())
	m.DataBlocksCacheRequests = ibCache.Requests()
	m.DataBlocksCacheMisses = ibCache.Misses()

	m.IndexBlocksCacheSize = uint64(idxbCache.Len())
	m.IndexBlocksCacheSizeBytes = uint64(idxbCache.SizeBytes())
	m.IndexBlocksCacheSizeMaxBytes = uint64(idxbCache.SizeMaxBytes())
	m.IndexBlocksCacheRequests = idxbCache.Requests()
	m.IndexBlocksCacheMisses = idxbCache.Misses()
}
// AddItems adds the given items to the tb.
2022-12-04 07:30:31 +00:00
//
// The function panics when items contains an item with length exceeding maxInmemoryBlockSize.
// It is caller's responsibility to make sure there are no too long items.
func ( tb * Table ) AddItems ( items [ ] [ ] byte ) {
tb . rawItems . addItems ( tb , items )
2022-04-21 10:18:05 +00:00
atomic . AddUint64 ( & tb . itemsAdded , uint64 ( len ( items ) ) )
n := 0
for _ , item := range items {
n += len ( item )
}
atomic . AddUint64 ( & tb . itemsAddedSizeBytes , uint64 ( n ) )
2019-05-22 21:16:55 +00:00
}
// getParts appends parts snapshot to dst and returns it.
//
// The appended parts must be released with putParts.
func (tb *Table) getParts(dst []*partWrapper) []*partWrapper {
	tb.partsLock.Lock()
	// Increment reference counters under partsLock, so the parts cannot be released concurrently.
	for _, pw := range tb.inmemoryParts {
		pw.incRef()
	}
	for _, pw := range tb.fileParts {
		pw.incRef()
	}
	dst = append(dst, tb.inmemoryParts...)
	dst = append(dst, tb.fileParts...)
	tb.partsLock.Unlock()

	return dst
}
// putParts releases the given pws obtained via getParts.
func (tb *Table) putParts(pws []*partWrapper) {
	for i := range pws {
		pws[i].decRef()
	}
}
2022-12-05 23:27:57 +00:00
// mergePartsOptimal merges pws into bigger parts in chunks of up to defaultPartsToMerge parts.
//
// Callers set pw.isInMerge on the passed parts beforehand (see flushInmemoryParts).
func (tb *Table) mergePartsOptimal(pws []*partWrapper) error {
	sortPartsForOptimalMerge(pws)
	for len(pws) > 0 {
		n := defaultPartsToMerge
		if n > len(pws) {
			n = len(pws)
		}
		pwsChunk := pws[:n]
		pws = pws[n:]
		err := tb.mergeParts(pwsChunk, nil, true)
		if err == nil {
			continue
		}
		// Release the not-yet-merged parts, so they can participate in future merges.
		tb.releasePartsToMerge(pws)
		return fmt.Errorf("cannot optimally merge %d parts: %w", n, err)
	}
	return nil
}
2023-05-16 18:50:15 +00:00
// DebugFlush makes sure all the recently added data is visible to search.
//
// Note: this function doesn't store all the in-memory data to disk - it just converts
// recently added items to searchable parts, which can be stored either in memory
// (if they are quite small) or to persistent disk.
//
// This function is for debugging and testing purposes only,
// since it may slow down data ingestion when used frequently.
func (tb *Table) DebugFlush() {
	tb.flushPendingItems(nil, true)

	// Wait for background flushers to finish.
	tb.rawItemsPendingFlushesWG.Wait()
}
// startInmemoryPartsFlusher starts a background goroutine flushing in-memory parts to disk.
func (tb *Table) startInmemoryPartsFlusher() {
	tb.wg.Add(1)
	go func() {
		defer tb.wg.Done()
		tb.inmemoryPartsFlusher()
	}()
}
func ( tb * Table ) startPendingItemsFlusher ( ) {
2022-12-04 07:03:05 +00:00
tb . wg . Add ( 1 )
2019-05-22 21:16:55 +00:00
go func ( ) {
2022-12-05 23:27:57 +00:00
tb . pendingItemsFlusher ( )
2022-12-04 07:03:05 +00:00
tb . wg . Done ( )
2019-05-22 21:16:55 +00:00
} ( )
}
2022-12-05 23:27:57 +00:00
// inmemoryPartsFlusher periodically flushes in-memory parts until tb.stopCh is closed.
func (tb *Table) inmemoryPartsFlusher() {
	// Add jitter to the flush interval in order to smooth out simultaneous flushes.
	d := timeutil.AddJitterToDuration(dataFlushInterval)
	ticker := time.NewTicker(d)
	defer ticker.Stop()
	for {
		select {
		case <-tb.stopCh:
			return
		case <-ticker.C:
			tb.flushInmemoryParts(false)
		}
	}
}
2022-12-05 23:27:57 +00:00
// pendingItemsFlusher periodically converts pending raw items to searchable parts
// until tb.stopCh is closed.
func (tb *Table) pendingItemsFlusher() {
	ticker := time.NewTicker(pendingItemsFlushInterval)
	defer ticker.Stop()
	// ibs is reused across iterations in order to reduce allocations.
	var ibs []*inmemoryBlock
	for {
		select {
		case <-tb.stopCh:
			return
		case <-ticker.C:
			ibs = tb.flushPendingItems(ibs[:0], false)
			// Zero the references, so the flushed blocks aren't retained by ibs.
			for i := range ibs {
				ibs[i] = nil
			}
		}
	}
}
2022-12-05 23:27:57 +00:00
// flushPendingItems converts pending raw items to searchable parts,
// appending the flushed blocks to dst.
func (tb *Table) flushPendingItems(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
	return tb.rawItems.flush(tb, dst, isFinal)
}
2019-05-22 21:16:55 +00:00
2022-12-05 23:27:57 +00:00
// flushInmemoryItems flushes both the pending raw items and all the in-memory parts.
func (tb *Table) flushInmemoryItems() {
	tb.rawItems.flush(tb, nil, true)
	tb.flushInmemoryParts(true)
}
2022-12-05 23:27:57 +00:00
// flushInmemoryParts merges in-memory parts via mergePartsOptimal.
//
// If isFinal is set, then all the in-memory parts are flushed;
// otherwise only the parts with expired flushToDiskDeadline are flushed.
func (tb *Table) flushInmemoryParts(isFinal bool) {
	currentTime := time.Now()
	var pws []*partWrapper

	// Select the parts to flush under partsLock, marking them with isInMerge
	// so concurrent mergers skip them.
	tb.partsLock.Lock()
	for _, pw := range tb.inmemoryParts {
		if !pw.isInMerge && (isFinal || pw.flushToDiskDeadline.Before(currentTime)) {
			pw.isInMerge = true
			pws = append(pws, pw)
		}
	}
	tb.partsLock.Unlock()

	if err := tb.mergePartsOptimal(pws); err != nil {
		logger.Panicf("FATAL: cannot merge in-memory parts: %s", err)
	}
}
2022-12-05 23:27:57 +00:00
// flush collects blocks to flush from all the shards and converts them to searchable parts in tb.
func (riss *rawItemsShards) flush(tb *Table, dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
	// Register the pending flush, so DebugFlush can wait for its completion.
	tb.rawItemsPendingFlushesWG.Add(1)
	for i := range riss.shards {
		dst = riss.shards[i].appendBlocksToFlush(dst, isFinal)
	}
	tb.flushBlocksToParts(dst, isFinal)
	tb.rawItemsPendingFlushesWG.Done()
	return dst
}
2023-09-01 07:34:16 +00:00
// appendBlocksToFlush moves the shard blocks to dst if the shard wasn't flushed
// for at least pendingItemsFlushInterval, or unconditionally if isFinal is set.
func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, isFinal bool) []*inmemoryBlock {
	currentTime := fasttime.UnixTimestamp()
	flushSeconds := int64(pendingItemsFlushInterval.Seconds())
	if flushSeconds <= 0 {
		// Guarantee a non-zero interval when pendingItemsFlushInterval is sub-second.
		flushSeconds = 1
	}
	lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime)
	if !isFinal && currentTime < lastFlushTime+uint64(flushSeconds) {
		// Fast path - nothing to flush
		return dst
	}
	// Slow path - move ris.ibs to dst
	ris.mu.Lock()
	ibs := ris.ibs
	dst = append(dst, ibs...)
	// Zero the moved references, so the blocks aren't retained by the shard slice.
	for i := range ibs {
		ibs[i] = nil
	}
	ris.ibs = ibs[:0]
	atomic.StoreUint64(&ris.lastFlushTime, currentTime)
	ris.mu.Unlock()
	return dst
}
2022-12-05 23:27:57 +00:00
// flushBlocksToParts converts ibs to a single searchable in-memory part and registers it at tb.
//
// isFinal is propagated to the flushCallback handling: when set, the callback is invoked
// synchronously instead of being scheduled for the background worker.
func (tb *Table) flushBlocksToParts(ibs []*inmemoryBlock, isFinal bool) {
	if len(ibs) == 0 {
		return
	}

	// Convert ibs to in-memory parts in parallel, in chunks of up to defaultPartsToMerge blocks.
	var pwsLock sync.Mutex
	pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
	wg := getWaitGroup()
	for len(ibs) > 0 {
		n := defaultPartsToMerge
		if n > len(ibs) {
			n = len(ibs)
		}
		wg.Add(1)
		// flushConcurrencyCh limits the number of concurrent part creations.
		flushConcurrencyCh <- struct{}{}
		go func(ibsChunk []*inmemoryBlock) {
			defer func() {
				<-flushConcurrencyCh
				wg.Done()
			}()
			pw := tb.createInmemoryPart(ibsChunk)
			if pw == nil {
				// All the blocks in the chunk were empty.
				return
			}
			pwsLock.Lock()
			pws = append(pws, pw)
			pwsLock.Unlock()
		}(ibs[:n])
		ibs = ibs[n:]
	}
	wg.Wait()
	putWaitGroup(wg)

	// Merge the per-chunk parts into a single in-memory part.
	flushConcurrencyCh <- struct{}{}
	pw := tb.mustMergeInmemoryParts(pws)
	<-flushConcurrencyCh

	// Reserve a slot in inmemoryPartsLimitCh before registering the new part.
	select {
	case tb.inmemoryPartsLimitCh <- struct{}{}:
	default:
		// Too many in-memory parts. Try assist merging them before adding pw to tb.inmemoryParts.
		flushConcurrencyCh <- struct{}{}
		tb.assistedMergeForInmemoryParts()
		tb.assistedMergeForFileParts()
		<-flushConcurrencyCh
		tb.inmemoryPartsLimitCh <- struct{}{}
	}

	tb.partsLock.Lock()
	tb.inmemoryParts = append(tb.inmemoryParts, pw)
	tb.notifyBackgroundMergers()
	tb.partsLock.Unlock()

	if tb.flushCallback != nil {
		if isFinal {
			tb.flushCallback()
		} else {
			// Use atomic.LoadUint32 in front of atomic.CompareAndSwapUint32 in order to avoid slow inter-CPU synchronization
			// at fast path when needFlushCallbackCall is already set to 1.
			if atomic.LoadUint32(&tb.needFlushCallbackCall) == 0 {
				atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
			}
		}
	}
}
2023-01-18 09:09:03 +00:00
// notifyBackgroundMergers signals a single background merger that new parts may need merging.
//
// It returns false when the signal channel is already full, i.e. mergers were already notified.
func (tb *Table) notifyBackgroundMergers() bool {
	select {
	case tb.needMergeCh <- struct{}{}:
		return true
	default:
		return false
	}
}
2023-02-11 20:06:18 +00:00
// flushConcurrencyLimit is the maximum number of concurrent flush/merge operations
// gated by flushConcurrencyCh.
var flushConcurrencyLimit = func() int {
	n := cgroup.AvailableCPUs()
	if n < 2 {
		// Allow at least 2 concurrent flushers on systems with a single CPU core
		// in order to guarantee that in-memory data flushes and background merges can be continued
		// when a single flusher is busy with the long merge.
		n = 2
	}
	return n
}()

// flushConcurrencyCh limits the concurrency of flush operations to flushConcurrencyLimit.
var flushConcurrencyCh = make(chan struct{}, flushConcurrencyLimit)
2019-05-22 21:16:55 +00:00
2022-12-05 23:27:57 +00:00
// assistedMergeForInmemoryParts merges in-memory parts on behalf of a data ingestion
// goroutine when enough unmerged in-memory parts accumulate.
func (tb *Table) assistedMergeForInmemoryParts() {
	tb.partsLock.Lock()
	needMerge := getNotInMergePartsCount(tb.inmemoryParts) >= defaultPartsToMerge
	tb.partsLock.Unlock()
	if !needMerge {
		return
	}

	atomic.AddUint64(&tb.inmemoryAssistedMergesCount, 1)

	maxOutBytes := tb.getMaxFilePartSize()

	// Select the parts to merge under partsLock.
	tb.partsLock.Lock()
	pws := getPartsToMerge(tb.inmemoryParts, maxOutBytes, true)
	tb.partsLock.Unlock()

	err := tb.mergeParts(pws, tb.stopCh, true)
	if err == nil {
		return
	}
	// errNothingToMerge and errForciblyStopped are expected outcomes, not real errors.
	if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
		return
	}
	logger.Panicf("FATAL: cannot assist with merging inmemory parts: %s", err)
}
2022-12-13 00:49:21 +00:00
// assistedMergeForFileParts merges file parts on behalf of a data ingestion
// goroutine when too many unmerged file parts accumulate.
func (tb *Table) assistedMergeForFileParts() {
	tb.partsLock.Lock()
	needMerge := len(tb.fileParts) > maxFileParts && getNotInMergePartsCount(tb.fileParts) >= defaultPartsToMerge
	tb.partsLock.Unlock()
	if !needMerge {
		return
	}

	atomic.AddUint64(&tb.fileAssistedMergesCount, 1)

	err := tb.mergeExistingParts(false)
	if err == nil {
		return
	}
	// The following errors are expected outcomes, not real errors.
	if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) || errors.Is(err, errReadOnlyMode) {
		return
	}
	logger.Panicf("FATAL: cannot assist with merging file parts: %s", err)
}
2022-12-05 23:27:57 +00:00
func getNotInMergePartsCount ( pws [ ] * partWrapper ) int {
n := 0
for _ , pw := range pws {
if ! pw . isInMerge {
n ++
}
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
return n
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
// getWaitGroup returns a WaitGroup from the pool, allocating a fresh one when the pool is empty.
func getWaitGroup() *sync.WaitGroup {
	if v := wgPool.Get(); v != nil {
		return v.(*sync.WaitGroup)
	}
	return &sync.WaitGroup{}
}

// putWaitGroup returns wg to the pool for reuse by getWaitGroup.
func putWaitGroup(wg *sync.WaitGroup) {
	wgPool.Put(wg)
}

// wgPool reduces allocations of sync.WaitGroup objects on hot flush paths.
var wgPool sync.Pool
2024-01-24 01:27:49 +00:00
// mustMergeInmemoryParts merges pws into a single in-memory part and returns it.
//
// All the parts in pws must be in-memory parts; a file part triggers a panic.
func (tb *Table) mustMergeInmemoryParts(pws []*partWrapper) *partWrapper {
	if len(pws) == 1 {
		// Nothing to merge
		return pws[0]
	}

	// Prepare blockStreamReaders for the source in-memory parts.
	bsrs := make([]*blockStreamReader, 0, len(pws))
	for _, pw := range pws {
		if pw.mp == nil {
			logger.Panicf("BUG: unexpected file part")
		}
		bsr := getBlockStreamReader()
		bsr.MustInitFromInmemoryPart(pw.mp)
		bsrs = append(bsrs, bsr)
	}

	// The resulting part gets the deadline computed from pws (see getFlushToDiskDeadline).
	flushToDiskDeadline := getFlushToDiskDeadline(pws)
	return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
}
// createInmemoryPart creates a searchable in-memory part from the given blocks.
//
// It returns nil if all the blocks in ibs are empty.
func (tb *Table) createInmemoryPart(ibs []*inmemoryBlock) *partWrapper {
	// Prepare blockStreamReaders for source blocks.
	bsrs := make([]*blockStreamReader, 0, len(ibs))
	for _, ib := range ibs {
		if len(ib.items) == 0 {
			// Skip empty blocks.
			continue
		}
		bsr := getBlockStreamReader()
		bsr.MustInitFromInmemoryBlock(ib)
		// The block contents were copied into the reader, so the block can be reused.
		putInmemoryBlock(ib)
		bsrs = append(bsrs, bsr)
	}
	if len(bsrs) == 0 {
		return nil
	}

	flushToDiskDeadline := time.Now().Add(dataFlushInterval)
	if len(bsrs) == 1 {
		// Nothing to merge. Just return a single inmemory part.
		bsr := bsrs[0]
		mp := &inmemoryPart{}
		mp.Init(&bsr.Block)
		putBlockStreamReader(bsr)
		return newPartWrapperFromInmemoryPart(mp, flushToDiskDeadline)
	}
	return tb.mustMergeIntoInmemoryPart(bsrs, flushToDiskDeadline)
}
func ( tb * Table ) mustMergeIntoInmemoryPart ( bsrs [ ] * blockStreamReader , flushToDiskDeadline time . Time ) * partWrapper {
2019-05-22 21:16:55 +00:00
// Prepare blockStreamWriter for destination part.
2024-01-24 01:27:49 +00:00
outItemsCount := uint64 ( 0 )
for _ , bsr := range bsrs {
outItemsCount += bsr . ph . itemsCount
}
2022-12-04 06:45:53 +00:00
compressLevel := getCompressLevel ( outItemsCount )
2019-05-22 21:16:55 +00:00
bsw := getBlockStreamWriter ( )
2022-03-03 14:46:35 +00:00
mpDst := & inmemoryPart { }
2023-04-14 22:46:09 +00:00
bsw . MustInitFromInmemoryPart ( mpDst , compressLevel )
2019-05-22 21:16:55 +00:00
// Merge parts.
// The merge shouldn't be interrupted by stopCh,
// since it may be final after stopCh is closed.
2022-12-05 23:27:57 +00:00
atomic . AddUint64 ( & tb . activeInmemoryMerges , 1 )
err := mergeBlockStreams ( & mpDst . ph , bsw , bsrs , tb . prepareBlock , nil , & tb . inmemoryItemsMerged )
atomic . AddUint64 ( & tb . activeInmemoryMerges , ^ uint64 ( 0 ) )
atomic . AddUint64 ( & tb . inmemoryMergesCount , 1 )
2020-07-22 21:58:48 +00:00
if err != nil {
2019-05-22 21:16:55 +00:00
logger . Panicf ( "FATAL: cannot merge inmemoryBlocks: %s" , err )
}
putBlockStreamWriter ( bsw )
for _ , bsr := range bsrs {
putBlockStreamReader ( bsr )
}
2024-01-24 01:27:49 +00:00
2022-12-05 23:27:57 +00:00
return newPartWrapperFromInmemoryPart ( mpDst , flushToDiskDeadline )
}
2019-05-22 21:16:55 +00:00
2022-12-05 23:27:57 +00:00
func newPartWrapperFromInmemoryPart ( mp * inmemoryPart , flushToDiskDeadline time . Time ) * partWrapper {
p := mp . NewPart ( )
2019-05-22 21:16:55 +00:00
return & partWrapper {
2022-12-05 23:27:57 +00:00
p : p ,
mp : mp ,
refCount : 1 ,
flushToDiskDeadline : flushToDiskDeadline ,
2019-05-22 21:16:55 +00:00
}
}
2022-12-05 23:27:57 +00:00
func ( tb * Table ) startMergeWorkers ( ) {
2022-12-13 00:49:21 +00:00
// The actual number of concurrent merges is limited inside mergeWorker() below.
2023-04-14 06:36:06 +00:00
for i := 0 ; i < cap ( mergeWorkersLimitCh ) ; i ++ {
2022-12-04 07:03:05 +00:00
tb . wg . Add ( 1 )
2019-05-22 21:16:55 +00:00
go func ( ) {
2022-12-05 23:27:57 +00:00
tb . mergeWorker ( )
2022-12-04 07:03:05 +00:00
tb . wg . Done ( )
2019-05-22 21:16:55 +00:00
} ( )
}
}
2022-12-05 23:27:57 +00:00
// getMaxInmemoryPartSize returns the maximum allowed size for a single in-memory part.
func getMaxInmemoryPartSize() uint64 {
	// Allow up to 5% of memory for in-memory parts, but never less than 1MB per part.
	const minPartSize = 1e6
	n := uint64(0.05 * float64(memory.Allowed()) / maxInmemoryParts)
	if n < minPartSize {
		return minPartSize
	}
	return n
}
// getMaxFilePartSize returns the maximum size for a file part produced by a merge.
func (tb *Table) getMaxFilePartSize() uint64 {
	// Divide free space at tb.path by the maximum number of concurrent merges.
	maxOutBytes := fs.MustGetFreeSpace(tb.path) / uint64(cap(mergeWorkersLimitCh))
	if maxOutBytes > maxPartSize {
		return maxPartSize
	}
	return maxOutBytes
}
2022-06-01 11:21:12 +00:00
// canBackgroundMerge returns true if background merges are allowed,
// i.e. the storage isn't in read-only mode.
func (tb *Table) canBackgroundMerge() bool {
	return atomic.LoadUint32(tb.isReadOnly) == 0
}
2022-09-26 13:39:56 +00:00
// errReadOnlyMode is returned from background merge attempts
// when the storage is in read-only mode.
//
// errors.New is used instead of fmt.Errorf, since there are no format args.
var errReadOnlyMode = errors.New("storage is in readonly mode")
2019-09-19 18:48:14 +00:00
func ( tb * Table ) mergeExistingParts ( isFinal bool ) error {
2022-06-01 11:21:12 +00:00
if ! tb . canBackgroundMerge ( ) {
// Do not perform background merge in read-only mode
// in order to prevent from disk space shortage.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2603
2022-09-26 13:39:56 +00:00
return errReadOnlyMode
2022-06-01 11:21:12 +00:00
}
2022-12-05 23:27:57 +00:00
maxOutBytes := tb . getMaxFilePartSize ( )
2019-05-22 21:16:55 +00:00
tb . partsLock . Lock ( )
2022-12-05 23:27:57 +00:00
dst := make ( [ ] * partWrapper , 0 , len ( tb . inmemoryParts ) + len ( tb . fileParts ) )
dst = append ( dst , tb . inmemoryParts ... )
dst = append ( dst , tb . fileParts ... )
pws := getPartsToMerge ( dst , maxOutBytes , isFinal )
2019-05-22 21:16:55 +00:00
tb . partsLock . Unlock ( )
2022-12-05 23:27:57 +00:00
return tb . mergeParts ( pws , tb . stopCh , isFinal )
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
func ( tb * Table ) mergeWorker ( ) {
2020-05-14 19:01:51 +00:00
var lastMergeTime uint64
2019-05-22 21:16:55 +00:00
isFinal := false
for {
2022-12-13 00:49:21 +00:00
// Limit the number of concurrent calls to mergeExistingParts, since the total number of merge workers
// across tables may exceed the the cap(mergeWorkersLimitCh).
2022-12-05 23:27:57 +00:00
mergeWorkersLimitCh <- struct { } { }
2019-09-19 18:48:14 +00:00
err := tb . mergeExistingParts ( isFinal )
2022-12-05 23:27:57 +00:00
<- mergeWorkersLimitCh
2019-05-22 21:16:55 +00:00
if err == nil {
// Try merging additional parts.
2020-05-14 19:01:51 +00:00
lastMergeTime = fasttime . UnixTimestamp ( )
2019-05-22 21:16:55 +00:00
isFinal = false
continue
}
2020-09-17 00:02:35 +00:00
if errors . Is ( err , errForciblyStopped ) {
2019-05-22 21:16:55 +00:00
// The merger has been stopped.
2022-12-05 23:27:57 +00:00
return
2019-05-22 21:16:55 +00:00
}
2022-09-26 13:39:56 +00:00
if ! errors . Is ( err , errNothingToMerge ) && ! errors . Is ( err , errReadOnlyMode ) {
2022-12-05 23:27:57 +00:00
// Unexpected error.
logger . Panicf ( "FATAL: unrecoverable error when merging inmemory parts in %q: %s" , tb . path , err )
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
if finalMergeDelaySeconds > 0 && fasttime . UnixTimestamp ( ) - lastMergeTime > finalMergeDelaySeconds {
2019-05-22 21:16:55 +00:00
// We have free time for merging into bigger parts.
// This should improve select performance.
2020-05-14 19:01:51 +00:00
lastMergeTime = fasttime . UnixTimestamp ( )
2019-05-22 21:16:55 +00:00
isFinal = true
continue
}
2023-01-18 09:09:03 +00:00
// Nothing to merge. Wait for the notification of new merge.
2019-05-22 21:16:55 +00:00
select {
case <- tb . stopCh :
2022-12-05 23:27:57 +00:00
return
2023-01-18 09:09:03 +00:00
case <- tb . needMergeCh :
2019-05-22 21:16:55 +00:00
}
}
}
2022-12-05 23:27:57 +00:00
// Disable final merge by default, since it may lead to high disk IO and CPU usage
// after some inactivity time.
var finalMergeDelaySeconds = uint64 ( 0 )
// SetFinalMergeDelay sets the delay before doing final merge for Table without newly ingested data.
//
// This function may be called only before Table initialization.
func SetFinalMergeDelay ( delay time . Duration ) {
if delay <= 0 {
return
}
finalMergeDelaySeconds = uint64 ( delay . Seconds ( ) + 1 )
}
2019-05-22 21:16:55 +00:00
// errNothingToMerge is returned from mergeParts when no parts are given to merge.
//
// errors.New is used instead of fmt.Errorf, since there are no format args.
var errNothingToMerge = errors.New("nothing to merge")
2023-10-02 06:04:59 +00:00
// assertIsInMerge panics if any part in pws has the isInMerge flag unset.
func assertIsInMerge(pws []*partWrapper) {
	for i := range pws {
		if !pws[i].isInMerge {
			logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to false")
		}
	}
}
2022-12-01 03:53:02 +00:00
// releasePartsToMerge clears the isInMerge flag on every part in pws,
// making them eligible for the next merge.
func (tb *Table) releasePartsToMerge(pws []*partWrapper) {
	tb.partsLock.Lock()
	for i := range pws {
		pw := pws[i]
		if !pw.isInMerge {
			logger.Panicf("BUG: missing isInMerge flag on the part %q", pw.p.path)
		}
		pw.isInMerge = false
	}
	tb.partsLock.Unlock()
}
2022-12-05 23:27:57 +00:00
// mergeParts merges pws to a single resulting part.
2020-09-16 23:05:54 +00:00
//
// Merging is immediately stopped if stopCh is closed.
//
2022-12-05 23:27:57 +00:00
// If isFinal is set, then the resulting part will be stored to disk.
//
2020-09-16 23:05:54 +00:00
// All the parts inside pws must have isInMerge field set to true.
2023-10-02 06:04:59 +00:00
// The isInMerge field inside pws parts is set to false before returning from the function.
2022-12-05 23:27:57 +00:00
func ( tb * Table ) mergeParts ( pws [ ] * partWrapper , stopCh <- chan struct { } , isFinal bool ) error {
2019-05-22 21:16:55 +00:00
if len ( pws ) == 0 {
// Nothing to merge.
return errNothingToMerge
}
2023-10-02 06:04:59 +00:00
assertIsInMerge ( pws )
defer tb . releasePartsToMerge ( pws )
2019-05-22 21:16:55 +00:00
startTime := time . Now ( )
2022-12-05 23:27:57 +00:00
// Initialize destination paths.
dstPartType := getDstPartType ( pws , isFinal )
2023-03-19 08:36:05 +00:00
mergeIdx := tb . nextMergeIdx ( )
dstPartPath := ""
if dstPartType == partFile {
2023-03-25 20:39:38 +00:00
dstPartPath = filepath . Join ( tb . path , fmt . Sprintf ( "%016X" , mergeIdx ) )
2023-03-19 08:36:05 +00:00
}
2022-12-05 23:27:57 +00:00
if isFinal && len ( pws ) == 1 && pws [ 0 ] . mp != nil {
// Fast path: flush a single in-memory part to disk.
mp := pws [ 0 ] . mp
2023-04-14 05:11:56 +00:00
mp . MustStoreToDisk ( dstPartPath )
2023-03-19 08:36:05 +00:00
pwNew := tb . openCreatedPart ( pws , nil , dstPartPath )
2022-12-05 23:27:57 +00:00
tb . swapSrcWithDstParts ( pws , pwNew , dstPartType )
return nil
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
// Prepare BlockStreamReaders for source parts.
2023-04-14 22:46:09 +00:00
bsrs := mustOpenBlockStreamReaders ( pws )
2019-05-22 21:16:55 +00:00
2022-12-05 23:27:57 +00:00
// Prepare BlockStreamWriter for destination part.
srcSize := uint64 ( 0 )
srcItemsCount := uint64 ( 0 )
srcBlocksCount := uint64 ( 0 )
for _ , pw := range pws {
srcSize += pw . p . size
srcItemsCount += pw . p . ph . itemsCount
srcBlocksCount += pw . p . ph . blocksCount
}
compressLevel := getCompressLevel ( srcItemsCount )
2019-05-22 21:16:55 +00:00
bsw := getBlockStreamWriter ( )
2022-12-05 23:27:57 +00:00
var mpNew * inmemoryPart
if dstPartType == partInmemory {
mpNew = & inmemoryPart { }
2023-04-14 22:46:09 +00:00
bsw . MustInitFromInmemoryPart ( mpNew , compressLevel )
2022-12-05 23:27:57 +00:00
} else {
nocache := srcItemsCount > maxItemsPerCachedPart ( )
2023-04-14 22:12:45 +00:00
bsw . MustInitFromFilePart ( dstPartPath , nocache , compressLevel )
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
// Merge source parts to destination part.
2023-03-19 08:36:05 +00:00
ph , err := tb . mergePartsInternal ( dstPartPath , bsw , bsrs , dstPartType , stopCh )
2019-05-22 21:16:55 +00:00
putBlockStreamWriter ( bsw )
2023-04-14 22:46:09 +00:00
for _ , bsr := range bsrs {
putBlockStreamReader ( bsr )
}
2019-05-22 21:16:55 +00:00
if err != nil {
2023-03-19 08:36:05 +00:00
return err
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
if mpNew != nil {
// Update partHeader for destination inmemory part after the merge.
mpNew . ph = * ph
2023-04-14 04:03:06 +00:00
} else {
// Make sure the created part directory listing is synced.
fs . MustSyncPath ( dstPartPath )
2019-05-22 21:16:55 +00:00
}
2023-03-19 08:36:05 +00:00
// Atomically swap the source parts with the newly created part.
pwNew := tb . openCreatedPart ( pws , mpNew , dstPartPath )
pDst := pwNew . p
dstItemsCount := pDst . ph . itemsCount
dstBlocksCount := pDst . ph . blocksCount
dstSize := pDst . size
2023-03-06 10:11:08 +00:00
tb . swapSrcWithDstParts ( pws , pwNew , dstPartType )
d := time . Since ( startTime )
if d <= 30 * time . Second {
return nil
}
// Log stats for long merges.
2022-12-05 23:27:57 +00:00
durationSecs := d . Seconds ( )
itemsPerSec := int ( float64 ( srcItemsCount ) / durationSecs )
logger . Infof ( "merged (%d parts, %d items, %d blocks, %d bytes) into (1 part, %d items, %d blocks, %d bytes) in %.3f seconds at %d items/sec to %q" ,
len ( pws ) , srcItemsCount , srcBlocksCount , srcSize , dstItemsCount , dstBlocksCount , dstSize , durationSecs , itemsPerSec , dstPartPath )
return nil
}
func getFlushToDiskDeadline ( pws [ ] * partWrapper ) time . Time {
2023-04-14 06:17:10 +00:00
d := time . Now ( ) . Add ( dataFlushInterval )
for _ , pw := range pws {
if pw . mp != nil && pw . flushToDiskDeadline . Before ( d ) {
2022-12-05 23:27:57 +00:00
d = pw . flushToDiskDeadline
2019-05-22 21:16:55 +00:00
}
}
2022-12-05 23:27:57 +00:00
return d
}
// partType distinguishes in-memory parts from file-based parts.
type partType int

// The enum values were declared via `var` before; consts prevent accidental mutation.
const (
	// partInmemory is a part which resides in memory only.
	partInmemory = partType(0)
	// partFile is a part which is persisted on disk.
	partFile = partType(1)
)
func getDstPartType ( pws [ ] * partWrapper , isFinal bool ) partType {
dstPartSize := getPartsSize ( pws )
if isFinal || dstPartSize > getMaxInmemoryPartSize ( ) {
return partFile
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
if ! areAllInmemoryParts ( pws ) {
// If at least a single source part is located in file,
// then the destination part must be in file for durability reasons.
return partFile
}
return partInmemory
}
2019-05-22 21:16:55 +00:00
2023-04-14 22:46:09 +00:00
func mustOpenBlockStreamReaders ( pws [ ] * partWrapper ) [ ] * blockStreamReader {
2022-12-05 23:27:57 +00:00
bsrs := make ( [ ] * blockStreamReader , 0 , len ( pws ) )
for _ , pw := range pws {
bsr := getBlockStreamReader ( )
if pw . mp != nil {
2023-04-14 22:46:09 +00:00
bsr . MustInitFromInmemoryPart ( pw . mp )
2022-12-05 23:27:57 +00:00
} else {
2023-04-14 22:46:09 +00:00
bsr . MustInitFromFilePart ( pw . p . path )
2022-12-05 23:27:57 +00:00
}
bsrs = append ( bsrs , bsr )
}
2023-04-14 22:46:09 +00:00
return bsrs
2022-12-05 23:27:57 +00:00
}
2023-03-19 08:36:05 +00:00
func ( tb * Table ) mergePartsInternal ( dstPartPath string , bsw * blockStreamWriter , bsrs [ ] * blockStreamReader , dstPartType partType , stopCh <- chan struct { } ) ( * partHeader , error ) {
2022-12-05 23:27:57 +00:00
var ph partHeader
var itemsMerged * uint64
var mergesCount * uint64
var activeMerges * uint64
switch dstPartType {
case partInmemory :
itemsMerged = & tb . inmemoryItemsMerged
mergesCount = & tb . inmemoryMergesCount
activeMerges = & tb . activeInmemoryMerges
case partFile :
itemsMerged = & tb . fileItemsMerged
mergesCount = & tb . fileMergesCount
activeMerges = & tb . activeFileMerges
default :
logger . Panicf ( "BUG: unknown partType=%d" , dstPartType )
}
atomic . AddUint64 ( activeMerges , 1 )
err := mergeBlockStreams ( & ph , bsw , bsrs , tb . prepareBlock , stopCh , itemsMerged )
atomic . AddUint64 ( activeMerges , ^ uint64 ( 0 ) )
atomic . AddUint64 ( mergesCount , 1 )
if err != nil {
2023-03-19 08:36:05 +00:00
return nil , fmt . Errorf ( "cannot merge %d parts to %s: %w" , len ( bsrs ) , dstPartPath , err )
2022-12-05 23:27:57 +00:00
}
2023-03-19 08:36:05 +00:00
if dstPartPath != "" {
2023-04-14 04:33:15 +00:00
ph . MustWriteMetadata ( dstPartPath )
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
return & ph , nil
}
2023-03-19 08:36:05 +00:00
func ( tb * Table ) openCreatedPart ( pws [ ] * partWrapper , mpNew * inmemoryPart , dstPartPath string ) * partWrapper {
2022-12-05 23:27:57 +00:00
// Open the created part.
if mpNew != nil {
// Open the created part from memory.
flushToDiskDeadline := getFlushToDiskDeadline ( pws )
pwNew := newPartWrapperFromInmemoryPart ( mpNew , flushToDiskDeadline )
2023-03-19 08:36:05 +00:00
return pwNew
2022-12-05 23:27:57 +00:00
}
// Open the created part from disk.
2023-04-14 22:46:09 +00:00
pNew := mustOpenFilePart ( dstPartPath )
2022-12-05 23:27:57 +00:00
pwNew := & partWrapper {
p : pNew ,
2019-05-22 21:16:55 +00:00
refCount : 1 ,
}
2023-03-19 08:36:05 +00:00
return pwNew
2022-12-05 23:27:57 +00:00
}
2019-05-22 21:16:55 +00:00
2022-12-05 23:27:57 +00:00
// areAllInmemoryParts returns true if every part in pws is an in-memory part.
func areAllInmemoryParts(pws []*partWrapper) bool {
	for i := range pws {
		if pws[i].mp == nil {
			return false
		}
	}
	return true
}
func ( tb * Table ) swapSrcWithDstParts ( pws [ ] * partWrapper , pwNew * partWrapper , dstPartType partType ) {
// Atomically unregister old parts and add new part to tb.
2019-05-22 21:16:55 +00:00
m := make ( map [ * partWrapper ] bool , len ( pws ) )
for _ , pw := range pws {
m [ pw ] = true
}
if len ( m ) != len ( pws ) {
2022-12-05 23:27:57 +00:00
logger . Panicf ( "BUG: %d duplicate parts found when merging %d parts" , len ( pws ) - len ( m ) , len ( pws ) )
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
removedInmemoryParts := 0
removedFileParts := 0
2019-05-22 21:16:55 +00:00
tb . partsLock . Lock ( )
2023-03-19 08:36:05 +00:00
2022-12-05 23:27:57 +00:00
tb . inmemoryParts , removedInmemoryParts = removeParts ( tb . inmemoryParts , m )
tb . fileParts , removedFileParts = removeParts ( tb . fileParts , m )
2023-03-19 08:36:05 +00:00
switch dstPartType {
case partInmemory :
tb . inmemoryParts = append ( tb . inmemoryParts , pwNew )
case partFile :
tb . fileParts = append ( tb . fileParts , pwNew )
default :
logger . Panicf ( "BUG: unknown partType=%d" , dstPartType )
}
tb . notifyBackgroundMergers ( )
// Atomically store the updated list of file-based parts on disk.
// This must be performed under partsLock in order to prevent from races
// when multiple concurrently running goroutines update the list.
if removedFileParts > 0 || dstPartType == partFile {
mustWritePartNames ( tb . fileParts , tb . path )
2022-12-05 23:27:57 +00:00
}
2023-03-19 08:36:05 +00:00
2019-05-22 21:16:55 +00:00
tb . partsLock . Unlock ( )
2022-12-05 23:27:57 +00:00
2024-01-24 01:27:49 +00:00
// Update inmemoryPartsLimitCh accordingly to the number of the remaining in-memory parts.
for i := 0 ; i < removedInmemoryParts ; i ++ {
<- tb . inmemoryPartsLimitCh
}
if dstPartType == partInmemory {
tb . inmemoryPartsLimitCh <- struct { } { }
}
2022-12-05 23:27:57 +00:00
removedParts := removedInmemoryParts + removedFileParts
2019-05-22 21:16:55 +00:00
if removedParts != len ( m ) {
2022-12-05 23:27:57 +00:00
logger . Panicf ( "BUG: unexpected number of parts removed; got %d, want %d" , removedParts , len ( m ) )
2019-05-22 21:16:55 +00:00
}
2023-03-19 08:36:05 +00:00
// Mark old parts as must be deleted and decrement reference count,
// so they are eventually closed and deleted.
2019-05-22 21:16:55 +00:00
for _ , pw := range pws {
2023-03-19 08:36:05 +00:00
atomic . StoreUint32 ( & pw . mustBeDeleted , 1 )
2019-05-22 21:16:55 +00:00
pw . decRef ( )
}
2022-12-05 23:27:57 +00:00
}
2019-05-22 21:16:55 +00:00
2022-12-05 23:27:57 +00:00
func getPartsSize ( pws [ ] * partWrapper ) uint64 {
n := uint64 ( 0 )
for _ , pw := range pws {
n += pw . p . size
2019-05-22 21:16:55 +00:00
}
2022-12-05 23:27:57 +00:00
return n
2019-05-22 21:16:55 +00:00
}
2022-12-04 06:45:53 +00:00
// getCompressLevel returns the zstd compression level to use
// for a part containing the given number of items.
//
// Smaller parts get negative (faster) levels, bigger parts get
// higher (better) compression.
func getCompressLevel(itemsCount uint64) int {
	switch {
	case itemsCount <= 1<<16:
		// -5 is the minimum supported compression for zstd.
		// See https://github.com/facebook/zstd/releases/tag/v1.3.4
		return -5
	case itemsCount <= 1<<17:
		return -4
	case itemsCount <= 1<<18:
		return -3
	case itemsCount <= 1<<19:
		return -2
	case itemsCount <= 1<<20:
		return -1
	case itemsCount <= 1<<22:
		return 1
	case itemsCount <= 1<<25:
		return 2
	default:
		return 3
	}
}
// nextMergeIdx returns the next unique index for naming a merge output part.
func (tb *Table) nextMergeIdx() uint64 {
	return atomic.AddUint64(&tb.mergeIdx, 1)
}
2023-04-14 06:36:06 +00:00
// mergeWorkersLimitCh limits the number of concurrent merges across all the tables.
var mergeWorkersLimitCh = make(chan struct{}, getWorkersCount())
// getWorkersCount returns the number of merge workers to start.
func getWorkersCount() int {
	// Allow at least 4 merge workers on systems with small CPUs count
	// in order to guarantee that background merges can be continued
	// when multiple workers are busy with big merges.
	const minWorkers = 4
	if n := cgroup.AvailableCPUs(); n > minWorkers {
		return n
	}
	return minWorkers
}
2019-05-22 21:16:55 +00:00
2023-04-15 05:08:43 +00:00
func mustOpenParts ( path string ) [ ] * partWrapper {
2019-11-02 00:26:02 +00:00
// The path can be missing after restoring from backup, so create it if needed.
2023-04-14 05:11:56 +00:00
fs . MustMkdirIfNotExist ( path )
2022-09-13 12:56:05 +00:00
fs . MustRemoveTemporaryDirs ( path )
2019-05-22 21:16:55 +00:00
2023-03-19 08:36:05 +00:00
// Remove txn and tmp directories, which may be left after the upgrade
// to v1.90.0 and newer versions.
2023-03-25 20:39:38 +00:00
fs . MustRemoveAll ( filepath . Join ( path , "txn" ) )
fs . MustRemoveAll ( filepath . Join ( path , "tmp" ) )
2019-05-22 21:16:55 +00:00
2023-03-19 08:36:05 +00:00
partNames := mustReadPartNames ( path )
2019-05-22 21:16:55 +00:00
2023-03-19 08:36:05 +00:00
// Remove dirs missing in partNames. These dirs may be left after unclean shutdown
// or after the update from versions prior to v1.90.0.
2023-04-15 05:08:43 +00:00
des := fs . MustReadDir ( path )
2023-03-19 08:36:05 +00:00
m := make ( map [ string ] struct { } , len ( partNames ) )
for _ , partName := range partNames {
2023-09-19 09:17:41 +00:00
// Make sure the partName exists on disk.
// If it is missing, then manual action from the user is needed,
// since this is unexpected state, which cannot occur under normal operation,
// including unclean shutdown.
partPath := filepath . Join ( path , partName )
if ! fs . IsPathExist ( partPath ) {
partsFile := filepath . Join ( path , partsFilename )
logger . Panicf ( "FATAL: part %q is listed in %q, but is missing on disk; " +
"ensure %q contents is not corrupted; remove %q to rebuild its' content from the list of existing parts" ,
partPath , partsFile , partsFile , partsFile )
}
2023-03-19 08:36:05 +00:00
m [ partName ] = struct { } { }
2019-05-22 21:16:55 +00:00
}
2023-03-18 04:03:34 +00:00
for _ , de := range des {
if ! fs . IsDirOrSymlink ( de ) {
2019-05-22 21:16:55 +00:00
// Skip non-directories.
continue
}
2023-03-18 04:03:34 +00:00
fn := de . Name ( )
2023-03-19 08:36:05 +00:00
if _ , ok := m [ fn ] ; ! ok {
2023-03-25 20:39:38 +00:00
deletePath := filepath . Join ( path , fn )
2023-03-19 08:36:05 +00:00
fs . MustRemoveAll ( deletePath )
2021-04-22 09:58:53 +00:00
}
2023-03-19 08:36:05 +00:00
}
fs . MustSyncPath ( path )
// Open parts
var pws [ ] * partWrapper
for _ , partName := range partNames {
2023-03-25 20:39:38 +00:00
partPath := filepath . Join ( path , partName )
2023-04-14 22:46:09 +00:00
p := mustOpenFilePart ( partPath )
2019-05-22 21:16:55 +00:00
pw := & partWrapper {
p : p ,
refCount : 1 ,
}
pws = append ( pws , pw )
}
2023-06-15 09:19:22 +00:00
partNamesPath := filepath . Join ( path , partsFilename )
if ! fs . IsPathExist ( partNamesPath ) {
2023-07-07 00:05:59 +00:00
// Create parts.json file if it doesn't exist yet.
// This should protect from possible carshloops just after the migration from versions below v1.90.0
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4336
2023-06-15 09:19:22 +00:00
mustWritePartNames ( pws , path )
}
2019-05-22 21:16:55 +00:00
2023-04-15 05:08:43 +00:00
return pws
2019-05-22 21:16:55 +00:00
}
// CreateSnapshotAt creates tb snapshot in the given dstDir.
//
2023-03-19 08:36:05 +00:00
// Snapshot is created using linux hard links, so it is usually created very quickly.
2023-02-27 20:12:03 +00:00
//
// If deadline is reached before snapshot is created error is returned.
2023-02-27 20:57:22 +00:00
//
// The caller is responsible for data removal at dstDir on unsuccessful snapshot creation.
2023-02-27 20:12:03 +00:00
func ( tb * Table ) CreateSnapshotAt ( dstDir string , deadline uint64 ) error {
2019-05-22 21:16:55 +00:00
logger . Infof ( "creating Table snapshot of %q..." , tb . path )
startTime := time . Now ( )
var err error
srcDir := tb . path
srcDir , err = filepath . Abs ( srcDir )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot obtain absolute dir for %q: %w" , srcDir , err )
2019-05-22 21:16:55 +00:00
}
dstDir , err = filepath . Abs ( dstDir )
if err != nil {
2020-06-30 19:58:18 +00:00
return fmt . Errorf ( "cannot obtain absolute dir for %q: %w" , dstDir , err )
2019-05-22 21:16:55 +00:00
}
2023-03-25 20:39:38 +00:00
if strings . HasPrefix ( dstDir , srcDir + string ( filepath . Separator ) ) {
2019-05-22 21:16:55 +00:00
return fmt . Errorf ( "cannot create snapshot %q inside the data dir %q" , dstDir , srcDir )
}
// Flush inmemory items to disk.
2022-12-05 23:27:57 +00:00
tb . flushInmemoryItems ( )
2019-05-22 21:16:55 +00:00
2023-04-14 05:11:56 +00:00
fs . MustMkdirFailIfExist ( dstDir )
2019-05-22 21:16:55 +00:00
2023-03-19 08:36:05 +00:00
pws := tb . getParts ( nil )
defer tb . putParts ( pws )
2023-02-27 20:12:03 +00:00
2023-03-19 08:36:05 +00:00
// Create a file with part names at dstDir
mustWritePartNames ( pws , dstDir )
2023-02-27 20:12:03 +00:00
2023-03-19 08:36:05 +00:00
// Make hardlinks for pws at dstDir
for _ , pw := range pws {
if pw . mp != nil {
// Skip in-memory parts
2019-05-22 21:16:55 +00:00
continue
}
2023-03-19 08:36:05 +00:00
if deadline > 0 && fasttime . UnixTimestamp ( ) > deadline {
return fmt . Errorf ( "cannot create snapshot for %q: timeout exceeded" , tb . path )
2019-05-22 21:16:55 +00:00
}
2023-03-19 08:36:05 +00:00
srcPartPath := pw . p . path
2023-03-25 20:39:38 +00:00
dstPartPath := filepath . Join ( dstDir , filepath . Base ( srcPartPath ) )
2023-04-14 05:48:05 +00:00
fs . MustHardLinkFiles ( srcPartPath , dstPartPath )
2019-05-22 21:16:55 +00:00
}
2019-06-11 20:13:04 +00:00
fs . MustSyncPath ( dstDir )
2019-05-22 21:16:55 +00:00
parentDir := filepath . Dir ( dstDir )
2019-06-11 20:13:04 +00:00
fs . MustSyncPath ( parentDir )
2019-05-22 21:16:55 +00:00
2020-01-22 16:27:44 +00:00
logger . Infof ( "created Table snapshot of %q at %q in %.3f seconds" , srcDir , dstDir , time . Since ( startTime ) . Seconds ( ) )
2019-05-22 21:16:55 +00:00
return nil
}
2023-03-19 08:36:05 +00:00
func mustWritePartNames ( pws [ ] * partWrapper , dstDir string ) {
partNames := make ( [ ] string , 0 , len ( pws ) )
for _ , pw := range pws {
if pw . mp != nil {
// Skip in-memory parts
2019-08-12 11:44:24 +00:00
continue
}
2023-03-19 08:36:05 +00:00
partName := filepath . Base ( pw . p . path )
partNames = append ( partNames , partName )
2019-05-22 21:16:55 +00:00
}
2023-03-19 08:36:05 +00:00
sort . Strings ( partNames )
data , err := json . Marshal ( partNames )
2019-05-22 21:16:55 +00:00
if err != nil {
2023-03-19 08:36:05 +00:00
logger . Panicf ( "BUG: cannot marshal partNames to JSON: %s" , err )
2019-05-22 21:16:55 +00:00
}
2023-03-25 20:39:38 +00:00
partNamesPath := filepath . Join ( dstDir , partsFilename )
2023-04-14 05:41:12 +00:00
fs . MustWriteAtomic ( partNamesPath , data , true )
2023-03-19 08:36:05 +00:00
}
2019-05-22 21:16:55 +00:00
2023-03-19 08:36:05 +00:00
func mustReadPartNames ( srcDir string ) [ ] string {
2023-03-25 20:39:38 +00:00
partNamesPath := filepath . Join ( srcDir , partsFilename )
2023-04-15 06:16:26 +00:00
if fs . IsPathExist ( partNamesPath ) {
data , err := os . ReadFile ( partNamesPath )
if err != nil {
logger . Panicf ( "FATAL: cannot read %s file: %s" , partsFilename , err )
}
2023-03-19 08:36:05 +00:00
var partNames [ ] string
if err := json . Unmarshal ( data , & partNames ) ; err != nil {
logger . Panicf ( "FATAL: cannot parse %s: %s" , partNamesPath , err )
2019-05-22 21:16:55 +00:00
}
2023-03-19 08:36:05 +00:00
return partNames
2019-05-22 21:16:55 +00:00
}
2023-03-25 21:33:54 +00:00
// The partsFilename is missing. This is the upgrade from versions previous to v1.90.0.
2023-03-19 08:36:05 +00:00
// Read part names from directories under srcDir
2023-04-15 05:08:43 +00:00
des := fs . MustReadDir ( srcDir )
2023-03-19 08:36:05 +00:00
var partNames [ ] string
for _ , de := range des {
if ! fs . IsDirOrSymlink ( de ) {
// Skip non-directories.
continue
2019-05-22 21:16:55 +00:00
}
2023-03-19 08:36:05 +00:00
partName := de . Name ( )
if isSpecialDir ( partName ) {
// Skip special dirs.
continue
2019-12-02 19:34:35 +00:00
}
2023-03-19 08:36:05 +00:00
partNames = append ( partNames , partName )
2019-05-22 21:16:55 +00:00
}
2023-03-19 08:36:05 +00:00
return partNames
2019-05-22 21:16:55 +00:00
}
// getPartsToMerge returns optimal parts to merge from pws.
//
// if isFinal is set, then merge harder.
//
2021-08-25 06:35:03 +00:00
// The summary size of the returned parts must be smaller than the maxOutBytes.
func getPartsToMerge ( pws [ ] * partWrapper , maxOutBytes uint64 , isFinal bool ) [ ] * partWrapper {
2019-05-22 21:16:55 +00:00
pwsRemaining := make ( [ ] * partWrapper , 0 , len ( pws ) )
for _ , pw := range pws {
if ! pw . isInMerge {
pwsRemaining = append ( pwsRemaining , pw )
}
}
maxPartsToMerge := defaultPartsToMerge
var dst [ ] * partWrapper
if isFinal {
for len ( dst ) == 0 && maxPartsToMerge >= finalPartsToMerge {
2021-08-25 06:35:03 +00:00
dst = appendPartsToMerge ( dst [ : 0 ] , pwsRemaining , maxPartsToMerge , maxOutBytes )
2019-05-22 21:16:55 +00:00
maxPartsToMerge --
}
} else {
2021-08-25 06:35:03 +00:00
dst = appendPartsToMerge ( dst [ : 0 ] , pwsRemaining , maxPartsToMerge , maxOutBytes )
2019-05-22 21:16:55 +00:00
}
for _ , pw := range dst {
if pw . isInMerge {
logger . Panicf ( "BUG: partWrapper.isInMerge is already set" )
}
pw . isInMerge = true
}
return dst
}
2021-08-25 06:35:03 +00:00
// minMergeMultiplier is the minimum multiplier for the size of the output part
// compared to the size of the maximum input part for the merge.
//
// Higher value reduces write amplification (disk write IO induced by the merge),
// while increasing the number of unmerged parts.
// The 1.7 is good enough for production workloads.
const minMergeMultiplier = 1.7
2019-05-22 21:16:55 +00:00
// appendPartsToMerge finds optimal parts to merge from src, appends
// them to dst and returns the result.
2021-08-25 06:35:03 +00:00
func appendPartsToMerge ( dst , src [ ] * partWrapper , maxPartsToMerge int , maxOutBytes uint64 ) [ ] * partWrapper {
2019-05-22 21:16:55 +00:00
if len ( src ) < 2 {
// There is no need in merging zero or one part :)
return dst
}
if maxPartsToMerge < 2 {
logger . Panicf ( "BUG: maxPartsToMerge cannot be smaller than 2; got %d" , maxPartsToMerge )
}
// Filter out too big parts.
// This should reduce N for O(n^2) algorithm below.
2021-08-25 06:35:03 +00:00
maxInPartBytes := uint64 ( float64 ( maxOutBytes ) / minMergeMultiplier )
2019-05-22 21:16:55 +00:00
tmp := make ( [ ] * partWrapper , 0 , len ( src ) )
for _ , pw := range src {
2021-08-25 06:35:03 +00:00
if pw . p . size > maxInPartBytes {
2019-05-22 21:16:55 +00:00
continue
}
tmp = append ( tmp , pw )
}
src = tmp
2022-12-05 23:27:57 +00:00
sortPartsForOptimalMerge ( src )
2019-05-22 21:16:55 +00:00
2020-12-18 18:00:06 +00:00
maxSrcParts := maxPartsToMerge
2021-07-02 14:24:14 +00:00
if maxSrcParts > len ( src ) {
2020-12-18 18:00:06 +00:00
maxSrcParts = len ( src )
2019-05-22 21:16:55 +00:00
}
2021-07-02 14:24:14 +00:00
minSrcParts := ( maxSrcParts + 1 ) / 2
if minSrcParts < 2 {
minSrcParts = 2
}
2019-05-22 21:16:55 +00:00
2020-12-18 18:00:06 +00:00
// Exhaustive search for parts giving the lowest write amplification when merged.
2019-05-22 21:16:55 +00:00
var pws [ ] * partWrapper
maxM := float64 ( 0 )
2020-12-18 18:00:06 +00:00
for i := minSrcParts ; i <= maxSrcParts ; i ++ {
2019-05-22 21:16:55 +00:00
for j := 0 ; j <= len ( src ) - i ; j ++ {
2019-10-29 10:45:19 +00:00
a := src [ j : j + i ]
2021-08-25 06:35:03 +00:00
if a [ 0 ] . p . size * uint64 ( len ( a ) ) < a [ len ( a ) - 1 ] . p . size {
// Do not merge parts with too big difference in size,
2020-12-18 18:00:06 +00:00
// since this results in unbalanced merges.
continue
}
2021-08-25 06:35:03 +00:00
outBytes := uint64 ( 0 )
2019-10-29 10:45:19 +00:00
for _ , pw := range a {
2021-08-25 06:35:03 +00:00
outBytes += pw . p . size
2019-05-22 21:16:55 +00:00
}
2021-08-25 06:35:03 +00:00
if outBytes > maxOutBytes {
2019-10-29 10:45:19 +00:00
// There is no sense in checking the remaining bigger parts.
break
2019-05-22 21:16:55 +00:00
}
2021-08-25 06:35:03 +00:00
m := float64 ( outBytes ) / float64 ( a [ len ( a ) - 1 ] . p . size )
2019-05-22 21:16:55 +00:00
if m < maxM {
continue
}
maxM = m
2019-10-29 10:45:19 +00:00
pws = a
2019-05-22 21:16:55 +00:00
}
}
2019-10-29 10:45:19 +00:00
minM := float64 ( maxPartsToMerge ) / 2
2021-08-25 06:35:03 +00:00
if minM < minMergeMultiplier {
minM = minMergeMultiplier
2019-05-22 21:16:55 +00:00
}
if maxM < minM {
2021-08-25 06:35:03 +00:00
// There is no sense in merging parts with too small m,
// since this leads to high disk write IO.
2019-05-22 21:16:55 +00:00
return dst
}
return append ( dst , pws ... )
}
2022-12-05 23:27:57 +00:00
// sortPartsForOptimalMerge sorts pws by part size in ascending order.
func sortPartsForOptimalMerge(pws []*partWrapper) {
	less := func(i, j int) bool {
		return pws[i].p.size < pws[j].p.size
	}
	sort.Slice(pws, less)
}
2019-05-22 21:16:55 +00:00
func removeParts ( pws [ ] * partWrapper , partsToRemove map [ * partWrapper ] bool ) ( [ ] * partWrapper , int ) {
dst := pws [ : 0 ]
for _ , pw := range pws {
2020-09-16 23:05:54 +00:00
if ! partsToRemove [ pw ] {
dst = append ( dst , pw )
2019-05-22 21:16:55 +00:00
}
}
2022-12-05 23:27:57 +00:00
for i := len ( dst ) ; i < len ( pws ) ; i ++ {
pws [ i ] = nil
}
return dst , len ( pws ) - len ( dst )
2019-05-22 21:16:55 +00:00
}
func isSpecialDir ( name string ) bool {
// Snapshots and cache dirs aren't used anymore.
// Keep them here for backwards compatibility.
2023-02-24 20:38:42 +00:00
return name == "tmp" || name == "txn" || name == "snapshots" || name == "cache" || fs . IsScheduledForRemoval ( name )
2019-05-22 21:16:55 +00:00
}