This commit is contained in:
Aliaksandr Valialkin 2024-05-10 04:52:38 +02:00
parent edede71be8
commit 57afedbfe8
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
8 changed files with 493 additions and 410 deletions

View file

@ -4,8 +4,8 @@ import (
"context" "context"
"flag" "flag"
"fmt" "fmt"
"io"
"net/http" "net/http"
"sync"
"time" "time"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
@ -62,10 +62,16 @@ func Init() {
var ss logstorage.StorageStats var ss logstorage.StorageStats
strg.UpdateStats(&ss) strg.UpdateStats(&ss)
logger.Infof("successfully opened storage in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d", logger.Infof("successfully opened storage in %.3f seconds; smallParts: %d; bigParts: %d; smallPartBlocks: %d; bigPartBlocks: %d; smallPartRows: %d; bigPartRows: %d; "+
time.Since(startTime).Seconds(), ss.FileParts, ss.FileBlocks, ss.FileRowsCount, ss.CompressedFileSize) "smallPartSize: %d bytes; bigPartSize: %d bytes",
storageMetrics = initStorageMetrics(strg) time.Since(startTime).Seconds(), ss.SmallParts, ss.BigParts, ss.SmallPartBlocks, ss.BigPartBlocks, ss.SmallPartRowsCount, ss.BigPartRowsCount,
ss.CompressedSmallPartSize, ss.CompressedBigPartSize)
// register storage metrics
storageMetrics = metrics.NewSet()
storageMetrics.RegisterMetricsWriter(func(w io.Writer) {
writeStorageMetrics(w, strg)
})
metrics.RegisterSet(storageMetrics) metrics.RegisterSet(storageMetrics)
} }
@ -105,112 +111,56 @@ func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorag
return strg.RunQuery(ctx, tenantIDs, q, writeBlock) return strg.RunQuery(ctx, tenantIDs, q, writeBlock)
} }
func initStorageMetrics(strg *logstorage.Storage) *metrics.Set { func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
ssCache := &logstorage.StorageStats{}
var ssCacheLock sync.Mutex
var lastUpdateTime time.Time
m := func() *logstorage.StorageStats {
ssCacheLock.Lock()
defer ssCacheLock.Unlock()
if time.Since(lastUpdateTime) < time.Second {
return ssCache
}
var ss logstorage.StorageStats var ss logstorage.StorageStats
strg.UpdateStats(&ss) strg.UpdateStats(&ss)
ssCache = &ss
lastUpdateTime = time.Now() metrics.WriteGaugeUint64(w, fmt.Sprintf(`vl_free_disk_space_bytes{path=%q}`, *storageDataPath), fs.MustGetFreeSpace(*storageDataPath))
return ssCache
isReadOnly := uint64(0)
if ss.IsReadOnly {
isReadOnly = 1
} }
metrics.WriteGaugeUint64(w, fmt.Sprintf(`vl_storage_is_read_only{path=%q}`, *storageDataPath), isReadOnly)
ms := metrics.NewSet() metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/inmemory"}`, ss.InmemoryActiveMerges)
metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/small"}`, ss.SmallPartActiveMerges)
metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/big"}`, ss.BigPartActiveMerges)
ms.NewGauge(fmt.Sprintf(`vl_free_disk_space_bytes{path=%q}`, *storageDataPath), func() float64 { metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/inmemory"}`, ss.InmemoryMergesTotal)
return float64(fs.MustGetFreeSpace(*storageDataPath)) metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/small"}`, ss.SmallPartMergesTotal)
}) metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/big"}`, ss.BigPartMergesTotal)
ms.NewGauge(fmt.Sprintf(`vl_storage_is_read_only{path=%q}`, *storageDataPath), func() float64 {
if m().IsReadOnly { metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/inmemory"}`, ss.InmemoryRowsCount)
return 1 metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/small"}`, ss.SmallPartRowsCount)
} metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/big"}`, ss.BigPartRowsCount)
return 0
}) metrics.WriteGaugeUint64(w, `vl_storage_parts{type="storage/inmemory"}`, ss.InmemoryParts)
metrics.WriteGaugeUint64(w, `vl_storage_parts{type="storage/small"}`, ss.SmallParts)
ms.NewGauge(`vl_active_merges{type="inmemory"}`, func() float64 { metrics.WriteGaugeUint64(w, `vl_storage_parts{type="storage/big"}`, ss.BigParts)
return float64(m().InmemoryActiveMerges)
}) metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/inmemory"}`, ss.InmemoryBlocks)
ms.NewGauge(`vl_merges_total{type="inmemory"}`, func() float64 { metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/small"}`, ss.SmallPartBlocks)
return float64(m().InmemoryMergesTotal) metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/big"}`, ss.BigPartBlocks)
})
ms.NewGauge(`vl_active_merges{type="file"}`, func() float64 { metrics.WriteGaugeUint64(w, `vl_partitions`, ss.PartitionsCount)
return float64(m().FileActiveMerges) metrics.WriteCounterUint64(w, `vl_streams_created_total`, ss.StreamsCreatedTotal)
})
ms.NewGauge(`vl_merges_total{type="file"}`, func() float64 { metrics.WriteGaugeUint64(w, `vl_indexdb_rows`, ss.IndexdbItemsCount)
return float64(m().FileMergesTotal) metrics.WriteGaugeUint64(w, `vl_indexdb_parts`, ss.IndexdbPartsCount)
}) metrics.WriteGaugeUint64(w, `vl_indexdb_blocks`, ss.IndexdbBlocksCount)
ms.NewGauge(`vl_storage_rows{type="inmemory"}`, func() float64 { metrics.WriteGaugeUint64(w, `vl_data_size_bytes{type="indexdb"}`, ss.IndexdbSizeBytes)
return float64(m().InmemoryRowsCount) metrics.WriteGaugeUint64(w, `vl_data_size_bytes{type="storage"}`, ss.CompressedInmemorySize+ss.CompressedSmallPartSize+ss.CompressedBigPartSize)
})
ms.NewGauge(`vl_storage_rows{type="file"}`, func() float64 { metrics.WriteGaugeUint64(w, `vl_compressed_data_size_bytes{type="storage/inmemory"}`, ss.CompressedInmemorySize)
return float64(m().FileRowsCount) metrics.WriteGaugeUint64(w, `vl_compressed_data_size_bytes{type="storage/small"}`, ss.CompressedSmallPartSize)
}) metrics.WriteGaugeUint64(w, `vl_compressed_data_size_bytes{type="storage/big"}`, ss.CompressedBigPartSize)
ms.NewGauge(`vl_storage_parts{type="inmemory"}`, func() float64 {
return float64(m().InmemoryParts) metrics.WriteGaugeUint64(w, `vl_uncompressed_data_size_bytes{type="storage/inmemory"}`, ss.UncompressedInmemorySize)
}) metrics.WriteGaugeUint64(w, `vl_uncompressed_data_size_bytes{type="storage/small"}`, ss.UncompressedSmallPartSize)
ms.NewGauge(`vl_storage_parts{type="file"}`, func() float64 { metrics.WriteGaugeUint64(w, `vl_uncompressed_data_size_bytes{type="storage/big"}`, ss.UncompressedBigPartSize)
return float64(m().FileParts)
}) metrics.WriteCounterUint64(w, `vl_rows_dropped_total{reason="too_big_timestamp"}`, ss.RowsDroppedTooBigTimestamp)
ms.NewGauge(`vl_storage_blocks{type="inmemory"}`, func() float64 { metrics.WriteCounterUint64(w, `vl_rows_dropped_total{reason="too_small_timestamp"}`, ss.RowsDroppedTooSmallTimestamp)
return float64(m().InmemoryBlocks)
})
ms.NewGauge(`vl_storage_blocks{type="file"}`, func() float64 {
return float64(m().FileBlocks)
})
ms.NewGauge(`vl_partitions`, func() float64 {
return float64(m().PartitionsCount)
})
ms.NewGauge(`vl_streams_created_total`, func() float64 {
return float64(m().StreamsCreatedTotal)
})
ms.NewGauge(`vl_indexdb_rows`, func() float64 {
return float64(m().IndexdbItemsCount)
})
ms.NewGauge(`vl_indexdb_parts`, func() float64 {
return float64(m().IndexdbPartsCount)
})
ms.NewGauge(`vl_indexdb_blocks`, func() float64 {
return float64(m().IndexdbBlocksCount)
})
ms.NewGauge(`vl_data_size_bytes{type="indexdb"}`, func() float64 {
return float64(m().IndexdbSizeBytes)
})
ms.NewGauge(`vl_data_size_bytes{type="storage"}`, func() float64 {
dm := m()
return float64(dm.CompressedInmemorySize + dm.CompressedFileSize)
})
ms.NewGauge(`vl_compressed_data_size_bytes{type="inmemory"}`, func() float64 {
return float64(m().CompressedInmemorySize)
})
ms.NewGauge(`vl_compressed_data_size_bytes{type="file"}`, func() float64 {
return float64(m().CompressedFileSize)
})
ms.NewGauge(`vl_uncompressed_data_size_bytes{type="inmemory"}`, func() float64 {
return float64(m().UncompressedInmemorySize)
})
ms.NewGauge(`vl_uncompressed_data_size_bytes{type="file"}`, func() float64 {
return float64(m().UncompressedFileSize)
})
ms.NewGauge(`vl_rows_dropped_total{reason="too_big_timestamp"}`, func() float64 {
return float64(m().RowsDroppedTooBigTimestamp)
})
ms.NewGauge(`vl_rows_dropped_total{reason="too_small_timestamp"}`, func() float64 {
return float64(m().RowsDroppedTooSmallTimestamp)
})
return ms
} }

