From 288620ca408cffcc1e3cd161a7d91d4497b71582 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 15 Dec 2021 16:23:27 +0200 Subject: [PATCH] lib/storage: initial support for multi-level downsampling See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/36 Based on https://github.com/valyala/VictoriaMetrics/pull/203 --- app/victoria-metrics/main.go | 7 +- app/vmselect/netstorage/netstorage.go | 2 +- lib/storage/block.go | 3 +- lib/storage/dedup.go | 22 +---- lib/storage/downsampling.go | 123 ++++++++++++++++++++++++++ lib/storage/downsampling_test.go | 62 +++++++++++++ lib/storage/partition.go | 9 +- 7 files changed, 203 insertions(+), 25 deletions(-) create mode 100644 lib/storage/downsampling.go create mode 100644 lib/storage/downsampling_test.go diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index d4ff5e247..f90c00e44 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -28,6 +28,8 @@ var ( "equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling") dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+ "Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse") + downsamplingPeriods = flagutil.NewArray("downsampling.period", "Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs "+ + "to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details") ) // custom api help links [["/api","doc"]] without http.pathPrefix. @@ -57,7 +59,10 @@ func main() { logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr) startTime := time.Now() - storage.SetDedupInterval(*minScrapeInterval) + err := storage.SetDownsamplingPeriods(*downsamplingPeriods, *minScrapeInterval) + if err != nil { + logger.Fatalf("cannot parse -downsampling.period: %s", err) + } vmstorage.Init(promql.ResetRollupResultCacheIfNeeded) vmselect.Init() vminsert.Init() diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 6a4e6c1e2..d3bc4765a 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -483,7 +483,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage. dst.Values = append(dst.Values, pts.pd.values...) dst.Timestamps = append(dst.Timestamps, pts.pd.timestamps...) } - dedupInterval := storage.GetDedupInterval() + dedupInterval := storage.GetDedupInterval(tr.MinTimestamp) mergeSortBlocks(dst, sbs, dedupInterval) if pts.pd != nil { if !sort.IsSorted(dst) { diff --git a/lib/storage/block.go b/lib/storage/block.go index edc7507e6..8335705eb 100644 --- a/lib/storage/block.go +++ b/lib/storage/block.go @@ -161,7 +161,8 @@ func (b *Block) deduplicateSamplesDuringMerge() { // Nothing to dedup. return } - dedupInterval := GetDedupInterval() + maxTimestamp := srcTimestamps[len(srcTimestamps)-1] + dedupInterval := GetDedupInterval(maxTimestamp) if dedupInterval <= 0 { // Deduplication is disabled. return diff --git a/lib/storage/dedup.go b/lib/storage/dedup.go index c92cc6221..b566b37dc 100644 --- a/lib/storage/dedup.go +++ b/lib/storage/dedup.go @@ -1,27 +1,7 @@ package storage -import ( - "time" -) - -// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying. -// -// De-duplication is disabled if dedupInterval is 0. -// -// This function must be called before initializing the storage. -func SetDedupInterval(dedupInterval time.Duration) { - globalDedupInterval = dedupInterval.Milliseconds() -} - -// GetDedupInterval returns the dedup interval in milliseconds, which has been set via SetDedupInterval. -func GetDedupInterval() int64 { - return globalDedupInterval -} - -var globalDedupInterval int64 - func isDedupEnabled() bool { - return globalDedupInterval > 0 + return len(downsamplingPeriods) > 0 } // DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in millseconds. diff --git a/lib/storage/downsampling.go b/lib/storage/downsampling.go new file mode 100644 index 000000000..2e96e4815 --- /dev/null +++ b/lib/storage/downsampling.go @@ -0,0 +1,123 @@ +package storage + +import ( + "fmt" + "sort" + "strings" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" + "github.com/VictoriaMetrics/metricsql" +) + +// SetDownsamplingPeriods configures downsampling. +// +// The function must be called before opening or creating any storage. +func SetDownsamplingPeriods(periods []string, dedupInterval time.Duration) error { + dsps, err := parseDownsamplingPeriods(periods) + if err != nil { + return err + } + dedupIntervalMs := dedupInterval.Milliseconds() + if dedupIntervalMs > 0 { + if len(dsps) > 0 && dsps[len(dsps)-1].Offset == 0 { + return fmt.Errorf("-dedup.minScrapeInterval=%s cannot be used if -downsampling.period=%s contains zero offset", dedupInterval, periods) + } + // Deduplication is a special case of downsampling with zero offset. + dsps = append(dsps, DownsamplingPeriod{ + Offset: 0, + Interval: dedupIntervalMs, + }) + } + downsamplingPeriods = dsps + return nil +} + +// DownsamplingPeriod describes downsampling period +type DownsamplingPeriod struct { + // Offset in milliseconds from the current time when the downsampling with the given interval must be applied + Offset int64 + // Interval for downsampling - only a single sample is left per each interval + Interval int64 +} + +// String implements interface +func (dsp DownsamplingPeriod) String() string { + offset := time.Duration(dsp.Offset) * time.Millisecond + interval := time.Duration(dsp.Interval) * time.Millisecond + return fmt.Sprintf("%s:%s", offset, interval) +} + +func (dsp *DownsamplingPeriod) parse(s string) error { + idx := strings.Index(s, ":") + if idx <= 0 { + return fmt.Errorf("incorrect format for downsampling period: %s, want `offset:interval` format", s) + } + offsetStr, intervalStr := s[:idx], s[idx+1:] + interval, err := metricsql.DurationValue(intervalStr, 0) + if err != nil { + return fmt.Errorf("incorrect interval: %s format for downsampling interval: %s err: %w", intervalStr, s, err) + } + offset, err := metricsql.DurationValue(offsetStr, 0) + if err != nil { + return fmt.Errorf("incorrect duration: %s format for downsampling offset: %s err: %w", offsetStr, s, err) + } + dsp.Interval = interval + dsp.Offset = offset + // sanity check + if offset > 0 && interval > offset { + return fmt.Errorf("downsampling interval=%d cannot exceed offset=%d", dsp.Interval, dsp.Offset) + } + return nil +} + +var downsamplingPeriods []DownsamplingPeriod + +// GetDedupInterval returns dedup interval, which must be applied to samples with the given timestamp. +func GetDedupInterval(timestamp int64) int64 { + dsp := getDownsamplingPeriod(timestamp) + return dsp.Interval +} + +// getDownsamplingPeriod returns downsampling period, which must be used for the given timestamp +func getDownsamplingPeriod(timestamp int64) DownsamplingPeriod { + offset := int64(fasttime.UnixTimestamp())*1000 - timestamp + for _, dsp := range downsamplingPeriods { + if offset >= dsp.Offset { + return dsp + } + } + return DownsamplingPeriod{} +} + +func parseDownsamplingPeriods(periods []string) ([]DownsamplingPeriod, error) { + if len(periods) == 0 { + return nil, nil + } + var dsps []DownsamplingPeriod + for _, period := range periods { + var dsp DownsamplingPeriod + if err := dsp.parse(period); err != nil { + return nil, fmt.Errorf("cannot parse downsampling period %q: %w", period, err) + } + dsps = append(dsps, dsp) + } + sort.Slice(dsps, func(i, j int) bool { + return dsps[i].Offset > dsps[j].Offset + }) + dspPrev := dsps[0] + // sanity checks. + for _, dsp := range dsps[1:] { + if dspPrev.Interval <= dsp.Interval { + return nil, fmt.Errorf("prev downsampling interval %d must be bigger than the next interval %d", dspPrev.Interval, dsp.Interval) + } + if dspPrev.Offset == dsp.Offset { + return nil, fmt.Errorf("duplicate downsampling offset: %d", dsp.Offset) + } + if dspPrev.Interval%dsp.Interval != 0 { + return nil, fmt.Errorf("downsamping intervals must be multiples; prev: %d, current: %d", dspPrev.Interval, dsp.Interval) + } + dspPrev = dsp + } + return dsps, nil +} diff --git a/lib/storage/downsampling_test.go b/lib/storage/downsampling_test.go new file mode 100644 index 000000000..ab6c07412 --- /dev/null +++ b/lib/storage/downsampling_test.go @@ -0,0 +1,62 @@ +package storage + +import ( + "strings" + "testing" +) + +func TestParseDownsamplingPeriodsFailure(t *testing.T) { + f := func(name string, src []string) { + t.Helper() + t.Run(name, func(t *testing.T) { + if _, err := parseDownsamplingPeriods(src); err == nil { + t.Fatalf("want fail for input: %s", strings.Join(src, ",")) + } + }) + } + f("empty duration", []string{"15d"}) + f("empty interval", []string{":1m"}) + f("incorrect duration decrease", []string{"30d:15h", "60d:1h"}) + f("duplicate offset", []string{"30d:15h", "30d:1h"}) + f("duplicate interval", []string{"60d:1h", "30d:1h"}) + f("not multiple intervals", []string{"90d:12h", "60:9h", "30d:7h"}) +} + +func TestParseDownsamplingPeriodsSuccess(t *testing.T) { + f := func(name string, src []string, expected []DownsamplingPeriod) { + t.Helper() + t.Run(name, func(t *testing.T) { + dsps, err := parseDownsamplingPeriods(src) + if err != nil { + t.Fatalf("cannot parse downsampling configuration for: %s, err: %s", strings.Join(src, ","), err) + } + assertDownsamplingPeriods(t, expected, dsps) + }) + } + f("one period", []string{"30d:1m"}, []DownsamplingPeriod{ + {Offset: 30 * 24 * 3600 * 1000, Interval: 60 * 1000}, + }) + f("three periods", []string{"15d:30s", "30d:1m", "60d:15m"}, []DownsamplingPeriod{ + {Offset: 60 * 24 * 3600 * 1000, Interval: 15 * 60 * 1000}, + {Offset: 30 * 24 * 3600 * 1000, Interval: 60 * 1000}, + {Offset: 15 * 24 * 3600 * 1000, Interval: 30 * 1000}, + }) + f("with the same divider periods", []string{"15d:1m", "30d:7m", "60d:14m", "90d:28m"}, []DownsamplingPeriod{ + {Offset: 90 * 24 * 3600 * 1000, Interval: 28 * 60 * 1000}, + {Offset: 60 * 24 * 3600 * 1000, Interval: 14 * 60 * 1000}, + {Offset: 30 * 24 * 3600 * 1000, Interval: 7 * 60 * 1000}, + {Offset: 15 * 24 * 3600 * 1000, Interval: 60 * 1000}, + }) +} + +func assertDownsamplingPeriods(t *testing.T, want, got []DownsamplingPeriod) { + t.Helper() + if len(want) != len(got) { + t.Fatalf("len mismatch, want: %d, got: %d", len(want), len(got)) + } + for i := 0; i < len(want); i++ { + if want[i] != got[i] { + t.Fatalf("want period: %s, got period: %s, idx: %d", want[i], got[i], i) + } + } +} diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 722504c24..7e0439ffe 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1082,7 +1082,14 @@ func (pt *partition) runFinalDedup() error { func isDedupNeeded(pt *partition) bool { pws := pt.GetParts(nil) defer pt.PutParts(pws) - dedupInterval := GetDedupInterval() + // Get dedup interval, which covers all the parts in the partition. + var maxTimestamp int64 + for _, pw := range pws { + if maxTimestamp < pw.p.ph.MaxTimestamp { + maxTimestamp = pw.p.ph.MaxTimestamp + } + } + dedupInterval := GetDedupInterval(maxTimestamp) if dedupInterval <= 0 { // The deduplication isn't needed. return false