diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index 6fb3df74e..15331e191 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -4,8 +4,8 @@ import ( "context" "flag" "fmt" + "io" "net/http" - "sync" "time" "github.com/VictoriaMetrics/metrics" @@ -62,10 +62,16 @@ func Init() { var ss logstorage.StorageStats strg.UpdateStats(&ss) - logger.Infof("successfully opened storage in %.3f seconds; partsCount: %d; blocksCount: %d; rowsCount: %d; sizeBytes: %d", - time.Since(startTime).Seconds(), ss.FileParts, ss.FileBlocks, ss.FileRowsCount, ss.CompressedFileSize) - storageMetrics = initStorageMetrics(strg) + logger.Infof("successfully opened storage in %.3f seconds; smallParts: %d; bigParts: %d; smallPartBlocks: %d; bigPartBlocks: %d; smallPartRows: %d; bigPartRows: %d; "+ + "smallPartSize: %d bytes; bigPartSize: %d bytes", + time.Since(startTime).Seconds(), ss.SmallParts, ss.BigParts, ss.SmallPartBlocks, ss.BigPartBlocks, ss.SmallPartRowsCount, ss.BigPartRowsCount, + ss.CompressedSmallPartSize, ss.CompressedBigPartSize) + // register storage metrics + storageMetrics = metrics.NewSet() + storageMetrics.RegisterMetricsWriter(func(w io.Writer) { + writeStorageMetrics(w, strg) + }) metrics.RegisterSet(storageMetrics) } @@ -105,112 +111,56 @@ func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorag return strg.RunQuery(ctx, tenantIDs, q, writeBlock) } -func initStorageMetrics(strg *logstorage.Storage) *metrics.Set { - ssCache := &logstorage.StorageStats{} - var ssCacheLock sync.Mutex - var lastUpdateTime time.Time +func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) { + var ss logstorage.StorageStats + strg.UpdateStats(&ss) - m := func() *logstorage.StorageStats { - ssCacheLock.Lock() - defer ssCacheLock.Unlock() - if time.Since(lastUpdateTime) < time.Second { - return ssCache - } - var ss logstorage.StorageStats - strg.UpdateStats(&ss) - ssCache = &ss - lastUpdateTime = time.Now() - return ssCache + metrics.WriteGaugeUint64(w, fmt.Sprintf(`vl_free_disk_space_bytes{path=%q}`, *storageDataPath), fs.MustGetFreeSpace(*storageDataPath)) + + isReadOnly := uint64(0) + if ss.IsReadOnly { + isReadOnly = 1 } + metrics.WriteGaugeUint64(w, fmt.Sprintf(`vl_storage_is_read_only{path=%q}`, *storageDataPath), isReadOnly) - ms := metrics.NewSet() + metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/inmemory"}`, ss.InmemoryActiveMerges) + metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/small"}`, ss.SmallPartActiveMerges) + metrics.WriteGaugeUint64(w, `vl_active_merges{type="storage/big"}`, ss.BigPartActiveMerges) - ms.NewGauge(fmt.Sprintf(`vl_free_disk_space_bytes{path=%q}`, *storageDataPath), func() float64 { - return float64(fs.MustGetFreeSpace(*storageDataPath)) - }) - ms.NewGauge(fmt.Sprintf(`vl_storage_is_read_only{path=%q}`, *storageDataPath), func() float64 { - if m().IsReadOnly { - return 1 - } - return 0 - }) + metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/inmemory"}`, ss.InmemoryMergesTotal) + metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/small"}`, ss.SmallPartMergesTotal) + metrics.WriteCounterUint64(w, `vl_merges_total{type="storage/big"}`, ss.BigPartMergesTotal) - ms.NewGauge(`vl_active_merges{type="inmemory"}`, func() float64 { - return float64(m().InmemoryActiveMerges) - }) - ms.NewGauge(`vl_merges_total{type="inmemory"}`, func() float64 { - return float64(m().InmemoryMergesTotal) - }) - ms.NewGauge(`vl_active_merges{type="file"}`, func() float64 { - return float64(m().FileActiveMerges) - }) - 
ms.NewGauge(`vl_merges_total{type="file"}`, func() float64 { - return float64(m().FileMergesTotal) - }) + metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/inmemory"}`, ss.InmemoryRowsCount) + metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/small"}`, ss.SmallPartRowsCount) + metrics.WriteGaugeUint64(w, `vl_storage_rows{type="storage/big"}`, ss.BigPartRowsCount) - ms.NewGauge(`vl_storage_rows{type="inmemory"}`, func() float64 { - return float64(m().InmemoryRowsCount) - }) - ms.NewGauge(`vl_storage_rows{type="file"}`, func() float64 { - return float64(m().FileRowsCount) - }) - ms.NewGauge(`vl_storage_parts{type="inmemory"}`, func() float64 { - return float64(m().InmemoryParts) - }) - ms.NewGauge(`vl_storage_parts{type="file"}`, func() float64 { - return float64(m().FileParts) - }) - ms.NewGauge(`vl_storage_blocks{type="inmemory"}`, func() float64 { - return float64(m().InmemoryBlocks) - }) - ms.NewGauge(`vl_storage_blocks{type="file"}`, func() float64 { - return float64(m().FileBlocks) - }) + metrics.WriteGaugeUint64(w, `vl_storage_parts{type="storage/inmemory"}`, ss.InmemoryParts) + metrics.WriteGaugeUint64(w, `vl_storage_parts{type="storage/small"}`, ss.SmallParts) + metrics.WriteGaugeUint64(w, `vl_storage_parts{type="storage/big"}`, ss.BigParts) - ms.NewGauge(`vl_partitions`, func() float64 { - return float64(m().PartitionsCount) - }) - ms.NewGauge(`vl_streams_created_total`, func() float64 { - return float64(m().StreamsCreatedTotal) - }) + metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/inmemory"}`, ss.InmemoryBlocks) + metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/small"}`, ss.SmallPartBlocks) + metrics.WriteGaugeUint64(w, `vl_storage_blocks{type="storage/big"}`, ss.BigPartBlocks) - ms.NewGauge(`vl_indexdb_rows`, func() float64 { - return float64(m().IndexdbItemsCount) - }) - ms.NewGauge(`vl_indexdb_parts`, func() float64 { - return float64(m().IndexdbPartsCount) - }) - ms.NewGauge(`vl_indexdb_blocks`, func() float64 { - return float64(m().IndexdbBlocksCount) - }) + metrics.WriteGaugeUint64(w, `vl_partitions`, ss.PartitionsCount) + metrics.WriteCounterUint64(w, `vl_streams_created_total`, ss.StreamsCreatedTotal) - ms.NewGauge(`vl_data_size_bytes{type="indexdb"}`, func() float64 { - return float64(m().IndexdbSizeBytes) - }) - ms.NewGauge(`vl_data_size_bytes{type="storage"}`, func() float64 { - dm := m() - return float64(dm.CompressedInmemorySize + dm.CompressedFileSize) - }) + metrics.WriteGaugeUint64(w, `vl_indexdb_rows`, ss.IndexdbItemsCount) + metrics.WriteGaugeUint64(w, `vl_indexdb_parts`, ss.IndexdbPartsCount) + metrics.WriteGaugeUint64(w, `vl_indexdb_blocks`, ss.IndexdbBlocksCount) - ms.NewGauge(`vl_compressed_data_size_bytes{type="inmemory"}`, func() float64 { - return float64(m().CompressedInmemorySize) - }) - ms.NewGauge(`vl_compressed_data_size_bytes{type="file"}`, func() float64 { - return float64(m().CompressedFileSize) - }) - ms.NewGauge(`vl_uncompressed_data_size_bytes{type="inmemory"}`, func() float64 { - return float64(m().UncompressedInmemorySize) - }) - ms.NewGauge(`vl_uncompressed_data_size_bytes{type="file"}`, func() float64 { - return float64(m().UncompressedFileSize) - }) + metrics.WriteGaugeUint64(w, `vl_data_size_bytes{type="indexdb"}`, ss.IndexdbSizeBytes) + metrics.WriteGaugeUint64(w, `vl_data_size_bytes{type="storage"}`, ss.CompressedInmemorySize+ss.CompressedSmallPartSize+ss.CompressedBigPartSize) - ms.NewGauge(`vl_rows_dropped_total{reason="too_big_timestamp"}`, func() float64 { - return 
float64(m().RowsDroppedTooBigTimestamp) - }) - ms.NewGauge(`vl_rows_dropped_total{reason="too_small_timestamp"}`, func() float64 { - return float64(m().RowsDroppedTooSmallTimestamp) - }) + metrics.WriteGaugeUint64(w, `vl_compressed_data_size_bytes{type="storage/inmemory"}`, ss.CompressedInmemorySize) + metrics.WriteGaugeUint64(w, `vl_compressed_data_size_bytes{type="storage/small"}`, ss.CompressedSmallPartSize) + metrics.WriteGaugeUint64(w, `vl_compressed_data_size_bytes{type="storage/big"}`, ss.CompressedBigPartSize) - return ms + metrics.WriteGaugeUint64(w, `vl_uncompressed_data_size_bytes{type="storage/inmemory"}`, ss.UncompressedInmemorySize) + metrics.WriteGaugeUint64(w, `vl_uncompressed_data_size_bytes{type="storage/small"}`, ss.UncompressedSmallPartSize) + metrics.WriteGaugeUint64(w, `vl_uncompressed_data_size_bytes{type="storage/big"}`, ss.UncompressedBigPartSize) + + metrics.WriteCounterUint64(w, `vl_rows_dropped_total{reason="too_big_timestamp"}`, ss.RowsDroppedTooBigTimestamp) + metrics.WriteCounterUint64(w, `vl_rows_dropped_total{reason="too_small_timestamp"}`, ss.RowsDroppedTooSmallTimestamp) } diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go index e5d11bd8c..593e8e7a4 100644 --- a/lib/logstorage/block.go +++ b/lib/logstorage/block.go @@ -520,7 +520,7 @@ func areSameFieldsInRows(rows [][]Field) bool { fields := rows[0] // Verify that all the field names are unique - m := make(map[string]struct{}, len(fields)) + m := getFieldsSet() for i := range fields { f := &fields[i] if _, ok := m[f.Name]; ok { @@ -529,6 +529,7 @@ func areSameFieldsInRows(rows [][]Field) bool { } m[f.Name] = struct{}{} } + putFieldsSet(m) // Verify that all the fields are the same across rows rows = rows[1:] @@ -546,6 +547,21 @@ func areSameFieldsInRows(rows [][]Field) bool { return true } +func getFieldsSet() map[string]struct{} { + v := fieldsSetPool.Get() + if v == nil { + return make(map[string]struct{}) + } + return v.(map[string]struct{}) +} + +func putFieldsSet(m map[string]struct{}) { + clear(m) + fieldsSetPool.Put(m) +} + +var fieldsSetPool sync.Pool + var columnIdxsPool sync.Pool func getColumnIdxs() map[string]int { diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go index 9a96c41d9..f3c9a5acb 100644 --- a/lib/logstorage/datadb.go +++ b/lib/logstorage/datadb.go @@ -17,6 +17,16 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/memory" ) +// The maximum size of big part. +// +// This number limits the maximum time required for building big part. +// This time shouldn't exceed a few days. +const maxBigPartSize = 1e12 + +// The interval for guaranteed flush of recently ingested data from memory to on-disk parts, +// so they survive process crash. +var dataFlushInterval = 5 * time.Second + // Default number of parts to merge at once. // // This number has been obtained empirically - it gives the lowest possible overhead. 
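The app/vlstorage/main.go hunks above replace the per-metric NewGauge closures (and the mutex-protected one-second stats cache) with a single RegisterMetricsWriter callback that streams every metric from one fresh StorageStats snapshot per scrape. A minimal standalone sketch of that pattern follows; exampleStats, collectStats and the metric names are hypothetical stand-ins, while the metrics.* calls are the same ones used in the diff.

package main

import (
	"io"
	"log"
	"net/http"

	"github.com/VictoriaMetrics/metrics"
)

// exampleStats stands in for logstorage.StorageStats.
type exampleStats struct {
	RowsCount  uint64
	IsReadOnly bool
}

// collectStats stands in for strg.UpdateStats(&ss).
func collectStats() exampleStats {
	return exampleStats{RowsCount: 42}
}

func main() {
	set := metrics.NewSet()
	set.RegisterMetricsWriter(func(w io.Writer) {
		// A single snapshot per scrape replaces the old cached-stats closures.
		ss := collectStats()
		metrics.WriteGaugeUint64(w, `example_rows{type="storage"}`, ss.RowsCount)
		isReadOnly := uint64(0)
		if ss.IsReadOnly {
			isReadOnly = 1
		}
		metrics.WriteGaugeUint64(w, `example_is_read_only`, isReadOnly)
	})
	metrics.RegisterSet(set)

	http.HandleFunc("/metrics", func(w http.ResponseWriter, _ *http.Request) {
		metrics.WritePrometheus(w, true)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}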
@@ -43,8 +53,12 @@ type datadb struct { inmemoryMergesTotal atomic.Uint64 inmemoryActiveMerges atomic.Int64 - fileMergesTotal atomic.Uint64 - fileActiveMerges atomic.Int64 + + smallPartMergesTotal atomic.Uint64 + smallPartActiveMerges atomic.Int64 + + bigPartMergesTotal atomic.Uint64 + bigPartActiveMerges atomic.Int64 // pt is the partition the datadb belongs to pt *partition @@ -58,8 +72,11 @@ type datadb struct { // inmemoryParts contains a list of inmemory parts inmemoryParts []*partWrapper - // fileParts contains a list of file-based parts - fileParts []*partWrapper + // smallParts contains a list of file-based small parts + smallParts []*partWrapper + + // bigParts contains a list of file-based big parts + bigParts []*partWrapper // partsLock protects parts from concurrent access partsLock sync.Mutex @@ -75,16 +92,6 @@ type datadb struct { // It must be closed under partsLock in order to prevent from calling wg.Add() // after stopCh is closed. stopCh chan struct{} - - // oldInmemoryPartsFlushersCount is the number of currently running flushers for old in-memory parts - // - // This variable must be accessed under partsLock. - oldInmemoryPartsFlushersCount int - - // mergeWorkersCount is the number of currently running merge workers - // - // This variable must be accessed under partsLock. - mergeWorkersCount int } // partWrapper is a wrapper for opened part. @@ -140,7 +147,7 @@ func (pw *partWrapper) decRef() { func mustCreateDatadb(path string) { fs.MustMkdirFailIfExist(path) - mustWritePartNames(path, []string{}) + mustWritePartNames(path, nil, nil) } // mustOpenDatadb opens datadb at the given path with the given flushInterval for in-memory data. @@ -151,8 +158,9 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da partNames := mustReadPartNames(path) mustRemoveUnusedDirs(path, partNames) - pws := make([]*partWrapper, len(partNames)) - for i, partName := range partNames { + var smallParts []*partWrapper + var bigParts []*partWrapper + for _, partName := range partNames { // Make sure the partName exists on disk. // If it is missing, then manual action from the user is needed, // since this is unexpected state, which cannot occur under normal operation, @@ -166,181 +174,277 @@ func mustOpenDatadb(pt *partition, path string, flushInterval time.Duration) *da } p := mustOpenFilePart(pt, partPath) - pws[i] = newPartWrapper(p, nil, time.Time{}) + pw := newPartWrapper(p, nil, time.Time{}) + if p.ph.CompressedSizeBytes > getMaxInmemoryPartSize() { + bigParts = append(bigParts, pw) + } else { + smallParts = append(smallParts, pw) + } } ddb := &datadb{ pt: pt, flushInterval: flushInterval, path: path, - fileParts: pws, + smallParts: smallParts, + bigParts: bigParts, stopCh: make(chan struct{}), } ddb.mergeIdx.Store(uint64(time.Now().UnixNano())) - // Start merge workers in the hope they'll merge the remaining parts - ddb.partsLock.Lock() - n := getMergeWorkersCount() - for i := 0; i < n; i++ { - ddb.startMergeWorkerLocked() - } - ddb.partsLock.Unlock() + ddb.startBackgroundWorkers() return ddb } -// startOldInmemoryPartsFlusherLocked starts flusher for old in-memory parts to disk. -// -// This function must be called under partsLock. -func (ddb *datadb) startOldInmemoryPartsFlusherLocked() { +func (ddb *datadb) startBackgroundWorkers() { + // Start file parts mergers, so they could start merging unmerged parts if needed. + // There is no need in starting in-memory parts mergers, since there are no in-memory parts yet. 
+ ddb.startSmallPartsMergers() + ddb.startBigPartsMergers() + + ddb.startInmemoryPartsFlusher() +} + +var ( + inmemoryPartsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) + smallPartsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) + bigPartsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()) +) + +func (ddb *datadb) startSmallPartsMergers() { + ddb.partsLock.Lock() + for i := 0; i < cap(smallPartsConcurrencyCh); i++ { + ddb.startSmallPartsMergerLocked() + } + ddb.partsLock.Unlock() +} + +func (ddb *datadb) startBigPartsMergers() { + ddb.partsLock.Lock() + for i := 0; i < cap(bigPartsConcurrencyCh); i++ { + ddb.startBigPartsMergerLocked() + } + ddb.partsLock.Unlock() +} + +func (ddb *datadb) startInmemoryPartsMergerLocked() { if needStop(ddb.stopCh) { return } - maxWorkers := getMergeWorkersCount() - if ddb.oldInmemoryPartsFlushersCount >= maxWorkers { - return - } - ddb.oldInmemoryPartsFlushersCount++ ddb.wg.Add(1) go func() { - ddb.flushOldInmemoryParts() + ddb.inmemoryPartsMerger() ddb.wg.Done() }() } -func (ddb *datadb) flushOldInmemoryParts() { - ticker := time.NewTicker(time.Second) +func (ddb *datadb) startSmallPartsMergerLocked() { + if needStop(ddb.stopCh) { + return + } + ddb.wg.Add(1) + go func() { + ddb.smallPartsMerger() + ddb.wg.Done() + }() +} + +func (ddb *datadb) startBigPartsMergerLocked() { + if needStop(ddb.stopCh) { + return + } + ddb.wg.Add(1) + go func() { + ddb.bigPartsMerger() + ddb.wg.Done() + }() +} + +func (ddb *datadb) startInmemoryPartsFlusher() { + ddb.wg.Add(1) + go func() { + ddb.inmemoryPartsFlusher() + ddb.wg.Done() + }() +} + +func (ddb *datadb) inmemoryPartsFlusher() { + // Do not add jitter to d in order to guarantee the flush interval + ticker := time.NewTicker(dataFlushInterval) defer ticker.Stop() - var parts, partsToMerge []*partWrapper - - for !needStop(ddb.stopCh) { - ddb.partsLock.Lock() - parts = appendNotInMergePartsLocked(parts[:0], ddb.inmemoryParts) - currentTime := time.Now() - partsToFlush := parts[:0] - for _, pw := range parts { - if pw.flushDeadline.Before(currentTime) { - partsToFlush = append(partsToFlush, pw) - } - } - // Do not take into account available disk space when flushing in-memory parts to disk, - // since otherwise the outdated in-memory parts may remain in-memory, which, in turn, - // may result in increased memory usage plus possible loss of historical data. - // It is better to crash on out of disk error in this case. - partsToMerge = appendPartsToMerge(partsToMerge[:0], partsToFlush, math.MaxUint64) - if len(partsToMerge) == 0 { - partsToMerge = append(partsToMerge[:0], partsToFlush...) - } - setInMergeLocked(partsToMerge) - needStop := false - if len(ddb.inmemoryParts) == 0 { - // There are no in-memory parts, so stop the flusher. - needStop = true - ddb.oldInmemoryPartsFlushersCount-- - } - ddb.partsLock.Unlock() - - if needStop { - return - } - - ddb.mustMergeParts(partsToMerge, true) - if len(partsToMerge) < len(partsToFlush) { - // Continue merging remaining old in-memory parts from partsToFlush list. - continue - } - - // There are no old in-memory parts to flush. Sleep for a while until these parts appear. + for { select { case <-ddb.stopCh: return case <-ticker.C: + ddb.mustFlushInmemoryPartsToFiles(false) } } } -// startMergeWorkerLocked starts a merge worker. 
+func (ddb *datadb) mustFlushInmemoryPartsToFiles(isFinal bool) { + currentTime := time.Now() + var pws []*partWrapper + + ddb.partsLock.Lock() + for _, pw := range ddb.inmemoryParts { + if !pw.isInMerge && (isFinal || pw.flushDeadline.Before(currentTime)) { + pw.isInMerge = true + pws = append(pws, pw) + } + } + ddb.partsLock.Unlock() + + ddb.mustMergePartsToFiles(pws) +} + +func (ddb *datadb) mustMergePartsToFiles(pws []*partWrapper) { + wg := getWaitGroup() + for len(pws) > 0 { + pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws) + wg.Add(1) + inmemoryPartsConcurrencyCh <- struct{}{} + go func(pwsChunk []*partWrapper) { + defer func() { + <-inmemoryPartsConcurrencyCh + wg.Done() + }() + + ddb.mustMergeParts(pwsChunk, true) + }(pwsToMerge) + pws = pwsRemaining + } + wg.Wait() + putWaitGroup(wg) +} + +// getPartsForOptimalMerge returns parts from pws for optimal merge, plus the remaining parts. // -// This function must be called under locked partsLock. -func (ddb *datadb) startMergeWorkerLocked() { - if needStop(ddb.stopCh) { - return +// the pws items are replaced by nil after the call. This is needed for helping Go GC to reclaim the referenced items. +func getPartsForOptimalMerge(pws []*partWrapper) ([]*partWrapper, []*partWrapper) { + pwsToMerge := appendPartsToMerge(nil, pws, math.MaxUint64) + if len(pwsToMerge) == 0 { + return pws, nil } - maxWorkers := getMergeWorkersCount() - if ddb.mergeWorkersCount >= maxWorkers { - return + + m := partsToMap(pwsToMerge) + pwsRemaining := make([]*partWrapper, 0, len(pws)-len(pwsToMerge)) + for _, pw := range pws { + if _, ok := m[pw]; !ok { + pwsRemaining = append(pwsRemaining, pw) + } } - ddb.mergeWorkersCount++ - ddb.wg.Add(1) - go func() { - globalMergeLimitCh <- struct{}{} - ddb.mustMergeExistingParts() - <-globalMergeLimitCh - ddb.wg.Done() - }() + + // Clear references to pws items, so they could be reclaimed faster by Go GC. + for i := range pws { + pws[i] = nil + } + + return pwsToMerge, pwsRemaining } -// globalMergeLimitCh limits the number of concurrent merges across all the partitions -var globalMergeLimitCh = make(chan struct{}, getMergeWorkersCount()) - -func getMergeWorkersCount() int { - n := cgroup.AvailableCPUs() - if n < 4 { - // Use bigger number of workers on systems with small number of CPU cores, - // since a single worker may become busy for long time when merging big parts. - // Then the remaining workers may continue performing merges - // for newly added small parts. - return 4 +func getWaitGroup() *sync.WaitGroup { + v := wgPool.Get() + if v == nil { + return &sync.WaitGroup{} } - return n + return v.(*sync.WaitGroup) } -func (ddb *datadb) mustMergeExistingParts() { - for !needStop(ddb.stopCh) { - maxOutBytes := availableDiskSpace(ddb.path) +func putWaitGroup(wg *sync.WaitGroup) { + wgPool.Put(wg) +} + +var wgPool sync.Pool + +func (ddb *datadb) inmemoryPartsMerger() { + for { + if needStop(ddb.stopCh) { + return + } + maxOutBytes := ddb.getMaxBigPartSize() ddb.partsLock.Lock() - parts := make([]*partWrapper, 0, len(ddb.inmemoryParts)+len(ddb.fileParts)) - parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts) - parts = appendNotInMergePartsLocked(parts, ddb.fileParts) - pws := appendPartsToMerge(nil, parts, maxOutBytes) - setInMergeLocked(pws) - if len(pws) == 0 { - ddb.mergeWorkersCount-- - } + pws := getPartsToMergeLocked(ddb.inmemoryParts, maxOutBytes) ddb.partsLock.Unlock() if len(pws) == 0 { - // Nothing to merge at the moment. 
+ // Nothing to merge return } + inmemoryPartsConcurrencyCh <- struct{}{} ddb.mustMergeParts(pws, false) + <-inmemoryPartsConcurrencyCh } } -// appendNotInMergePartsLocked appends src parts with isInMerge=false to dst and returns the result. +func (ddb *datadb) smallPartsMerger() { + for { + if needStop(ddb.stopCh) { + return + } + maxOutBytes := ddb.getMaxBigPartSize() + + ddb.partsLock.Lock() + pws := getPartsToMergeLocked(ddb.smallParts, maxOutBytes) + ddb.partsLock.Unlock() + + if len(pws) == 0 { + // Nothing to merge + return + } + + smallPartsConcurrencyCh <- struct{}{} + ddb.mustMergeParts(pws, false) + <-smallPartsConcurrencyCh + } +} + +func (ddb *datadb) bigPartsMerger() { + for { + if needStop(ddb.stopCh) { + return + } + maxOutBytes := ddb.getMaxBigPartSize() + + ddb.partsLock.Lock() + pws := getPartsToMergeLocked(ddb.bigParts, maxOutBytes) + ddb.partsLock.Unlock() + + if len(pws) == 0 { + // Nothing to merge + return + } + + bigPartsConcurrencyCh <- struct{}{} + ddb.mustMergeParts(pws, false) + <-bigPartsConcurrencyCh + } +} + +// getPartsToMergeLocked returns optimal parts to merge from pws. // -// This function must be called under partsLock. -func appendNotInMergePartsLocked(dst, src []*partWrapper) []*partWrapper { - for _, pw := range src { +// The summary size of the returned parts must be smaller than maxOutBytes. +func getPartsToMergeLocked(pws []*partWrapper, maxOutBytes uint64) []*partWrapper { + pwsRemaining := make([]*partWrapper, 0, len(pws)) + for _, pw := range pws { if !pw.isInMerge { - dst = append(dst, pw) + pwsRemaining = append(pwsRemaining, pw) } } - return dst -} -// setInMergeLocked sets isInMerge flag for pws. -// -// This function must be called under partsLock. -func setInMergeLocked(pws []*partWrapper) { - for _, pw := range pws { + pwsToMerge := appendPartsToMerge(nil, pwsRemaining, maxOutBytes) + + for _, pw := range pwsToMerge { if pw.isInMerge { - logger.Panicf("BUG: partWrapper.isInMerge unexpectedly set to true") + logger.Panicf("BUG: partWrapper.isInMerge cannot be set") } pw.isInMerge = true } + + return pwsToMerge } func assertIsInMerge(pws []*partWrapper) { @@ -370,7 +474,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { startTime := time.Now() dstPartType := ddb.getDstPartType(pws, isFinal) - if dstPartType == partFile { + if dstPartType != partInmemory { // Make sure there is enough disk space for performing the merge partsSize := getCompressedSize(pws) needReleaseDiskSpace := tryReserveDiskSpace(ddb.path, partsSize) @@ -387,14 +491,21 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { } } - if dstPartType == partInmemory { + switch dstPartType { + case partInmemory: ddb.inmemoryMergesTotal.Add(1) ddb.inmemoryActiveMerges.Add(1) defer ddb.inmemoryActiveMerges.Add(-1) - } else { - ddb.fileMergesTotal.Add(1) - ddb.fileActiveMerges.Add(1) - defer ddb.fileActiveMerges.Add(-1) + case partSmall: + ddb.smallPartMergesTotal.Add(1) + ddb.smallPartActiveMerges.Add(1) + defer ddb.smallPartActiveMerges.Add(-1) + case partBig: + ddb.bigPartMergesTotal.Add(1) + ddb.bigPartActiveMerges.Add(1) + defer ddb.bigPartActiveMerges.Add(-1) + default: + logger.Panicf("BUG: unknown partType=%d", dstPartType) } // Initialize destination paths. 
@@ -428,7 +539,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { mpNew = getInmemoryPart() bsw.MustInitForInmemoryPart(mpNew) } else { - nocache := !shouldUsePageCacheForPartSize(srcSize) + nocache := dstPartType == partBig bsw.MustInitForFilePart(dstPartPath, nocache) } @@ -455,7 +566,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { } if needStop(stopCh) { // Remove incomplete destination part - if dstPartType == partFile { + if dstPartType != partInmemory { fs.MustRemoveAll(dstPartPath) } return @@ -477,7 +588,7 @@ func (ddb *datadb) mustMergeParts(pws []*partWrapper, isFinal bool) { ddb.swapSrcWithDstParts(pws, pwNew, dstPartType) d := time.Since(startTime) - if d <= 30*time.Second { + if d <= time.Minute { return } @@ -496,21 +607,22 @@ type partType int var ( partInmemory = partType(0) - partFile = partType(1) + partSmall = partType(1) + partBig = partType(2) ) func (ddb *datadb) getDstPartType(pws []*partWrapper, isFinal bool) partType { - if isFinal { - return partFile - } dstPartSize := getCompressedSize(pws) - if dstPartSize > getMaxInmemoryPartSize() { - return partFile + if dstPartSize > ddb.getMaxSmallPartSize() { + return partBig + } + if isFinal || dstPartSize > getMaxInmemoryPartSize() { + return partSmall } if !areAllInmemoryParts(pws) { // If at least a single source part is located in file, // then the destination part must be in file for durability reasons. - return partFile + return partSmall } return partInmemory } @@ -560,45 +672,8 @@ func (ddb *datadb) mustAddRows(lr *LogRows) { ddb.partsLock.Lock() ddb.inmemoryParts = append(ddb.inmemoryParts, pw) - ddb.startOldInmemoryPartsFlusherLocked() - if len(ddb.inmemoryParts) > defaultPartsToMerge { - ddb.startMergeWorkerLocked() - } - needAssistedMerge := ddb.needAssistedMergeForInmemoryPartsLocked() + ddb.startInmemoryPartsMergerLocked() ddb.partsLock.Unlock() - - if needAssistedMerge { - ddb.assistedMergeForInmemoryParts() - } -} - -func (ddb *datadb) needAssistedMergeForInmemoryPartsLocked() bool { - if len(ddb.inmemoryParts) < maxInmemoryPartsPerPartition { - return false - } - n := 0 - for _, pw := range ddb.inmemoryParts { - if !pw.isInMerge { - n++ - } - } - return n >= defaultPartsToMerge -} - -func (ddb *datadb) assistedMergeForInmemoryParts() { - ddb.partsLock.Lock() - parts := make([]*partWrapper, 0, len(ddb.inmemoryParts)) - parts = appendNotInMergePartsLocked(parts, ddb.inmemoryParts) - // Do not take into account available disk space when merging in-memory parts, - // since otherwise the outdated in-memory parts may remain in-memory, which, in turn, - // may result in increased memory usage plus possible loss of historical data. - // It is better to crash on out of disk error in this case. - pws := make([]*partWrapper, 0, len(parts)) - pws = appendPartsToMerge(pws[:0], parts, math.MaxUint64) - setInMergeLocked(pws) - ddb.partsLock.Unlock() - - ddb.mustMergeParts(pws, false) } // DatadbStats contains various stats for datadb. @@ -609,41 +684,62 @@ type DatadbStats struct { // InmemoryActiveMerges is the number of currently active inmemory merges performed by the given datadb. InmemoryActiveMerges uint64 - // FileMergesTotal is the number of file merges performed in the given datadb. - FileMergesTotal uint64 + // SmallPartMergesTotal is the number of small file merges performed in the given datadb. + SmallPartMergesTotal uint64 - // FileActiveMerges is the number of currently active file merges performed by the given datadb. 
- FileActiveMerges uint64 + // SmallPartActiveMerges is the number of currently active small file merges performed by the given datadb. + SmallPartActiveMerges uint64 + + // BigPartMergesTotal is the number of big file merges performed in the given datadb. + BigPartMergesTotal uint64 + + // BigPartActiveMerges is the number of currently active big file merges performed by the given datadb. + BigPartActiveMerges uint64 // InmemoryRowsCount is the number of rows, which weren't flushed to disk yet. InmemoryRowsCount uint64 - // FileRowsCount is the number of rows stored on disk. - FileRowsCount uint64 + // SmallPartRowsCount is the number of rows stored on disk in small parts. + SmallPartRowsCount uint64 + + // BigPartRowsCount is the number of rows stored on disk in big parts. + BigPartRowsCount uint64 // InmemoryParts is the number of in-memory parts, which weren't flushed to disk yet. InmemoryParts uint64 - // FileParts is the number of file-based parts stored on disk. - FileParts uint64 + // SmallParts is the number of file-based small parts stored on disk. + SmallParts uint64 + + // BigParts is the number of file-based big parts stored on disk. + BigParts uint64 // InmemoryBlocks is the number of in-memory blocks, which weren't flushed to disk yet. InmemoryBlocks uint64 - // FileBlocks is the number of file-based blocks stored on disk. - FileBlocks uint64 + // SmallPartBlocks is the number of file-based small blocks stored on disk. + SmallPartBlocks uint64 + + // BigPartBlocks is the number of file-based big blocks stored on disk. + BigPartBlocks uint64 // CompressedInmemorySize is the size of compressed data stored in memory. CompressedInmemorySize uint64 - // CompressedFileSize is the size of compressed data stored on disk. - CompressedFileSize uint64 + // CompressedSmallPartSize is the size of compressed small parts data stored on disk. + CompressedSmallPartSize uint64 + + // CompressedBigPartSize is the size of compressed big data stored on disk. + CompressedBigPartSize uint64 // UncompressedInmemorySize is the size of uncompressed data stored in memory. UncompressedInmemorySize uint64 - // UncompressedFileSize is the size of uncompressed data stored on disk. - UncompressedFileSize uint64 + // UncompressedSmallPartSize is the size of uncompressed small data stored on disk. + UncompressedSmallPartSize uint64 + + // UncompressedBigPartSize is the size of uncompressed big data stored on disk. + UncompressedBigPartSize uint64 } func (s *DatadbStats) reset() { @@ -652,32 +748,39 @@ func (s *DatadbStats) reset() { // RowsCount returns the number of rows stored in datadb. func (s *DatadbStats) RowsCount() uint64 { - return s.InmemoryRowsCount + s.FileRowsCount + return s.InmemoryRowsCount + s.SmallPartRowsCount + s.BigPartRowsCount } -// updateStats updates s with ddb stats +// updateStats updates s with ddb stats. 
func (ddb *datadb) updateStats(s *DatadbStats) { s.InmemoryMergesTotal += ddb.inmemoryMergesTotal.Load() s.InmemoryActiveMerges += uint64(ddb.inmemoryActiveMerges.Load()) - s.FileMergesTotal += ddb.fileMergesTotal.Load() - s.FileActiveMerges += uint64(ddb.fileActiveMerges.Load()) + s.SmallPartMergesTotal += ddb.smallPartMergesTotal.Load() + s.SmallPartActiveMerges += uint64(ddb.smallPartActiveMerges.Load()) + s.BigPartMergesTotal += ddb.bigPartMergesTotal.Load() + s.BigPartActiveMerges += uint64(ddb.bigPartActiveMerges.Load()) ddb.partsLock.Lock() s.InmemoryRowsCount += getRowsCount(ddb.inmemoryParts) - s.FileRowsCount += getRowsCount(ddb.fileParts) + s.SmallPartRowsCount += getRowsCount(ddb.smallParts) + s.BigPartRowsCount += getRowsCount(ddb.bigParts) s.InmemoryParts += uint64(len(ddb.inmemoryParts)) - s.FileParts += uint64(len(ddb.fileParts)) + s.SmallParts += uint64(len(ddb.smallParts)) + s.BigParts += uint64(len(ddb.bigParts)) s.InmemoryBlocks += getBlocksCount(ddb.inmemoryParts) - s.FileBlocks += getBlocksCount(ddb.fileParts) + s.SmallPartBlocks += getBlocksCount(ddb.smallParts) + s.BigPartBlocks += getBlocksCount(ddb.bigParts) s.CompressedInmemorySize += getCompressedSize(ddb.inmemoryParts) - s.CompressedFileSize += getCompressedSize(ddb.fileParts) + s.CompressedSmallPartSize += getCompressedSize(ddb.smallParts) + s.CompressedBigPartSize += getCompressedSize(ddb.bigParts) s.UncompressedInmemorySize += getUncompressedSize(ddb.inmemoryParts) - s.UncompressedFileSize += getUncompressedSize(ddb.fileParts) + s.UncompressedSmallPartSize += getUncompressedSize(ddb.smallParts) + s.UncompressedBigPartSize += getUncompressedSize(ddb.bigParts) ddb.partsLock.Unlock() } @@ -687,29 +790,56 @@ func (ddb *datadb) debugFlush() { // Nothing to do, since all the ingested data is available for search via ddb.inmemoryParts. } -func (ddb *datadb) mustFlushInmemoryPartsToDisk() { +func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) { + // Atomically unregister old parts and add new part to pt. + partsToRemove := partsToMap(pws) + + removedInmemoryParts := 0 + removedSmallParts := 0 + removedBigParts := 0 + ddb.partsLock.Lock() - pws := append([]*partWrapper{}, ddb.inmemoryParts...) - setInMergeLocked(pws) + + ddb.inmemoryParts, removedInmemoryParts = removeParts(ddb.inmemoryParts, partsToRemove) + ddb.smallParts, removedSmallParts = removeParts(ddb.smallParts, partsToRemove) + ddb.bigParts, removedBigParts = removeParts(ddb.bigParts, partsToRemove) + + if pwNew != nil { + switch dstPartType { + case partInmemory: + ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew) + ddb.startInmemoryPartsMergerLocked() + case partSmall: + ddb.smallParts = append(ddb.smallParts, pwNew) + ddb.startSmallPartsMergerLocked() + case partBig: + ddb.bigParts = append(ddb.bigParts, pwNew) + ddb.startBigPartsMergerLocked() + default: + logger.Panicf("BUG: unknown partType=%d", dstPartType) + } + } + + // Atomically store the updated list of file-based parts on disk. + // This must be performed under partsLock in order to prevent from races + // when multiple concurrently running goroutines update the list. 
+ if removedSmallParts > 0 || removedBigParts > 0 || pwNew != nil && dstPartType != partInmemory { + smallPartNames := getPartNames(ddb.smallParts) + bigPartNames := getPartNames(ddb.bigParts) + mustWritePartNames(ddb.path, smallPartNames, bigPartNames) + } + ddb.partsLock.Unlock() - var pwsChunk []*partWrapper - for len(pws) > 0 { - // Do not take into account available disk space when performing the final flush of in-memory parts to disk, - // since otherwise these parts will be lost. - // It is better to crash on out of disk error in this case. - pwsChunk = appendPartsToMerge(pwsChunk[:0], pws, math.MaxUint64) - if len(pwsChunk) == 0 { - pwsChunk = append(pwsChunk[:0], pws...) - } - partsToRemove := partsToMap(pwsChunk) - removedParts := 0 - pws, removedParts = removeParts(pws, partsToRemove) - if removedParts != len(pwsChunk) { - logger.Panicf("BUG: unexpected number of parts removed; got %d; want %d", removedParts, len(pwsChunk)) - } + removedParts := removedInmemoryParts + removedSmallParts + removedBigParts + if removedParts != len(partsToRemove) { + logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(partsToRemove)) + } - ddb.mustMergeParts(pwsChunk, true) + // Mark old parts as must be deleted and decrement reference count, so they are eventually closed and deleted. + for _, pw := range pws { + pw.mustDrop.Store(true) + pw.decRef() } } @@ -724,54 +854,6 @@ func partsToMap(pws []*partWrapper) map[*partWrapper]struct{} { return m } -func (ddb *datadb) swapSrcWithDstParts(pws []*partWrapper, pwNew *partWrapper, dstPartType partType) { - // Atomically unregister old parts and add new part to pt. - partsToRemove := partsToMap(pws) - removedInmemoryParts := 0 - removedFileParts := 0 - - ddb.partsLock.Lock() - - ddb.inmemoryParts, removedInmemoryParts = removeParts(ddb.inmemoryParts, partsToRemove) - ddb.fileParts, removedFileParts = removeParts(ddb.fileParts, partsToRemove) - if pwNew != nil { - switch dstPartType { - case partInmemory: - ddb.inmemoryParts = append(ddb.inmemoryParts, pwNew) - ddb.startOldInmemoryPartsFlusherLocked() - case partFile: - ddb.fileParts = append(ddb.fileParts, pwNew) - default: - logger.Panicf("BUG: unknown partType=%d", dstPartType) - } - if len(ddb.inmemoryParts)+len(ddb.fileParts) > defaultPartsToMerge { - ddb.startMergeWorkerLocked() - } - } - - // Atomically store the updated list of file-based parts on disk. - // This must be performed under partsLock in order to prevent from races - // when multiple concurrently running goroutines update the list. - if removedFileParts > 0 || pwNew != nil && dstPartType == partFile { - partNames := getPartNames(ddb.fileParts) - mustWritePartNames(ddb.path, partNames) - } - - ddb.partsLock.Unlock() - - removedParts := removedInmemoryParts + removedFileParts - if removedParts != len(partsToRemove) { - logger.Panicf("BUG: unexpected number of parts removed; got %d, want %d", removedParts, len(partsToRemove)) - } - - // Mark old parts as must be deleted and decrement reference count, - // so they are eventually closed and deleted. 
- for _, pw := range pws { - pw.mustDrop.Store(true) - pw.decRef() - } -} - func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]struct{}) ([]*partWrapper, int) { dst := pws[:0] for _, pw := range pws { @@ -853,6 +935,34 @@ func (ddb *datadb) releasePartsToMerge(pws []*partWrapper) { ddb.partsLock.Unlock() } +func (ddb *datadb) getMaxBigPartSize() uint64 { + return getMaxOutBytes(ddb.path) +} + +func (ddb *datadb) getMaxSmallPartSize() uint64 { + // Small parts are cached in the OS page cache, + // so limit their size by the remaining free RAM. + mem := memory.Remaining() + n := uint64(mem) / defaultPartsToMerge + if n < 10e6 { + n = 10e6 + } + // Make sure the output part fits available disk space for small parts. + sizeLimit := getMaxOutBytes(ddb.path) + if n > sizeLimit { + n = sizeLimit + } + return n +} + +func getMaxOutBytes(path string) uint64 { + n := availableDiskSpace(path) + if n > maxBigPartSize { + n = maxBigPartSize + } + return n +} + func availableDiskSpace(path string) uint64 { available := fs.MustGetFreeSpace(path) reserved := reservedDiskSpace.Load() @@ -865,7 +975,7 @@ func availableDiskSpace(path string) uint64 { func tryReserveDiskSpace(path string, n uint64) bool { available := fs.MustGetFreeSpace(path) reserved := reserveDiskSpace(n) - if available > reserved { + if available >= reserved { return true } releaseDiskSpace(n) @@ -908,20 +1018,29 @@ func mustCloseDatadb(ddb *datadb) { ddb.wg.Wait() // flush in-memory data to disk - ddb.mustFlushInmemoryPartsToDisk() + ddb.mustFlushInmemoryPartsToFiles(true) if len(ddb.inmemoryParts) > 0 { logger.Panicf("BUG: the number of in-memory parts must be zero after flushing them to disk; got %d", len(ddb.inmemoryParts)) } ddb.inmemoryParts = nil - // close file parts - for _, pw := range ddb.fileParts { + // close small parts + for _, pw := range ddb.smallParts { pw.decRef() if n := pw.refCount.Load(); n != 0 { - logger.Panicf("BUG: ther are %d references to filePart", n) + logger.Panicf("BUG: there are %d references to smallPart", n) } } - ddb.fileParts = nil + ddb.smallParts = nil + + // close big parts + for _, pw := range ddb.bigParts { + pw.decRef() + if n := pw.refCount.Load(); n != 0 { + logger.Panicf("BUG: there are %d references to bigPart", n) + } + } + ddb.bigParts = nil ddb.path = "" ddb.pt = nil @@ -941,7 +1060,9 @@ func getPartNames(pws []*partWrapper) []string { return partNames } -func mustWritePartNames(path string, partNames []string) { +func mustWritePartNames(path string, smallPartNames, bigPartNames []string) { + partNames := append([]string{}, smallPartNames...) + partNames = append(partNames, bigPartNames...) data, err := json.Marshal(partNames) if err != nil { logger.Panicf("BUG: cannot marshal partNames to JSON: %s", err) @@ -1102,8 +1223,3 @@ func getBlocksCount(pws []*partWrapper) uint64 { } return n } - -func shouldUsePageCacheForPartSize(size uint64) bool { - mem := memory.Remaining() / defaultPartsToMerge - return size <= uint64(mem) -} diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go index 9782ba584..b1c9c3e56 100644 --- a/lib/logstorage/log_rows.go +++ b/lib/logstorage/log_rows.go @@ -26,7 +26,7 @@ type LogRows struct { // timestamps holds stimestamps for rows added to LogRows timestamps []int64 - // rows holds fields for rows atted to LogRows. + // rows holds fields for rows added to LogRows. 
rows [][]Field // sf is a helper for sorting fields in every added row diff --git a/lib/logstorage/partition_test.go b/lib/logstorage/partition_test.go index ea1af27a5..56c576056 100644 --- a/lib/logstorage/partition_test.go +++ b/lib/logstorage/partition_test.go @@ -26,14 +26,20 @@ func TestPartitionLifecycle(t *testing.T) { if ddbStats.InmemoryParts != 0 { t.Fatalf("unexpected non-zero number of in-memory parts in empty partition: %d", ddbStats.InmemoryParts) } - if ddbStats.FileParts != 0 { - t.Fatalf("unexpected non-zero number of file parts in empty partition: %d", ddbStats.FileParts) + if ddbStats.SmallParts != 0 { + t.Fatalf("unexpected non-zero number of small file parts in empty partition: %d", ddbStats.SmallParts) + } + if ddbStats.BigParts != 0 { + t.Fatalf("unexpected non-zero number of big file parts in empty partition: %d", ddbStats.BigParts) } if ddbStats.CompressedInmemorySize != 0 { t.Fatalf("unexpected non-zero size of inmemory parts for empty partition") } - if ddbStats.CompressedFileSize != 0 { - t.Fatalf("unexpected non-zero size of file parts for empty partition") + if ddbStats.CompressedSmallPartSize != 0 { + t.Fatalf("unexpected non-zero size of small file parts for empty partition") + } + if ddbStats.CompressedBigPartSize != 0 { + t.Fatalf("unexpected non-zero size of big file parts for empty partition") } time.Sleep(10 * time.Millisecond) mustClosePartition(pt) @@ -87,8 +93,8 @@ func TestPartitionMustAddRowsSerial(t *testing.T) { if ddbStats.InmemoryParts != 0 { t.Fatalf("unexpected non-zero number of in-memory parts after re-opening the partition: %d", ddbStats.InmemoryParts) } - if ddbStats.FileParts == 0 { - t.Fatalf("the number of file parts must be greater than 0 after re-opening the partition") + if ddbStats.SmallParts+ddbStats.BigParts == 0 { + t.Fatalf("the number of small parts must be greater than 0 after re-opening the partition") } // Try adding entries for multiple streams at a time @@ -115,7 +121,7 @@ func TestPartitionMustAddRowsSerial(t *testing.T) { if ddbStats.InmemoryParts != 0 { t.Fatalf("unexpected non-zero number of in-memory parts after re-opening the partition: %d", ddbStats.InmemoryParts) } - if ddbStats.FileParts == 0 { + if ddbStats.SmallParts+ddbStats.BigParts == 0 { t.Fatalf("the number of file parts must be greater than 0 after re-opening the partition") } diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 2f4979ae7..8c312d3e1 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -400,7 +400,8 @@ func initStreamFiltersList(tenantIDs []TenantID, idb *indexdb, filters []filter) func (ddb *datadb) search(so *searchOptions, workCh chan<- []*blockSearchWork, stopCh <-chan struct{}) partitionSearchFinalizer { // Select parts with data for the given time range ddb.partsLock.Lock() - pws := appendPartsInTimeRange(nil, ddb.fileParts, so.minTimestamp, so.maxTimestamp) + pws := appendPartsInTimeRange(nil, ddb.bigParts, so.minTimestamp, so.maxTimestamp) + pws = appendPartsInTimeRange(pws, ddb.smallParts, so.minTimestamp, so.maxTimestamp) pws = appendPartsInTimeRange(pws, ddb.inmemoryParts, so.minTimestamp, so.maxTimestamp) // Increase references to the searched parts, so they aren't deleted during search. 
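The datadb changes above drop the shared globalMergeLimitCh and the mergeWorkersCount/oldInmemoryPartsFlushersCount bookkeeping in favor of three buffered channels (inmemoryPartsConcurrencyCh, smallPartsConcurrencyCh, bigPartsConcurrencyCh) that act as per-category semaphores sized to the number of available CPUs. A minimal standalone sketch of that semaphore idiom, using hypothetical names and runtime.NumCPU() in place of cgroup.AvailableCPUs():

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// mergeConcurrencyCh bounds the number of merges running at once:
// the channel capacity is the limit, and each running merge holds one slot.
var mergeConcurrencyCh = make(chan struct{}, runtime.NumCPU())

func merge(id int) {
	mergeConcurrencyCh <- struct{}{}        // acquire a slot; blocks while all slots are taken
	defer func() { <-mergeConcurrencyCh }() // release the slot when the merge finishes

	time.Sleep(10 * time.Millisecond) // stand-in for the actual merge work
	fmt.Println("merged part", id)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 32; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			merge(id)
		}(i)
	}
	wg.Wait()
}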
diff --git a/lib/logstorage/tokenizer_test.go b/lib/logstorage/tokenizer_test.go index 37573752f..b7b8f7acc 100644 --- a/lib/logstorage/tokenizer_test.go +++ b/lib/logstorage/tokenizer_test.go @@ -24,6 +24,6 @@ Apr 28 13:43:38 localhost whoopsie[2812]: [13:43:38] online Apr 28 13:45:01 localhost CRON[12181]: (root) CMD (command -v debian-sa1 > /dev/null && debian-sa1 1 1) Apr 28 13:48:01 localhost kernel: [36020.497806] CPU0: Core temperature above threshold, cpu clock throttled (total events = 22034) `, "\n"), []string{"Apr", "28", "13", "43", "38", "localhost", "whoopsie", "2812", "online", "45", "01", "CRON", "12181", - "root", "CMD", "command", "v", "debian", "sa1", "dev", "null", "1", "48", "kernel", "36020", "497806", "CPU0", "Core", - "temperature", "above", "threshold", "cpu", "clock", "throttled", "total", "events", "22034"}) + "root", "CMD", "command", "v", "debian", "sa1", "dev", "null", "1", "48", "kernel", "36020", "497806", "CPU0", "Core", + "temperature", "above", "threshold", "cpu", "clock", "throttled", "total", "events", "22034"}) } diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index e80b1a309..691e636bd 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -65,11 +65,8 @@ type valuesEncoder struct { func (ve *valuesEncoder) reset() { ve.buf = ve.buf[:0] - vs := ve.values - for i := range vs { - vs[i] = "" - } - ve.values = vs[:0] + clear(ve.values) + ve.values = ve.values[:0] } // encode encodes values to ve.values and returns the encoded value type with min/max encoded values. @@ -1073,11 +1070,8 @@ type valuesDict struct { } func (vd *valuesDict) reset() { - vs := vd.values - for i := range vs { - vs[i] = "" - } - vd.values = vs[:0] + clear(vd.values) + vd.values = vd.values[:0] } func (vd *valuesDict) copyFrom(src *valuesDict) { @@ -1134,7 +1128,7 @@ func (vd *valuesDict) unmarshal(src []byte) ([]byte, error) { return srcOrig, fmt.Errorf("cannot umarshal value %d out of %d from dict: %w", i, dictLen, err) } src = tail - // Do not use bytesutil.InternBytes(data) here, since it works slower than the string(data) in prod + v := string(data) vd.values = append(vd.values, v) }
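The valuesEncoder.reset and valuesDict.reset hunks above swap the manual zeroing loops for the built-in clear (Go 1.21+) followed by a reslice to zero length. A tiny self-contained illustration of why both steps are used:

package main

import "fmt"

func main() {
	values := []string{"foo", "bar", "baz"}

	// clear zeroes every element up to len(values), so the backing array
	// stops referencing the old strings and the GC can reclaim them.
	clear(values)

	// Reslicing to zero length keeps the allocated capacity for reuse
	// on the next encode call.
	values = values[:0]

	fmt.Println(len(values), cap(values)) // prints: 0 3
}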