View file

@ -520,7 +520,7 @@ func areSameFieldsInRows(rows [][]Field) bool {
fields := rows[0] fields := rows[0]
// Verify that all the field names are unique // Verify that all the field names are unique
m := make(map[string]struct{}, len(fields)) m := getFieldsSet()
for i := range fields { for i := range fields {
f := &fields[i] f := &fields[i]
if _, ok := m[f.Name]; ok { if _, ok := m[f.Name]; ok {
@ -529,6 +529,7 @@ func areSameFieldsInRows(rows [][]Field) bool {
} }
m[f.Name] = struct{}{} m[f.Name] = struct{}{}
} }
putFieldsSet(m)
// Verify that all the fields are the same across rows // Verify that all the fields are the same across rows
rows = rows[1:] rows = rows[1:]
@ -546,6 +547,21 @@ func areSameFieldsInRows(rows [][]Field) bool {
return true return true
} }
// fieldsSetPool holds cleared field-name sets for reuse by getFieldsSet and putFieldsSet.
var fieldsSetPool sync.Pool

// getFieldsSet returns an empty set of field names.
//
// The set is taken from fieldsSetPool when one is available; otherwise a new map is allocated.
// Pass the set to putFieldsSet when it is no longer needed, so it can be reused.
func getFieldsSet() map[string]struct{} {
	if m, ok := fieldsSetPool.Get().(map[string]struct{}); ok {
		return m
	}
	return make(map[string]struct{})
}

// putFieldsSet removes all the entries from m and returns it to fieldsSetPool.
//
// m mustn't be used by the caller after the call.
func putFieldsSet(m map[string]struct{}) {
	// Clear the set before pooling it, so getFieldsSet always hands out an empty set.
	clear(m)
	fieldsSetPool.Put(m)
}
var columnIdxsPool sync.Pool var columnIdxsPool sync.Pool
func getColumnIdxs() map[string]int { func getColumnIdxs() map[string]int {

View file

@ -17,6 +17,16 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
) )
// maxBigPartSize is the maximum size of a big part.
//
// This number limits the maximum time required for building a big part.
// This time shouldn't exceed a few days.
//
// NOTE(review): the unit is presumably bytes - confirm against the callers.
const maxBigPartSize = 1e12
// dataFlushInterval is the interval for guaranteed flush of recently ingested data
// from memory to on-disk parts, so the data survives a process crash.
//
// It is used as the ticker period in inmemoryPartsFlusher.
var dataFlushInterval = 5 * time.Second
// Default number of parts to merge at once. // Default number of parts to merge at once.
// //
// This number has been obtained empirically - it gives the lowest possible overhead. // This number has been obtained empirically - it gives the lowest possible overhead.
@ -43,8 +53,12 @@ type datadb struct {
inmemoryMergesTotal atomic.Uint64 inmemoryMergesTotal atomic.Uint64
inmemoryActiveMerges atomic.Int64 inmemoryActiveMerges atomic.Int64
fileMergesTotal atomic.Uint64
fileActiveMerges atomic.Int64 smallPartMergesTotal atomic.Uint64
smallPartActiveMerges atomic.Int64
bigPartMergesTotal atomic.Uint64
bigPartActiveMerges atomic.Int64
// pt is the partition the datadb belongs to // pt is the partition the datadb belongs to
pt *partition pt *partition
@ -58,8 +72,11 @@ type datadb struct {
// inmemoryParts contains a list of inmemory parts // inmemoryParts contains a list of inmemory parts
inmemoryParts []*partWrapper inmemoryParts []*partWrapper
// fileParts contains a list of file-based parts // smallParts contains a list of file-based small parts
fileParts []*partWrapper smallParts []*partWrapper
// bigParts contains a list of file-based big parts
bigParts []*partWrapper
// partsLock protects parts from concurrent access // partsLock protects parts from concurrent access
partsLock sync.Mutex partsLock sync.Mutex
@ -75,16 +92,6 @@ type datadb struct {
// It must be closed under partsLock in order to prevent from calling wg.Add() // It must be closed under partsLock in order to prevent from calling wg.Add()
// after stopCh is closed. // after stopCh is closed.
stopCh chan struct{} stopCh chan struct{}
// oldInmemoryPartsFlushersCount is the number of currently running flushers for old in-memory parts
//
// This variable must be accessed under partsLock.
oldInmemoryPartsFlushersCount int
// mergeWorkersCount is the number of currently running merge workers
//
// This variable must be accessed under partsLock.
mergeWorkersCount int
} }
// partWrapper is a wrapper for opened part. // partWrapper is a wrapper for opened part.
@ -140,7 +147,7 @@ func (pw *partWrapper) decRef() {
func mustCreateDatadb(path string) { func mustCreateDatadb(path string) {
fs.MustMkdirFailIfExist(path) fs.MustMkdirFailIfExist(path)
mustWritePartNames(path, []string{}) mustWritePartNames(path, nil, nil)
} }
// mustOpenDatadb opens datadb at the given path with the given flushInterval for in-memory data. // mustOpenDatadb opens datadb at the given path with the given flushInterval for in-memory data.
@ -151,8 +158,9 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
partNames := mustReadPartNames(path) partNames := mustReadPartNames(path)
mustRemoveUnusedDirs(path, partNames) mustRemoveUnusedDirs(path, partNames)
pws := make([]*partWrapper, len(partNames)) var smallParts []*partWrapper
for i, partName := range partNames { var bigParts []*partWrapper
for _, partName := range partNames {
// Make sure the partName exists on disk. // Make sure the partName exists on disk.
// If it is missing, then manual action from the user is needed, // If it is missing, then manual action from the user is needed,
// since this is unexpected state, which cannot occur under normal operation, // since this is unexpected state, which cannot occur under normal operation,
@ -166,181 +174,277 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da
} }
p := mustOpenFilePart(pt, partPath) p := mustOpenFilePart(pt, partPath)
pws[i] = newPartWrapper(p, nil, time.Time{}) pw := newPartWrapper(p, nil, time.Time{})
if p.ph.CompressedSizeBytes > getMaxInmemoryPartSize() {
bigParts = append(bigParts, pw)
} else {
smallParts = append(smallParts, pw)
}
} }
ddb := &datadb{ ddb := &datadb{
pt: pt, pt: pt,
flushInterval: flushInterval, flushInterval: flushInterval,
path: path, path: path,
fileParts: pws, smallParts: smallParts,
bigParts: bigParts,
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
ddb.mergeIdx.Store(uint64(time.Now().UnixNano())) ddb.mergeIdx.Store(uint64(time.Now().UnixNano()))
// Start merge workers in the hope they'll merge the remaining parts ddb.startBackgroundWorkers()
ddb.partsLock.Lock()
n := getMergeWorkersCount()
for i := 0; i < n; i++ {
ddb.startMergeWorkerLocked()
}
ddb.partsLock.Unlock()
return ddb return ddb
} }
// startOldInmemoryPartsFlusherLocked starts flusher for old in-memory parts to disk. func (ddb *datadb) startBackgroundWorkers() {
// // Start file parts mergers, so they could start merging unmerged parts if needed.
// This function must be called under partsLock. // There is no need in starting in-memory parts mergers, since there are no in-memory parts yet.
func (ddb *datadb) startOldInmemoryPartsFlusherLocked() { ddb.startSmallPartsMergers()
ddb.startBigPartsMergers()
ddb.startInmemoryPartsFlusher()
}
// The channels below are used as counting semaphores: a merge sends an item
// to the channel before it starts and receives the item back after it finishes.
// They are package-level, so every limit is shared by all the datadb instances.

// inmemoryPartsConcurrencyCh limits the number of concurrent merges started by
// inmemoryPartsMerger and mustMergePartsToFiles.
var inmemoryPartsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())

