diff --git a/app/victoria-logs/main.go b/app/victoria-logs/main.go index 32834904a..19216c428 100644 --- a/app/victoria-logs/main.go +++ b/app/victoria-logs/main.go @@ -92,6 +92,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { if vlselect.RequestHandler(w, r) { return true } + if vlstorage.RequestHandler(w, r) { + return true + } return false } diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index 628919ce5..d4ac5a171 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -37,6 +37,8 @@ var ( "see https://docs.victoriametrics.com/victorialogs/data-ingestion/ ; see also -logNewStreams") minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which "+ "the storage stops accepting new data") + + forceMergeAuthKey = flagutil.NewPassword("forceMergeAuthKey", "authKey, which must be passed in query string to /internal/force_merge pages. It overrides -httpAuth.*") ) // Init initializes vlstorage. @@ -87,6 +89,28 @@ func Stop() { strg = nil } +// RequestHandler is a storage request handler. +func RequestHandler(w http.ResponseWriter, r *http.Request) bool { + path := r.URL.Path + if path == "/internal/force_merge" { + if !httpserver.CheckAuthFlag(w, r, forceMergeAuthKey) { + return true + } + // Run force merge in background + partitionNamePrefix := r.FormValue("partition_prefix") + go func() { + activeForceMerges.Inc() + defer activeForceMerges.Dec() + logger.Infof("forced merge for partition_prefix=%q has been started", partitionNamePrefix) + startTime := time.Now() + strg.MustForceMerge(partitionNamePrefix) + logger.Infof("forced merge for partition_prefix=%q has been successfully finished in %.3f seconds", partitionNamePrefix, time.Since(startTime).Seconds()) + }() + return true + } + return false +} + var strg *logstorage.Storage var storageMetrics *metrics.Set @@ -205,3 +229,5 @@ func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) { metrics.WriteCounterUint64(w, `vl_rows_dropped_total{reason="too_big_timestamp"}`, ss.RowsDroppedTooBigTimestamp) metrics.WriteCounterUint64(w, `vl_rows_dropped_total{reason="too_small_timestamp"}`, ss.RowsDroppedTooSmallTimestamp) } + +var activeForceMerges = metrics.NewCounter("vl_active_force_merges") diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 3a32b9bc2..cfedbe96f 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -15,6 +15,8 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +* FEATURE: add support for forced merge. See [these docs](https://docs.victoriametrics.com/victorialogs/#forced-merge). + * BUGFIX: avoid possible panic when logs for a new day are ingested during execution of concurrent queries. * BUGFIX: avoid panic at `lib/logstorage.(*blockResultColumn).forEachDictValue()` when [stats with additional filters](https://docs.victoriametrics.com/victorialogs/logsql/#stats-with-additional-filters). The panic has been introduced in [v0.33.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.33.0-victorialogs) in [this commit](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/a350be48b68330ee1a487e1fb09b002d3be45163). diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md index c9c5ad722..4bfe7762f 100644 --- a/docs/VictoriaLogs/README.md +++ b/docs/VictoriaLogs/README.md @@ -131,6 +131,19 @@ For example, the following command starts VictoriaLogs, which stores the data at VictoriaLogs automatically creates the `-storageDataPath` directory on the first run if it is missing. +## Forced merge + +VictoriaLogs performs data compactions in background in order to keep good performance characteristics when accepting new data. +These compactions (merges) are performed independently on per-day partitions. +This means that compactions are stopped for per-day partitions if no new data is ingested into these partitions. +Sometimes it is necessary to trigger compactions for old partitions. In this case forced compaction may be initiated on the specified per-month partition +by sending request to `/internal/force_merge?partition_prefix=YYYYMMDD`, +where `YYYYMMDD` is per-day partition name. For example, `http://victoria-logs:9428/internal/force_merge?partition_prefix=20240921` would initiate forced +merge for September 21, 2024 partition. The call to `/internal/force_merge` returns immediately, while the corresponding forced merge continues running in background. + +Forced merges may require additional CPU, disk IO and storage space resources. It is unnecessary to run forced merge under normal conditions, +since VictoriaLogs automatically performs optimal merges in background when new data is ingested into it. + ## High Availability ### High Availability (HA) Setup with VictoriaLogs Single-Node Instances diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go index 5d559bf43..99a1d6f7a 100644 --- a/lib/logstorage/datadb.go +++ b/lib/logstorage/datadb.go @@ -459,6 +459,7 @@ func assertIsInMerge(pws []*partWrapper) { // mustMergeParts merges pws to a single resulting part. // // if isFinal is set, then the resulting part is guaranteed to be saved to disk. +// if isFinal is set, then the merge process cannot be interrupted. // The pws may remain unmerged after returning from the function if there is no enough disk space. // // All the parts inside pws must have isInMerge field set to true. @@ -1226,3 +1227,48 @@ func getBlocksCount(pws []*partWrapper) uint64 { } return n } + +func (ddb *datadb) mustForceMergeAllParts() { + // Flush inmemory parts to files before forced merge + ddb.mustFlushInmemoryPartsToFiles(true) + + var pws []*partWrapper + + // Collect all the file parts for forced merge + ddb.partsLock.Lock() + pws = appendAllPartsForMergeLocked(pws, ddb.smallParts) + pws = appendAllPartsForMergeLocked(pws, ddb.bigParts) + ddb.partsLock.Unlock() + + // If len(pws) == 1, then the merge must run anyway. + // This allows applying the configured retention, removing the deleted data, etc. + + // Merge pws optimally + wg := getWaitGroup() + for len(pws) > 0 { + pwsToMerge, pwsRemaining := getPartsForOptimalMerge(pws) + wg.Add(1) + bigPartsConcurrencyCh <- struct{}{} + go func(pwsChunk []*partWrapper) { + defer func() { + <-bigPartsConcurrencyCh + wg.Done() + }() + + ddb.mustMergeParts(pwsChunk, false) + }(pwsToMerge) + pws = pwsRemaining + } + wg.Wait() + putWaitGroup(wg) +} + +func appendAllPartsForMergeLocked(dst, src []*partWrapper) []*partWrapper { + for _, pw := range src { + if !pw.isInMerge { + pw.isInMerge = true + dst = append(dst, pw) + } + } + return dst +} diff --git a/lib/logstorage/partition.go b/lib/logstorage/partition.go index fc88510ec..439ca002f 100644 --- a/lib/logstorage/partition.go +++ b/lib/logstorage/partition.go @@ -195,3 +195,8 @@ func (pt *partition) updateStats(ps *PartitionStats) { pt.ddb.updateStats(&ps.DatadbStats) pt.idb.updateStats(&ps.IndexdbStats) } + +// mustForceMerge runs forced merge for all the parts in pt. +func (pt *partition) mustForceMerge() { + pt.ddb.mustForceMergeAllParts() +} diff --git a/lib/logstorage/storage.go b/lib/logstorage/storage.go index b729ffe30..a448bacc4 100644 --- a/lib/logstorage/storage.go +++ b/lib/logstorage/storage.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "sync/atomic" "time" @@ -113,6 +114,8 @@ type Storage struct { // partitions is a list of partitions for the Storage. // // It must be accessed under partitionsLock. + // + // partitions are sorted by time. partitions []*partitionWrapper // ptwHot is the "hot" partition, were the last rows were ingested. @@ -472,6 +475,33 @@ func (s *Storage) MustClose() { s.path = "" } +// MustForceMerge force-merges parts in s partitions with names starting from the given partitionNamePrefix. +// +// Partitions are merged sequentially in order to reduce load on the system. +func (s *Storage) MustForceMerge(partitionNamePrefix string) { + var ptws []*partitionWrapper + + s.partitionsLock.Lock() + for _, ptw := range s.partitions { + if strings.HasPrefix(ptw.pt.name, partitionNamePrefix) { + ptw.incRef() + ptws = append(ptws, ptw) + } + } + s.partitionsLock.Unlock() + + s.wg.Add(1) + defer s.wg.Done() + + for _, ptw := range ptws { + logger.Infof("started force merge for partition %s", ptw.pt.name) + startTime := time.Now() + ptw.pt.mustForceMerge() + ptw.decRef() + logger.Infof("finished force merge for partition %s in %.3fs", ptw.pt.name, time.Since(startTime).Seconds()) + } +} + // MustAddRows adds lr to s. // // It is recommended checking whether the s is in read-only mode by calling IsReadOnly()