lib/storage: add /internal/force_merge handler for running forced compactions on historical per-month partitions

This may be useful for freeing up storage space after time series deletion.

See https://victoriametrics.github.io/#force-merge for more details.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/686
Aliaksandr Valialkin 2020-09-17 12:01:53 +03:00
parent 3abbb38254
commit d96858b921
8 changed files with 129 additions and 5 deletions
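For illustration (not part of the commit): a minimal Go sketch of triggering the new endpoint once this change is deployed. The `vmstorage-1` host name is a placeholder; the `8482` port matches the `vmstorage` HTTP endpoints documented in the diffs below (single-node VictoriaMetrics serves the same path on its usual `8428` port).

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Trigger a forced merge of the August 2020 partition on a vmstorage node.
	// The handler returns immediately; the merge itself runs in background.
	resp, err := http.Get("http://vmstorage-1:8482/internal/force_merge?partition_prefix=2020_08")
	if err != nil {
		log.Fatalf("cannot start forced merge: %s", err)
	}
	defer resp.Body.Close()
	fmt.Printf("forced merge initiated: %s\n", resp.Status)
}
```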


@@ -206,6 +206,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
   be used on a regular basis, since it carries non-zero overhead.
 
 * `vmstorage` nodes provide the following HTTP endpoints on `8482` port:
+  - `/internal/force_merge` - initiate [forced compactions](https://victoriametrics.github.io/#force-merge) on the given `vmstorage` node.
   - `/snapshot/create` - create [instant snapshot](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282),
     which can be used for backups in background. Snapshots are created in `<storageDataPath>/snapshots` folder, where `<storageDataPath>` is the corresponding
     command-line flag value.


@@ -113,6 +113,22 @@ func newRequestHandler(strg *storage.Storage) httpserver.RequestHandler {
 
 func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storage) bool {
 	path := r.URL.Path
+	if path == "/internal/force_merge" {
+		// Run force merge in background
+		partitionNamePrefix := r.FormValue("partition_prefix")
+		go func() {
+			activeForceMerges.Inc()
+			defer activeForceMerges.Dec()
+			logger.Infof("forced merge for partition_prefix=%q has been started", partitionNamePrefix)
+			startTime := time.Now()
+			if err := strg.ForceMergePartitions(partitionNamePrefix); err != nil {
+				logger.Errorf("error in forced merge for partition_prefix=%q: %s", partitionNamePrefix, err)
+				return
+			}
+			logger.Infof("forced merge for partition_prefix=%q has been successfully finished in %.3f seconds", partitionNamePrefix, time.Since(startTime).Seconds())
+		}()
+		return true
+	}
 	if !strings.HasPrefix(path, "/snapshot") {
 		return false
 	}
@@ -183,6 +199,8 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag
 	}
 }
 
+var activeForceMerges = metrics.NewCounter("vm_active_force_merges")
+
 func registerStorageMetrics(strg *storage.Storage) {
 	mCache := &storage.Metrics{}
 	var mCacheLock sync.Mutex
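The handler above is fire-and-forget: it spawns the merge in a goroutine, returns `true` (request handled) right away, and exposes the number of in-flight merges via the `vm_active_force_merges` counter. A standalone sketch of the same pattern, with `sync/atomic` standing in for the `metrics` package and a placeholder `mergePartitions` in place of `strg.ForceMergePartitions`:

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"sync/atomic"
	"time"
)

// activeForceMerges stands in for metrics.NewCounter("vm_active_force_merges").
var activeForceMerges int64

func forceMergeHandler(w http.ResponseWriter, r *http.Request) {
	partitionNamePrefix := r.FormValue("partition_prefix")
	// Run the potentially long merge in background and return immediately,
	// so the HTTP client isn't blocked for the duration of the merge.
	go func() {
		atomic.AddInt64(&activeForceMerges, 1)
		defer atomic.AddInt64(&activeForceMerges, -1)
		if err := mergePartitions(partitionNamePrefix); err != nil {
			log.Printf("error in forced merge for partition_prefix=%q: %s", partitionNamePrefix, err)
		}
	}()
	w.WriteHeader(http.StatusOK)
}

// mergePartitions is a placeholder for the real storage call.
func mergePartitions(prefix string) error {
	time.Sleep(2 * time.Second) // simulate a long-running merge
	fmt.Printf("merged partitions with prefix %q\n", prefix)
	return nil
}

func main() {
	http.HandleFunc("/internal/force_merge", forceMergeHandler)
	log.Fatal(http.ListenAndServe(":8482", nil))
}
```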


@@ -206,6 +206,7 @@ or [an alternative dashboard for VictoriaMetrics cluster](https://grafana.com/gr
   be used on a regular basis, since it carries non-zero overhead.
 
 * `vmstorage` nodes provide the following HTTP endpoints on `8482` port:
+  - `/internal/force_merge` - initiate [forced compactions](https://victoriametrics.github.io/#force-merge) on the given `vmstorage` node.
   - `/snapshot/create` - create [instant snapshot](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282),
     which can be used for backups in background. Snapshots are created in `<storageDataPath>/snapshots` folder, where `<storageDataPath>` is the corresponding
     command-line flag value.


@@ -115,6 +115,7 @@ See [features available for enterprise customers](https://github.com/VictoriaMet
 * [Setting up service](#setting-up-service)
 * [How to work with snapshots](#how-to-work-with-snapshots)
 * [How to delete time series](#how-to-delete-time-series)
+* [Forced merge](#forced-merge)
 * [How to export time series](#how-to-export-time-series)
 * [How to import time series data](#how-to-import-time-series-data)
 * [Relabeling](#relabeling)
@@ -712,6 +713,8 @@ Send a request to `http://<victoriametrics-addr>:8428/api/v1/admin/tsdb/delete_s
 where `<timeseries_selector_for_delete>` may contain any [time series selector](https://prometheus.io/docs/prometheus/latest/querying/basics/#time-series-selectors)
 for metrics to delete. After that all the time series matching the given selector are deleted. Storage space for
 the deleted time series isn't freed instantly - it is freed during subsequent [background merges of data files](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
+Note that background merges may never occur for data from previous months, so storage space won't be freed for historical data.
+In this case [forced merge](#forced-merge) may help freeing up storage space.
 
 It is recommended verifying which metrics will be deleted with the call to `http://<victoria-metrics-addr>:8428/api/v1/series?match[]=<timeseries_selector_for_delete>`
 before actually deleting the metrics. By default this query will only scan active series in the past 5 minutes, so you may need to
@@ -731,9 +734,26 @@ It isn't recommended using delete API for the following cases, since it brings n
   See [this article](https://www.robustperception.io/relabelling-can-discard-targets-timeseries-and-alerts) for details.
 * Reducing disk space usage by deleting unneeded time series. This doesn't work as expected, since the deleted
   time series occupy disk space until the next merge operation, which can never occur when deleting too old data.
+  [Forced merge](#forced-merge) may be used for freeing up disk space occupied by old data.
   It is better using `-retentionPeriod` command-line flag for efficient pruning of old data.
 
+### Forced merge
+
+VictoriaMetrics performs [data compactions in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282)
+in order to keep good performance characteristics when accepting new data. These compactions (merges) are performed independently on per-month partitions.
+This means that compactions are stopped for per-month partitions if no new data is ingested into these partitions.
+Sometimes it is necessary to trigger compactions for old partitions. For instance, in order to free up disk space occupied by [deleted time series](#how-to-delete-time-series).
+In this case forced compaction may be initiated on the specified per-month partition by sending a request to `/internal/force_merge?partition_prefix=YYYY_MM`,
+where `YYYY_MM` is the per-month partition name. For example, `http://victoriametrics:8428/internal/force_merge?partition_prefix=2020_08` would initiate forced
+merge for the August 2020 partition. The call to `/internal/force_merge` returns immediately, while the corresponding forced merge continues running in background.
+
+Forced merges may require additional CPU, disk IO and storage space resources. It is unnecessary to run forced merge under normal conditions,
+since VictoriaMetrics automatically performs [optimal merges in background](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282)
+when new data is ingested into it.
+
 ### How to export time series
 
 Send a request to `http://<victoriametrics-addr>:8428/api/v1/export?match[]=<timeseries_selector_for_export>`,
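Per-month partition names follow the `YYYY_MM` layout documented above, so the prefix for a given month can be derived directly from a `time.Time`. A small illustrative sketch (not part of the commit; host and port are placeholders):

```go
package main

import (
	"fmt"
	"time"
)

// partitionPrefix returns the per-month partition name (YYYY_MM) for t.
func partitionPrefix(t time.Time) string {
	return t.Format("2006_01") // Go's reference-time layout yields e.g. "2020_08"
}

func main() {
	aug2020 := time.Date(2020, time.August, 15, 0, 0, 0, 0, time.UTC)
	url := fmt.Sprintf("http://victoriametrics:8428/internal/force_merge?partition_prefix=%s",
		partitionPrefix(aug2020))
	fmt.Println(url)
	// Output: http://victoriametrics:8428/internal/force_merge?partition_prefix=2020_08
}
```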


@@ -668,7 +668,7 @@ func (pt *partition) MustClose() {
 	}
 	pt.partsLock.Unlock()
 
-	if err := pt.mergePartsOptimal(pws); err != nil {
+	if err := pt.mergePartsOptimal(pws, nil); err != nil {
 		logger.Panicf("FATAL: cannot flush %d inmemory parts to files on %q: %s", len(pws), pt.smallPartsPath, err)
 	}
 	logger.Infof("%d inmemory parts have been flushed to files in %.3f seconds on %q", len(pws), time.Since(startTime).Seconds(), pt.smallPartsPath)
@@ -794,13 +794,13 @@ func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*p
 	}
 	pt.partsLock.Unlock()
 
-	if err := pt.mergePartsOptimal(dstPws); err != nil {
+	if err := pt.mergePartsOptimal(dstPws, nil); err != nil {
 		return dstPws, fmt.Errorf("cannot merge %d inmemory parts: %w", len(dstPws), err)
 	}
 	return dstPws, nil
 }
 
-func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
+func (pt *partition) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
 	defer func() {
 		// Remove isInMerge flag from pws.
 		pt.partsLock.Lock()
@@ -812,7 +812,7 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
 		pt.partsLock.Unlock()
 	}()
 	for len(pws) > defaultPartsToMerge {
-		if err := pt.mergeParts(pws[:defaultPartsToMerge], nil); err != nil {
+		if err := pt.mergeParts(pws[:defaultPartsToMerge], stopCh); err != nil {
 			return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
 		}
 		pws = pws[defaultPartsToMerge:]
@@ -820,12 +820,53 @@ func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
 	if len(pws) == 0 {
 		return nil
 	}
-	if err := pt.mergeParts(pws, nil); err != nil {
+	if err := pt.mergeParts(pws, stopCh); err != nil {
 		return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
 	}
 	return nil
 }
 
+// ForceMergeAllParts runs merge for all the parts in pt - small and big.
+func (pt *partition) ForceMergeAllParts() error {
+	var pws []*partWrapper
+	pt.partsLock.Lock()
+	if !hasActiveMerges(pt.smallParts) && !hasActiveMerges(pt.bigParts) {
+		pws = appendAllPartsToMerge(pws, pt.smallParts)
+		pws = appendAllPartsToMerge(pws, pt.bigParts)
+	}
+	pt.partsLock.Unlock()
+
+	if len(pws) == 0 {
+		// Nothing to merge.
+		return nil
+	}
+
+	// If len(pws) == 1, then the merge must run anyway, so deleted time series could be removed from the part.
+	if err := pt.mergePartsOptimal(pws, pt.stopCh); err != nil {
+		return fmt.Errorf("cannot force merge %d parts from partition %q: %w", len(pws), pt.name, err)
+	}
+	return nil
+}
+
+func appendAllPartsToMerge(dst, src []*partWrapper) []*partWrapper {
+	for _, pw := range src {
+		if pw.isInMerge {
+			logger.Panicf("BUG: part %q is already in merge", pw.p.path)
+		}
+		pw.isInMerge = true
+		dst = append(dst, pw)
+	}
+	return dst
+}
+
+func hasActiveMerges(pws []*partWrapper) bool {
+	for _, pw := range pws {
+		if pw.isInMerge {
+			return true
+		}
	}
+	return false
+}
+
 var (
 	bigMergeWorkersCount   = (runtime.GOMAXPROCS(-1) + 1) / 2
 	smallMergeWorkersCount = (runtime.GOMAXPROCS(-1) + 1) / 2
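The `stopCh` parameter threaded through `mergePartsOptimal` is what makes a long forced merge abortable: `ForceMergeAllParts` passes `pt.stopCh` (which presumably signals partition shutdown), while the pre-existing in-memory flush call sites pass `nil` and stay non-cancellable. A generic sketch of this select-on-stop pattern for a batched loop (not from the commit; names are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errStopped = errors.New("merge was interrupted by stop signal")

// mergeBatches processes work in fixed-size batches, checking stopCh between
// batches so a shutdown can interrupt a long-running merge. Passing a nil
// channel disables cancellation: receiving from a nil channel blocks forever,
// so the default branch is always taken.
func mergeBatches(items []int, batchSize int, stopCh <-chan struct{}) error {
	for len(items) > 0 {
		select {
		case <-stopCh:
			return errStopped
		default:
		}
		n := batchSize
		if n > len(items) {
			n = len(items)
		}
		time.Sleep(5 * time.Millisecond) // simulate merging one batch
		fmt.Printf("merged batch of %d items\n", n)
		items = items[n:]
	}
	return nil
}

func main() {
	stopCh := make(chan struct{})
	go func() {
		time.Sleep(20 * time.Millisecond)
		close(stopCh) // simulate shutdown signal
	}()
	if err := mergeBatches(make([]int, 100), 8, stopCh); err != nil {
		fmt.Println(err) // merge was interrupted by stop signal
	}
}
```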


@@ -1120,6 +1120,13 @@ func (mr *MetricRow) Unmarshal(src []byte) ([]byte, error) {
 	return tail, nil
 }
 
+// ForceMergePartitions force-merges partitions in s with names starting from the given partitionNamePrefix.
+//
+// Partitions are merged sequentially in order to reduce load on the system.
+func (s *Storage) ForceMergePartitions(partitionNamePrefix string) error {
+	return s.tb.ForceMergePartitions(partitionNamePrefix)
+}
+
 // AddRows adds the given mrs to s.
 func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {
 	if len(mrs) == 0 {


@@ -815,6 +815,21 @@ func testStorageAddRows(s *Storage) error {
 		return fmt.Errorf("snapshot %q must contain at least %d rows; got %d", snapshotPath, minRowsExpected, m1.TableMetrics.SmallRowsCount)
 	}
 
+	// Verify that force merge for the snapshot leaves only a single part per partition.
+	if err := s1.ForceMergePartitions(""); err != nil {
+		return fmt.Errorf("error when force merging partitions: %w", err)
+	}
+	ptws := s1.tb.GetPartitions(nil)
+	defer s1.tb.PutPartitions(ptws)
+	for _, ptw := range ptws {
+		pws := ptw.pt.GetParts(nil)
+		numParts := len(pws)
+		ptw.pt.PutParts(pws)
+		if numParts != 1 {
+			return fmt.Errorf("unexpected number of parts for partition %q after force merge; got %d; want 1", ptw.pt.name, numParts)
+		}
+	}
+
 	s1.MustClose()
 
 	// Delete the snapshot and make sure it is no longer visible.


@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -242,6 +243,26 @@ func (tb *table) UpdateMetrics(m *TableMetrics) {
 	tb.ptwsLock.Unlock()
 }
 
+// ForceMergePartitions force-merges partitions in tb with names starting from the given partitionNamePrefix.
+//
+// Partitions are merged sequentially in order to reduce load on the system.
+func (tb *table) ForceMergePartitions(partitionNamePrefix string) error {
+	ptws := tb.GetPartitions(nil)
+	defer tb.PutPartitions(ptws)
+	for _, ptw := range ptws {
+		if !strings.HasPrefix(ptw.pt.name, partitionNamePrefix) {
+			continue
+		}
+		logger.Infof("starting forced merge for partition %q", ptw.pt.name)
+		startTime := time.Now()
+		if err := ptw.pt.ForceMergeAllParts(); err != nil {
+			return fmt.Errorf("cannot complete forced merge for partition %q: %w", ptw.pt.name, err)
+		}
+		logger.Infof("forced merge for partition %q has been finished in %.3f seconds", ptw.pt.name, time.Since(startTime).Seconds())
+	}
+	return nil
+}
+
 // AddRows adds the given rows to the table tb.
 func (tb *table) AddRows(rows []rawRow) error {
 	if len(rows) == 0 {
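Since matching is a plain `strings.HasPrefix` on the partition name, the prefix can be as coarse or fine as needed: an empty string matches every partition (as the storage test above exploits), `"2020"` matches all 2020 partitions, and `"2020_08"` pins a single month. A quick illustration of the matching rule, with made-up partition names:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	partitions := []string{"2019_12", "2020_07", "2020_08", "2020_09"}
	for _, prefix := range []string{"", "2020", "2020_08"} {
		var matched []string
		for _, name := range partitions {
			if strings.HasPrefix(name, prefix) {
				matched = append(matched, name)
			}
		}
		fmt.Printf("prefix %q matches %v\n", prefix, matched)
	}
}
```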