// smallPartsConcurrencyCh limits the number of concurrent merges started by smallPartsMerger.
var smallPartsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())

// bigPartsConcurrencyCh limits the number of concurrent merges started by bigPartsMerger.
var bigPartsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs())
// startSmallPartsMergers starts one small parts merger per slot in smallPartsConcurrencyCh.
//
// The mergers are started under partsLock, since startSmallPartsMergerLocked
// checks stopCh and calls wg.Add().
func (ddb *datadb) startSmallPartsMergers() {
	workers := cap(smallPartsConcurrencyCh)

	ddb.partsLock.Lock()
	for ; workers > 0; workers-- {
		ddb.startSmallPartsMergerLocked()
	}
	ddb.partsLock.Unlock()
}
// startBigPartsMergers starts one big parts merger per slot in bigPartsConcurrencyCh.
//
// The mergers are started under partsLock, since startBigPartsMergerLocked
// checks stopCh and calls wg.Add().
func (ddb *datadb) startBigPartsMergers() {
	workers := cap(bigPartsConcurrencyCh)

	ddb.partsLock.Lock()
	for ; workers > 0; workers-- {
		ddb.startBigPartsMergerLocked()
	}
	ddb.partsLock.Unlock()
}
func (ddb *datadb) startInmemoryPartsMergerLocked() {
if needStop(ddb.stopCh) { if needStop(ddb.stopCh) {
return return
} }
maxWorkers := getMergeWorkersCount()
if ddb.oldInmemoryPartsFlushersCount >= maxWorkers {
return
}
ddb.oldInmemoryPartsFlushersCount++
ddb.wg.Add(1) ddb.wg.Add(1)
go func() { go func() {
ddb.flushOldInmemoryParts() ddb.inmemoryPartsMerger()
ddb.wg.Done() ddb.wg.Done()
}() }()
} }
func (ddb *datadb) flushOldInmemoryParts() { func (ddb *datadb) startSmallPartsMergerLocked() {
ticker := time.NewTicker(time.Second) if needStop(ddb.stopCh) {
defer ticker.Stop()
var parts, partsToMerge []*partWrapper
for !needStop(ddb.stopCh) {
ddb.partsLock.Lock()
parts = appendNotInMergePartsLocked(parts[:0], ddb.inmemoryParts)
currentTime := time.Now()
partsToFlush := parts[:0]
for _, pw := range parts {
if pw.flushDeadline.Before(currentTime) {
partsToFlush = append(partsToFlush, pw)
}
}
// Do not take into account available disk space when flushing in-memory parts to disk,
// since otherwise the outdated in-memory parts may remain in-memory, which, in turn,
// may result in increased memory usage plus possible loss of historical data.
// It is better to crash on out of disk error in this case.
partsToMerge = appendPartsToMerge(partsToMerge[:0], partsToFlush, math.MaxUint64)
if len(partsToMerge) == 0 {
partsToMerge = append(partsToMerge[:0], partsToFlush...)
}
setInMergeLocked(partsToMerge)
needStop := false
if len(ddb.inmemoryParts) == 0 {
// There are no in-memory parts, so stop the flusher.
needStop = true
ddb.oldInmemoryPartsFlushersCount--
}
ddb.partsLock.Unlock()
if needStop {
return return
} }
ddb.wg.Add(1)
ddb.mustMergeParts(partsToMerge, true) go func() {
if len(partsToMerge) < len(partsToFlush) { ddb.smallPartsMerger()
// Continue merging remaining old in-memory parts from partsToFlush list. ddb.wg.Done()
continue }()
} }
// There are no old in-memory parts to flush. Sleep for a while until these parts appear. func (ddb *datadb) startBigPartsMergerLocked() {
if needStop(ddb.stopCh) {
return
}
ddb.wg.Add(1)
go func() {
ddb.bigPartsMerger()
ddb.wg.Done()
}()
}
// startInmemoryPartsFlusher starts inmemoryPartsFlusher in a background goroutine.
//
// The goroutine is registered in ddb.wg, so its completion can be awaited via ddb.wg.
func (ddb *datadb) startInmemoryPartsFlusher() {
	ddb.wg.Add(1)
	go func() {
		defer ddb.wg.Done()
		ddb.inmemoryPartsFlusher()
	}()
}
func (ddb *datadb) inmemoryPartsFlusher() {
// Do not add jitter to dataFlushInterval in order to guarantee the flush interval
ticker := time.NewTicker(dataFlushInterval)
defer ticker.Stop()
for {
select { select {
case <-ddb.stopCh: case <-ddb.stopCh:
return return
case <-ticker.C: case <-ticker.C:
ddb.mustFlushInmemoryPartsToFiles(false)
} }
} }
} }
// startMergeWorkerLocked starts a merge worker. func (ddb *datadb) mustFlushInmemoryPartsToFiles(isFinal bool) {
// currentTime := time.Now()
// This function must be called under locked partsLock. var pws []*partWrapper
func (ddb *datadb) startMergeWorkerLocked() {
if needStop(ddb.stopCh) {
return
}
maxWorkers := getMergeWorkersCount()
if ddb.mergeWorkersCount >= maxWorkers {
return
}
ddb.mergeWorkersCount++
ddb.wg.Add(1)
go func() {
globalMergeLimitCh <- struct{}{}
ddb.mustMergeExistingParts()
<-globalMergeLimitCh
ddb.wg.Done()
}()
}
// globalMergeLimitCh limits the number of concurrent merges across all the partitions
var globalMergeLimitCh = make(chan struct{}, getMergeWorkersCount())
func getMergeWorkersCount() int {
n := cgroup.AvailableCPUs()
if n < 4 {
// Use bigger number of workers on systems with small number of CPU cores,
// since a single worker may become busy for long time when merging big parts.
// Then the remaining workers may continue performing merges
// for newly added small parts.
return 4
}
return n
}
func (ddb *datadb) mustMergeExistingParts() {
for !needStop(ddb.stopCh) {
maxOutBytes := availableDiskSpace(ddb.path)
ddb.partsLock.Lock() ddb.partsLock.Lock()
parts := make([]*partWrapper, 0, len(ddb.inmemoryParts)+len(ddb.fileParts)) for _, pw := range ddb.inmemoryParts {
parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts) if !pw.isInMerge && (isFinal || pw.flushDeadline.Before(currentTime)) {
parts = appendNotInMergePartsLocked(parts, ddb.fileParts) pw.isInMerge = true
pws := appendPartsToMerge(nil, parts, maxOutBytes) pws = append(pws, pw)
setInMergeLocked(pws) }
if len(pws) == 0 {
ddb.mergeWorkersCount--
} }
ddb.partsLock.Unlock() ddb.partsLock.Unlock()
ddb.mustMergePartsToFiles(pws)
}
// mustMergePartsToFiles merges pws with isFinal=true and returns after all the merges are complete.
//
// pws is split into chunks via getPartsForOptimalMerge. Every chunk is merged
// in its own goroutine. The number of concurrently running merges is limited
// by inmemoryPartsConcurrencyCh.
func (ddb *datadb) mustMergePartsToFiles(pws []*partWrapper) {
	wg := getWaitGroup()
	remaining := pws
	for len(remaining) > 0 {
		var chunk []*partWrapper
		chunk, remaining = getPartsForOptimalMerge(remaining)

		wg.Add(1)
		// Acquire the concurrency slot before starting the goroutine,
		// so the loop blocks when too many merges are already running.
		inmemoryPartsConcurrencyCh <- struct{}{}
		go func() {
			// Deferred calls run in reverse order: the slot is released first,
			// then the wait group is notified.
			defer wg.Done()
			defer func() {
				<-inmemoryPartsConcurrencyCh
			}()
			ddb.mustMergeParts(chunk, true)
		}()
	}
	wg.Wait()
	putWaitGroup(wg)
}
// getPartsForOptimalMerge splits pws into the parts to merge now and the remaining parts.
//
// The parts to merge are selected by appendPartsToMerge with no limit on the output size.
// If it selects nothing, then pws itself is returned as the parts to merge and nothing remains.
//
// Otherwise all the pws items are replaced by nil before returning, so pws mustn't be used
// by the caller after the call. This helps Go GC to reclaim the referenced items faster.
func getPartsForOptimalMerge(pws []*partWrapper) ([]*partWrapper, []*partWrapper) {
	selected := appendPartsToMerge(nil, pws, math.MaxUint64)
	if len(selected) == 0 {
		return pws, nil
	}

	isSelected := partsToMap(selected)
	rest := make([]*partWrapper, 0, len(pws)-len(selected))
	for i, pw := range pws {
		if _, ok := isSelected[pw]; !ok {
			rest = append(rest, pw)
		}
		// Drop the reference held by pws, so the item can be reclaimed faster by Go GC.
		pws[i] = nil
	}
	return selected, rest
}
// wgPool holds sync.WaitGroup objects for reuse by getWaitGroup and putWaitGroup.
var wgPool sync.Pool

// getWaitGroup returns a sync.WaitGroup from wgPool, or a newly allocated one if the pool is empty.
//
// Return the wait group to the pool via putWaitGroup when it is no longer needed.
func getWaitGroup() *sync.WaitGroup {
	if wg, ok := wgPool.Get().(*sync.WaitGroup); ok {
		return wg
	}
	return &sync.WaitGroup{}
}

// putWaitGroup returns wg to wgPool.
//
// wg is stored as is, without any reset.
func putWaitGroup(wg *sync.WaitGroup) {
	wgPool.Put(wg)
}
func (ddb *datadb) inmemoryPartsMerger() {
for {
if needStop(ddb.stopCh) {
return
}
maxOutBytes := ddb.getMaxBigPartSize()
ddb.partsLock.Lock()
pws := getPartsToMergeLocked(ddb.inmemoryParts, maxOutBytes)
ddb.partsLock.Unlock()
if len(pws) == 0 { if len(pws) == 0 {
// Nothing to merge at the moment. // Nothing to merge
return return
} }
inmemoryPartsConcurrencyCh <- struct{}{}
ddb.mustMergeParts(pws, false) ddb.mustMergeParts(pws, false)
<-inmemoryPartsConcurrencyCh
} }
} }
// appendNotInMergePartsLocked appends src parts with isInMerge=false to dst and returns the result. func (ddb *datadb) smallPartsMerger() {
// for {
// This function must be called under partsLock. if needStop(ddb.stopCh) {
func appendNotInMergePartsLocked(dst, src []*partWrapper) []*partWrapper { return
for _, pw := range src {
if !pw.isInMerge {
dst = append(dst, pw)
} }
} maxOutBytes := ddb.getMaxBigPartSize()
return dst
ddb.partsLock.Lock()
pws := getPartsToMergeLocked(ddb.smallParts, maxOutBytes)
ddb.partsLock.Unlock()
if len(pws) == 0 {
// Nothing to merge
return
} }
// setInMergeLocked sets isInMerge flag for pws. smallPartsConcurrencyCh <- struct{}{}
ddb.mustMergeParts(pws, false)
<-smallPartsConcurrencyCh
}
}
// bigPartsMerger merges big parts in a loop.
//
// It returns when ddb.stopCh is closed or when getPartsToMergeLocked finds nothing to merge.
// Every merge holds a slot in bigPartsConcurrencyCh while it runs.
func (ddb *datadb) bigPartsMerger() {
	for !needStop(ddb.stopCh) {
		maxOutBytes := ddb.getMaxBigPartSize()

		// Select the parts under partsLock; the merge itself runs without the lock.
		ddb.partsLock.Lock()
		pws := getPartsToMergeLocked(ddb.bigParts, maxOutBytes)
		ddb.partsLock.Unlock()

		if len(pws) == 0 {
			// Nothing to merge
			return
		}

		bigPartsConcurrencyCh <- struct{}{}
		ddb.mustMergeParts(pws, false)
		<-bigPartsConcurrencyCh
	}
}
// getPartsToMergeLocked returns optimal parts to merge from pws.
// //
// This function must be called under partsLock. // The summary size of the returned parts must be smaller than maxOutBytes.
func setInMergeLocked(pws []*partWrapper) { func getPartsToMergeLocked(pws []*partWrapper, maxOutBytes uint64) []*partWrapper {
pwsRemaining := make([]*partWrapper, 0, len(pws))
for _, pw := range pws { for _, pw := range pws {
if !pw.isInMerge {
pwsRemaining = append(pwsRemaining, pw)
}
}
pwsToMerge := appendPartsToMerge(nil, pwsRemaining, maxOutBytes)
for _, pw := range pwsToMerge {
if pw.isInMerge { if pw.isInMerge {
logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to true") logger.Panicf("BUG: partWrapper.isInMerge cannot be set")
} }
pw.isInMerge = true pw.isInMerge = true
} }
return pwsToMerge
} }
func assertIsInMerge(pws []*partWrapper) { func assertIsInMerge(pws []*partWrapper) {
@ -370,7 +474,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
startTime := time.Now() startTime := time.Now()
dstPartType := ddb.getDstPartType(pws, isFinal) dstPartType := ddb.getDstPartType(pws, isFinal)
if dstPartType == partFile { if dstPartType != partInmemory {
// Make sure there is enough disk space for performing the merge // Make sure there is enough disk space for performing the merge
partsSize := getCompressedSize(pws) partsSize := getCompressedSize(pws)
needReleaseDiskSpace := tryReserveDiskSpace(ddb.path, partsSize) needReleaseDiskSpace := tryReserveDiskSpace(ddb.path, partsSize)
@ -387,14 +491,21 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
} }
} }
if dstPartType == partInmemory { switch dstPartType {
case partInmemory:
ddb.inmemoryMergesTotal.Add(1) ddb.inmemoryMergesTotal.Add(1)
ddb.inmemoryActiveMerges.Add(1) ddb.inmemoryActiveMerges.Add(1)
defer ddb.inmemoryActiveMerges.Add(-1) defer ddb.inmemoryActiveMerges.Add(-1)
} else { case partSmall:
ddb.fileMergesTotal.Add(1) ddb.smallPartMergesTotal.Add(1)
ddb.fileActiveMerges.Add(1) ddb.smallPartActiveMerges.Add(1)
defer ddb.fileActiveMerges.Add(-1) defer ddb.smallPartActiveMerges.Add(-1)
case partBig:
ddb.bigPartMergesTotal.Add(1)
ddb.bigPartActiveMerges.Add(1)
defer ddb.bigPartActiveMerges.Add(-1)
default:
logger.Panicf("BUG: unknown partType=%d", dstPartType)
} }
// Initialize destination paths. // Initialize destination paths.
@ -428,7 +539,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
mpNew = getInmemoryPart() mpNew = getInmemoryPart()
bsw.MustInitForInmemoryPart(mpNew) bsw.MustInitForInmemoryPart(mpNew)
} else { } else {
nocache := !shouldUsePageCacheForPartSize(srcSize) nocache := dstPartType == partBig
bsw.MustInitForFilePart(dstPartPath, nocache) bsw.MustInitForFilePart(dstPartPath, nocache)
} }
@ -455,7 +566,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
} }
if needStop(stopCh) { if needStop(stopCh) {
// Remove incomplete destination part // Remove incomplete destination part
if dstPartType == partFile { if dstPartType != partInmemory {
fs.MustRemoveAll(dstPartPath) fs.MustRemoveAll(dstPartPath)
} }
return return
@ -477,7 +588,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) {
ddb.swapSrcWithDstParts(pws, pwNew, dstPartType) ddb.swapSrcWithDstParts(pws, pwNew, dstPartType)
d := time.Since(startTime) d := time.Since(startTime)
if d <= 30*time.Second { if d <= time.Minute {
return return
} }
@ -496,21 +607,22 @@ type partType int
var ( var (
partInmemory = partType(0) partInmemory = partType(0)
partFile = partType(1) partSmall = partType(1)
partBig = partType(2)
) )
func (ddb *datadb) getDstPartType(pws []*partWrapper, isFinal bool) partType { func (ddb *datadb) getDstPartType(pws []*partWrapper, isFinal bool) partType {
if isFinal {
return partFile
}
dstPartSize := getCompressedSize(pws) dstPartSize := getCompressedSize(pws)
if dstPartSize > getMaxInmemoryPartSize() { if dstPartSize > ddb.getMaxSmallPartSize() {
return partFile return partBig
}
if isFinal || dstPartSize > getMaxInmemoryPartSize() {
return partSmall
} }
if !areAllInmemoryParts(pws) { if !areAllInmemoryParts(pws) {
// If at least a single source part is located in file, // If at least a single source part is located in file,
// then the destination part must be in file for durability reasons. // then the destination part must be in file for durability reasons.
return partFile return partSmall
} }
return partInmemory return partInmemory
} }
@ -560,45 +672,8 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
ddb.partsLock.Lock() ddb.partsLock.Lock()
ddb.inmemoryParts = append(ddb.inmemoryParts, pw) ddb.inmemoryParts = append(ddb.inmemoryParts, pw)
ddb.startOldInmemoryPartsFlusherLocked() ddb.startInmemoryPartsMergerLocked()
if len(ddb.inmemoryParts) > defaultPartsToMerge {
ddb.startMergeWorkerLocked()
}
needAssistedMerge := ddb.needAssistedMergeForInmemoryPartsLocked()
ddb.partsLock.Unlock() ddb.partsLock.Unlock()
if needAssistedMerge {
ddb.assistedMergeForInmemoryParts()
}
}
func (ddb *datadb) needAssistedMergeForInmemoryPartsLocked() bool {
if len(ddb.inmemoryParts) < maxInmemoryPartsPerPartition {
return false
}
n := 0
for _, pw := range ddb.inmemoryParts {
if !pw.isInMerge {
n++
}
}
return n >= defaultPartsToMerge
}
func (ddb *datadb) assistedMergeForInmemoryParts() {
ddb.partsLock.Lock()
parts := make([]*partWrapper, 0, len(ddb.inmemoryParts))
parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts)
// Do not take into account available disk space when merging in-memory parts,
// since otherwise the outdated in-memory parts may remain in-memory, which, in turn,
// may result in increased memory usage plus possible loss of historical data.
// It is better to crash on out of disk error in this case.
pws := make([]*partWrapper, 0, len(parts))
pws = appendPartsToMerge(pws[:0], parts, math.MaxUint64)
setInMergeLocked(pws)
ddb.partsLock.Unlock()
ddb.mustMergeParts(pws, false)
} }
// DatadbStats contains various stats for datadb. // DatadbStats contains various stats for datadb.
@ -609,41 +684,62 @@ type DatadbStats struct {
// InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb. // InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb.
InmemoryActiveMerges uint64 InmemoryActiveMerges uint64
// FileMergesTotal is the number of file merges performed in the given datadb. // SmallPartMergesTotal is the number of small file merges performed in the given datadb.
FileMergesTotal uint64 SmallPartMergesTotal uint64
// FileActiveMerges is the number of currently active file merges performed by the given datadb. // SmallPartActiveMerges is the number of currently active small file merges performed by the given datadb.
FileActiveMerges uint64 SmallPartActiveMerges uint64
// BigPartMergesTotal is the number of big file merges performed in the given datadb.
BigPartMergesTotal uint64
// BigPartActiveMerges is the number of currently active big file merges performed by the given datadb.
BigPartActiveMerges uint64
// InmemoryRowsCount is the number of rows, which weren't flushed to disk yet. // InmemoryRowsCount is the number of rows, which weren't flushed to disk yet.
InmemoryRowsCount uint64 InmemoryRowsCount uint64
// FileRowsCount is the number of rows stored on disk. // SmallPartRowsCount is the number of rows stored on disk in small parts.
FileRowsCount uint64 SmallPartRowsCount uint64
// BigPartRowsCount is the number of rows stored on disk in big parts.
BigPartRowsCount uint64
// InmemoryParts is the number of in-memory parts, which weren't flushed to disk yet. // InmemoryParts is the number of in-memory parts, which weren't flushed to disk yet.
InmemoryParts uint64 InmemoryParts uint64
// FileParts is the number of file-based parts stored on disk. // SmallParts is the number of file-based small parts stored on disk.
FileParts uint64 SmallParts uint64
// BigParts is the number of file-based big parts stored on disk.
BigParts uint64
// InmemoryBlocks is the number of in-memory blocks, which weren't flushed to disk yet. // InmemoryBlocks is the number of in-memory blocks, which weren't flushed to disk yet.
InmemoryBlocks uint64 InmemoryBlocks uint64
// FileBlocks is the number of file-based blocks stored on disk. // SmallPartBlocks is the number of file-based small blocks stored on disk.
FileBlocks uint64 SmallPartBlocks uint64
// BigPartBlocks is the number of file-based big blocks stored on disk.
BigPartBlocks uint64
// CompressedInmemorySize is the size of compressed data stored in memory. // CompressedInmemorySize is the size of compressed data stored in memory.
CompressedInmemorySize uint64 CompressedInmemorySize uint64
// CompressedFileSize is the size of compressed data stored on disk. // CompressedSmallPartSize is the size of compressed small parts data stored on disk.
CompressedFileSize uint64 CompressedSmallPartSize uint64
// CompressedBigPartSize is the size of compressed big data stored on disk.
CompressedBigPartSize uint64
// UncompressedInmemorySize is the size of uncompressed data stored in memory. // UncompressedInmemorySize is the size of uncompressed data stored in memory.
UncompressedInmemorySize uint64 UncompressedInmemorySize uint64
// UncompressedFileSize is the size of uncompressed data stored on disk. // UncompressedSmallPartSize is the size of uncompressed small data stored on disk.
UncompressedFileSize uint64 UncompressedSmallPartSize uint64
// UncompressedBigPartSize is the size of uncompressed big data stored on disk.
UncompressedBigPartSize uint64
} }
func (s *DatadbStats) reset() { func (s *DatadbStats) reset() {
@ -652,32 +748,39 @@ func (s *DatadbStats) reset() {
// RowsCount returns the number of rows stored in datadb. // RowsCount returns the number of rows stored in datadb.
func (s *DatadbStats) RowsCount() uint64 { func (s *DatadbStats) RowsCount() uint64 {
return s.InmemoryRowsCount + s.FileRowsCount return s.InmemoryRowsCount + s.SmallPartRowsCount + s.BigPartRowsCount
} }
// updateStats updates s with ddb stats // updateStats updates s with ddb stats.
func (ddb *datadb) updateStats(s *DatadbStats) { func (ddb *datadb) updateStats(s *DatadbStats) {
s.InmemoryMergesTotal += ddb.inmemoryMergesTotal.Load() s.InmemoryMergesTotal += ddb.inmemoryMergesTotal.Load()
s.InmemoryActiveMerges += uint64(ddb.inmemoryActiveMerges.Load()) s.InmemoryActiveMerges += uint64(ddb.inmemoryActiveMerges.Load())
s.FileMergesTotal += ddb.fileMergesTotal.Load() s.SmallPartMergesTotal += ddb.smallPartMergesTotal.Load()
s.FileActiveMerges += uint64(ddb.fileActiveMerges.Load()) s.SmallPartActiveMerges += uint64(ddb.smallPartActiveMerges.Load())
s.BigPartMergesTotal += ddb.bigPartMergesTotal.Load()
s.BigPartActiveMerges += uint64(ddb.bigPartActiveMerges.Load())
ddb.partsLock.Lock() ddb.partsLock.Lock()
s.InmemoryRowsCount += getRowsCount(ddb.inmemoryParts) s.InmemoryRowsCount += getRowsCount(ddb.inmemoryParts)
s.FileRowsCount += getRowsCount(ddb.fileParts) s.SmallPartRowsCount += getRowsCount(ddb.smallParts)
s.BigPartRowsCount += getRowsCount(ddb.bigParts)
s.InmemoryParts += uint64(len(ddb.inmemoryParts)) s.InmemoryParts += uint64(len(ddb.inmemoryParts))
s.FileParts += uint64(len(ddb.fileParts)) s.SmallParts += uint64(len(ddb.smallParts))
s.BigParts += uint64(len(ddb.bigParts))
s.InmemoryBlocks += getBlocksCount(ddb.inmemoryParts) s.InmemoryBlocks += getBlocksCount(ddb.inmemoryParts)
s.FileBlocks += getBlocksCount(ddb.fileParts) s.SmallPartBlocks += getBlocksCount(ddb.smallParts)
s.BigPartBlocks += getBlocksCount(ddb.bigParts)
s.CompressedInmemorySize += getCompressedSize(ddb.inmemoryParts) s.CompressedInmemorySize += getCompressedSize(ddb.inmemoryParts)
s.CompressedFileSize += getCompressedSize(ddb.fileParts) s.CompressedSmallPartSize += getCompressedSize(ddb.smallParts)
s.CompressedBigPartSize += getCompressedSize(ddb.bigParts)
s.UncompressedInmemorySize += getUncompressedSize(ddb.inmemoryParts) s.UncompressedInmemorySize += getUncompressedSize(ddb.inmemoryParts)
s.UncompressedFileSize += getUncompressedSize(ddb.fileParts) s.UncompressedSmallPartSize += getUncompressedSize(ddb.smallParts)
s.UncompressedBigPartSize += getUncompressedSize(ddb.bigParts)
ddb.partsLock.Unlock() ddb.partsLock.Unlock()
} }
@ -687,29 +790,56 @@ func (ddb *datadb) debugFlush() {
// Nothing to do, since all the ingested data is available for search via ddb.inmemoryParts. // Nothing to do, since all the ingested data is available for search via ddb.inmemoryParts.
} }
func (ddb *datadb) mustFlushInmemoryPartsToDisk() { func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) {
// Atomically unregister old parts and add new part to pt.
partsToRemove := partsToMap(pws)
removedInmemoryParts := 0
removedSmallParts := 0
removedBigParts := 0
ddb.partsLock.Lock() ddb.partsLock.Lock()
pws := append([]*partWrapper{}, ddb.inmemoryParts...)
setInMergeLocked(pws) ddb.inmemoryParts, removedInmemoryParts = removeParts(ddb.inmemoryParts, partsToRemove)
ddb.smallParts, removedSmallParts = removeParts(ddb.smallParts, partsToRemove)
ddb.bigParts, removedBigParts = removeParts(ddb.bigParts, partsToRemove)
if pwNew != nil {
switch dstPartType {
case partInmemory:
ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew)
ddb.startInmemoryPartsMergerLocked()
case partSmall:
ddb.smallParts = append(ddb.smallParts, pwNew)
ddb.startSmallPartsMergerLocked()
case partBig:
ddb.bigParts = append(ddb.bigParts, pwNew)
ddb.startBigPartsMergerLocked()
default:
logger.Panicf("BUG: unknown partType=%d", dstPartType)
}
}
// Atomically store the updated list of file-based parts on disk.
// This must be performed under partsLock in order to prevent from races
// when multiple concurrently running goroutines update the list.
if removedSmallParts > 0 || removedBigParts > 0 || pwNew != nil && dstPartType != partInmemory {
smallPartNames := getPartNames(ddb.smallParts)
bigPartNames := getPartNames(ddb.bigParts)
mustWritePartNames(ddb.path, smallPartNames, bigPartNames)
}
ddb.partsLock.Unlock() ddb.partsLock.Unlock()
var pwsChunk []*partWrapper removedParts := removedInmemoryParts + removedSmallParts + removedBigParts
for len(pws) > 0 { if removedParts != len(partsToRemove) {
// Do not take into account available disk space when performing the final flush of in-memory parts to disk, logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(partsToRemove))
// since otherwise these parts will be lost.
// It is better to crash on out of disk error in this case.
pwsChunk = appendPartsToMerge(pwsChunk[:0], pws, math.MaxUint64)
if len(pwsChunk) == 0 {
pwsChunk = append(pwsChunk[:0], pws...)
}
partsToRemove := partsToMap(pwsChunk)
removedParts := 0
pws, removedParts = removeParts(pws, partsToRemove)
if removedParts != len(pwsChunk) {
logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk))
} }
ddb.mustMergeParts(pwsChunk, true) // Mark old parts as must be deleted and decrement reference count, so they are eventually closed and deleted.
for _, pw := range pws {
pw.mustDrop.Store(true)
pw.decRef()
} }
} }
@ -724,54 +854,6 @@ func partsToMap(pws []*partWrapper) map[*partWrapper]struct{} {
return m return m
} }
// swapSrcWithDstParts unregisters the source parts pws from ddb and registers pwNew instead.
//
// pwNew may be nil; in this case pws are only unregistered.
// dstPartType selects the list pwNew is appended to (partInmemory or partFile).
// The function panics if the number of unregistered parts doesn't match the number of distinct parts in pws.
func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) {
	srcParts := partsToMap(pws)

	var inmemoryRemoved, fileRemoved int

	ddb.partsLock.Lock()

	// Drop the source parts from both lists.
	ddb.inmemoryParts, inmemoryRemoved = removeParts(ddb.inmemoryParts, srcParts)
	ddb.fileParts, fileRemoved = removeParts(ddb.fileParts, srcParts)

	// Register the destination part, if any.
	if pwNew != nil {
		switch dstPartType {
		case partInmemory:
			ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew)
			ddb.startOldInmemoryPartsFlusherLocked()
		case partFile:
			ddb.fileParts = append(ddb.fileParts, pwNew)
		default:
			logger.Panicf("BUG: unknown partType=%d", dstPartType)
		}
		if total := len(ddb.inmemoryParts) + len(ddb.fileParts); total > defaultPartsToMerge {
			ddb.startMergeWorkerLocked()
		}
	}

	// Persist the list of file-based parts when it has been changed.
	// The write is performed while partsLock is held, so concurrently running
	// goroutines cannot race when updating the list.
	fileListChanged := fileRemoved > 0 || (pwNew != nil && dstPartType == partFile)
	if fileListChanged {
		mustWritePartNames(ddb.path, getPartNames(ddb.fileParts))
	}

	ddb.partsLock.Unlock()

	if got, want := inmemoryRemoved+fileRemoved, len(srcParts); got != want {
		logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", got, want)
	}

	// Flag the source parts for deletion and release the reference held by ddb,
	// so they are eventually closed and deleted.
	for _, pw := range pws {
		pw.mustDrop.Store(true)
		pw.decRef()
	}
}
func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) { func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) {
dst := pws[:0] dst := pws[:0]
for _, pw := range pws { for _, pw := range pws {
@ -853,6 +935,34 @@ func (ddb *datadb) releasePartsToMerge(pws []*partWrapper) {
ddb.partsLock.Unlock() ddb.partsLock.Unlock()
} }
// getMaxBigPartSize returns the result of getMaxOutBytes for ddb.path.
func (ddb *datadb) getMaxBigPartSize() uint64 {
	maxSize := getMaxOutBytes(ddb.path)
	return maxSize
}
// getMaxSmallPartSize returns the size limit for small parts in ddb.
//
// The limit is derived from memory.Remaining() divided by defaultPartsToMerge,
// is raised to at least 10e6 bytes and is finally capped by getMaxOutBytes(ddb.path).
func (ddb *datadb) getMaxSmallPartSize() uint64 {
	const minSize = 10e6

	// Small parts are cached in the OS page cache,
	// so their size is bounded by the remaining free RAM.
	limit := uint64(memory.Remaining()) / defaultPartsToMerge
	if limit < minSize {
		limit = minSize
	}

	// The output part must also fit the disk space available for it.
	if diskLimit := getMaxOutBytes(ddb.path); diskLimit < limit {
		limit = diskLimit
	}
	return limit
}
// getMaxOutBytes returns the value reported by availableDiskSpace for the given path,
// capped at maxBigPartSize.
func getMaxOutBytes(path string) uint64 {
	if free := availableDiskSpace(path); free < maxBigPartSize {
		return free
	}
	return maxBigPartSize
}
func availableDiskSpace(path string) uint64 { func availableDiskSpace(path string) uint64 {
available := fs.MustGetFreeSpace(path) available := fs.MustGetFreeSpace(path)
reserved := reservedDiskSpace.Load() reserved := reservedDiskSpace.Load()
@ -865,7 +975,7 @@ func availableDiskSpace(path string) uint64 {
func tryReserveDiskSpace(path string, n uint64) bool { func tryReserveDiskSpace(path string, n uint64) bool {
available := fs.MustGetFreeSpace(path) available := fs.MustGetFreeSpace(path)
reserved := reserveDiskSpace(n) reserved := reserveDiskSpace(n)
if available > reserved { if available >= reserved {
return true return true
} }
releaseDiskSpace(n) releaseDiskSpace(n)
@ -908,20 +1018,29 @@ func mustCloseDatadb(ddb *datadb) {
ddb.wg.Wait() ddb.wg.Wait()
// flush in-memory data to disk // flush in-memory data to disk
ddb.mustFlushInmemoryPartsToDisk() ddb.mustFlushInmemoryPartsToFiles(true)
if len(ddb.inmemoryParts) > 0 { if len(ddb.inmemoryParts) > 0 {
logger.Panicf("BUG: the number of in-memory parts must be zero after flushing them to disk; got %d", len(ddb.inmemoryParts)) logger.Panicf("BUG: the number of in-memory parts must be zero after flushing them to disk; got %d", len(ddb.inmemoryParts))
} }
ddb.inmemoryParts = nil ddb.inmemoryParts = nil
// close file parts // close small parts
for _, pw := range ddb.fileParts { for _, pw := range ddb.smallParts {
pw.decRef() pw.decRef()
if n := pw.refCount.Load(); n != 0 { if n := pw.refCount.Load(); n != 0 {
logger.Panicf("BUG: ther are %d references to filePart", n) logger.Panicf("BUG: there are %d references to smallPart", n)
} }
} }
ddb.fileParts = nil ddb.smallParts = nil
// close big parts
for _, pw := range ddb.bigParts {
pw.decRef()
if n := pw.refCount.Load(); n != 0 {
logger.Panicf("BUG: there are %d references to bigPart", n)
}
}
ddb.bigParts = nil
ddb.path = "" ddb.path = ""
ddb.pt = nil ddb.pt = nil
@ -941,7 +1060,9 @@ func getPartNames(pws []*partWrapper) []string {
return partNames return partNames
} }
func mustWritePartNames(path string, partNames []string) { func mustWritePartNames(path string, smallPartNames, bigPartNames []string) {
partNames := append([]string{}, smallPartNames...)
partNames = append(partNames, bigPartNames...)
data, err := json.Marshal(partNames) data, err := json.Marshal(partNames)
if err != nil { if err != nil {
logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err) logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err)
@ -1102,8 +1223,3 @@ func getBlocksCount(pws []*partWrapper) uint64 {
} }
return n return n
} }
// shouldUsePageCacheForPartSize reports whether size doesn't exceed
// memory.Remaining() divided by defaultPartsToMerge.
func shouldUsePageCacheForPartSize(size uint64) bool {
	limit := uint64(memory.Remaining() / defaultPartsToMerge)
	return size <= limit
}

View file

@ -26,7 +26,7 @@ type LogRows struct {
// timestamps holds timestamps for rows added to LogRows // timestamps holds timestamps for rows added to LogRows
timestamps []int64 timestamps []int64
// rows holds fields for rows atted to LogRows. // rows holds fields for rows added to LogRows.
rows [][]Field rows [][]Field
// sf is a helper for sorting fields in every added row // sf is a helper for sorting fields in every added row

View file

@ -26,14 +26,20 @@ func TestPartitionLifecycle(t *testing.T) {
if ddbStats.InmemoryParts != 0 { if ddbStats.InmemoryParts != 0 {
t.Fatalf("unexpected non-zero number of in-memory parts in empty partition: %d", ddbStats.InmemoryParts) t.Fatalf("unexpected non-zero number of in-memory parts in empty partition: %d", ddbStats.InmemoryParts)
} }
if ddbStats.FileParts != 0 { if ddbStats.SmallParts != 0 {
t.Fatalf("unexpected non-zero number of file parts in empty partition: %d", ddbStats.FileParts) t.Fatalf("unexpected non-zero number of small file parts in empty partition: %d", ddbStats.SmallParts)
}
if ddbStats.BigParts != 0 {
t.Fatalf("unexpected non-zero number of big file parts in empty partition: %d", ddbStats.BigParts)
} }
if ddbStats.CompressedInmemorySize != 0 { if ddbStats.CompressedInmemorySize != 0 {
t.Fatalf("unexpected non-zero size of inmemory parts for empty partition") t.Fatalf("unexpected non-zero size of inmemory parts for empty partition")
} }
if ddbStats.CompressedFileSize != 0 { if ddbStats.CompressedSmallPartSize != 0 {
t.Fatalf("unexpected non-zero size of file parts for empty partition") t.Fatalf("unexpected non-zero size of small file parts for empty partition")
}
if ddbStats.CompressedBigPartSize != 0 {
t.Fatalf("unexpected non-zero size of big file parts for empty partition")
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
mustClosePartition(pt) mustClosePartition(pt)
@ -87,8 +93,8 @@ func TestPartitionMustAddRowsSerial(t *testing.T) {
if ddbStats.InmemoryParts != 0 { if ddbStats.InmemoryParts != 0 {
t.Fatalf("unexpected non-zero number of in-memory parts after re-opening the partition: %d", ddbStats.InmemoryParts) t.Fatalf("unexpected non-zero number of in-memory parts after re-opening the partition: %d", ddbStats.InmemoryParts)
} }
if ddbStats.FileParts == 0 { if ddbStats.SmallParts+ddbStats.BigParts == 0 {
t.Fatalf("the number of file parts must be greater than 0 after re-opening the partition") t.Fatalf("the number of small parts must be greater than 0 after re-opening the partition")
} }
// Try adding entries for multiple streams at a time // Try adding entries for multiple streams at a time
@ -115,7 +121,7 @@ func TestPartitionMustAddRowsSerial(t *testing.T) {
if ddbStats.InmemoryParts != 0 { if ddbStats.InmemoryParts != 0 {
t.Fatalf("unexpected non-zero number of in-memory parts after re-opening the partition: %d", ddbStats.InmemoryParts) t.Fatalf("unexpected non-zero number of in-memory parts after re-opening the partition: %d", ddbStats.InmemoryParts)
} }
if ddbStats.FileParts == 0 { if ddbStats.SmallParts+ddbStats.BigParts == 0 {
t.Fatalf("the number of file parts must be greater than 0 after re-opening the partition") t.Fatalf("the number of file parts must be greater than 0 after re-opening the partition")
} }

View file

@ -400,7 +400,8 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter)
func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer {
// Select parts with data for the given time range // Select parts with data for the given time range
ddb.partsLock.Lock() ddb.partsLock.Lock()
pws := appendPartsInTimeRange(nil, ddb.fileParts, so.minTimestamp, so.maxTimestamp) pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp)
pws = appendPartsInTimeRange(pws, ddb.smallParts, so.minTimestamp, so.maxTimestamp)
pws = appendPartsInTimeRange(pws, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp) pws = appendPartsInTimeRange(pws, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp)
// Increase references to the searched parts, so they aren't deleted during search. // Increase references to the searched parts, so they aren't deleted during search.

View file

@ -65,11 +65,8 @@ type valuesEncoder struct {
func (ve *valuesEncoder) reset() { func (ve *valuesEncoder) reset() {
ve.buf = ve.buf[:0] ve.buf = ve.buf[:0]
vs := ve.values clear(ve.values)
for i := range vs { ve.values = ve.values[:0]
vs[i] = ""
}
ve.values = vs[:0]
} }
// encode encodes values to ve.values and returns the encoded value type with min/max encoded values. // encode encodes values to ve.values and returns the encoded value type with min/max encoded values.
@ -1073,11 +1070,8 @@ type valuesDict struct {
} }
func (vd *valuesDict) reset() { func (vd *valuesDict) reset() {
vs := vd.values clear(vd.values)
for i := range vs { vd.values = vd.values[:0]
vs[i] = ""
}
vd.values = vs[:0]
} }
func (vd *valuesDict) copyFrom(src *valuesDict) { func (vd *valuesDict) copyFrom(src *valuesDict) {
@ -1134,7 +1128,7 @@ func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) {
return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict: %w", i, dictLen, err) return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict: %w", i, dictLen, err)
} }
src = tail src = tail
// Do not use bytesutil.InternBytes(data) here, since it works slower than the string(data) in prod
v := string(data) v := string(data)
vd.values = append(vd.values, v) vd.values = append(vd.values, v)
} }