lib/storage: initial support for multi-level downsampling

See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/36

Based on https://github.com/valyala/VictoriaMetrics/pull/203
This commit is contained in:
Aliaksandr Valialkin 2021-12-15 16:23:27 +02:00
parent 2847c84a7b
commit 288620ca40
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
7 changed files with 203 additions and 25 deletions

View file

@ -28,6 +28,8 @@ var (
"equal to -dedup.minScrapeInterval > 0. See https://docs.victoriametrics.com/#deduplication and https://docs.victoriametrics.com/#downsampling")
dryRun = flag.Bool("dryRun", false, "Whether to check only -promscrape.config and then exit. "+
"Unknown config entries are allowed in -promscrape.config by default. This can be changed with -promscrape.config.strictParse")
downsamplingPeriods = flagutil.NewArray("downsampling.period", "Comma-separated downsampling periods in the format 'offset:period'. For example, '30d:10m' instructs "+
"to leave a single sample per 10 minutes for samples older than 30 days. See https://docs.victoriametrics.com/#downsampling for details")
)
// custom api help links [["/api","doc"]] without http.pathPrefix.
@ -57,7 +59,10 @@ func main() {
logger.Infof("starting VictoriaMetrics at %q...", *httpListenAddr)
startTime := time.Now()
storage.SetDedupInterval(*minScrapeInterval)
err := storage.SetDownsamplingPeriods(*downsamplingPeriods, *minScrapeInterval)
if err != nil {
logger.Fatalf("cannot parse -downsampling.period: %s", err)
}
vmstorage.Init(promql.ResetRollupResultCacheIfNeeded)
vmselect.Init()
vminsert.Init()

View file

@ -483,7 +483,7 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.
dst.Values = append(dst.Values, pts.pd.values...)
dst.Timestamps = append(dst.Timestamps, pts.pd.timestamps...)
}
dedupInterval := storage.GetDedupInterval()
dedupInterval := storage.GetDedupInterval(tr.MinTimestamp)
mergeSortBlocks(dst, sbs, dedupInterval)
if pts.pd != nil {
if !sort.IsSorted(dst) {

View file

@ -161,7 +161,8 @@ func (b *Block) deduplicateSamplesDuringMerge() {
// Nothing to dedup.
return
}
dedupInterval := GetDedupInterval()
maxTimestamp := srcTimestamps[len(srcTimestamps)-1]
dedupInterval := GetDedupInterval(maxTimestamp)
if dedupInterval <= 0 {
// Deduplication is disabled.
return

View file

@ -1,27 +1,7 @@
package storage
import (
"time"
)
// SetDedupInterval sets the deduplication interval, which is applied to raw samples during data ingestion and querying.
//
// De-duplication is disabled if dedupInterval is 0.
//
// This function must be called before initializing the storage.
func SetDedupInterval(dedupInterval time.Duration) {
globalDedupInterval = dedupInterval.Milliseconds()
}
// GetDedupInterval returns the dedup interval in milliseconds, which has been set via SetDedupInterval.
func GetDedupInterval() int64 {
return globalDedupInterval
}
var globalDedupInterval int64
func isDedupEnabled() bool {
return globalDedupInterval > 0
return len(downsamplingPeriods) > 0
}
// DeduplicateSamples removes samples from src* if they are closer to each other than dedupInterval in millseconds.

123
lib/storage/downsampling.go Normal file
View file

@ -0,0 +1,123 @@
package storage
import (
"fmt"
"sort"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/metricsql"
)
// SetDownsamplingPeriods configures downsampling.
//
// The function must be called before opening or creating any storage.
func SetDownsamplingPeriods(periods []string, dedupInterval time.Duration) error {
dsps, err := parseDownsamplingPeriods(periods)
if err != nil {
return err
}
dedupIntervalMs := dedupInterval.Milliseconds()
if dedupIntervalMs > 0 {
if len(dsps) > 0 && dsps[len(dsps)-1].Offset == 0 {
return fmt.Errorf("-dedup.minScrapeInterval=%s cannot be used if -downsampling.period=%s contains zero offset", dedupInterval, periods)
}
// Deduplication is a special case of downsampling with zero offset.
dsps = append(dsps, DownsamplingPeriod{
Offset: 0,
Interval: dedupIntervalMs,
})
}
downsamplingPeriods = dsps
return nil
}
// DownsamplingPeriod describes downsampling period
type DownsamplingPeriod struct {
// Offset in milliseconds from the current time when the downsampling with the given interval must be applied
Offset int64
// Interval for downsampling - only a single sample is left per each interval
Interval int64
}
// String implements interface
func (dsp DownsamplingPeriod) String() string {
offset := time.Duration(dsp.Offset) * time.Millisecond
interval := time.Duration(dsp.Interval) * time.Millisecond
return fmt.Sprintf("%s:%s", offset, interval)
}
func (dsp *DownsamplingPeriod) parse(s string) error {
idx := strings.Index(s, ":")
if idx <= 0 {
return fmt.Errorf("incorrect format for downsampling period: %s, want `offset:interval` format", s)
}
offsetStr, intervalStr := s[:idx], s[idx+1:]
interval, err := metricsql.DurationValue(intervalStr, 0)
if err != nil {
return fmt.Errorf("incorrect interval: %s format for downsampling interval: %s err: %w", intervalStr, s, err)
}
offset, err := metricsql.DurationValue(offsetStr, 0)
if err != nil {
return fmt.Errorf("incorrect duration: %s format for downsampling offset: %s err: %w", offsetStr, s, err)
}
dsp.Interval = interval
dsp.Offset = offset
// sanity check
if offset > 0 && interval > offset {
return fmt.Errorf("downsampling interval=%d cannot exceed offset=%d", dsp.Interval, dsp.Offset)
}
return nil
}
var downsamplingPeriods []DownsamplingPeriod
// GetDedupInterval returns dedup interval, which must be applied to samples with the given timestamp.
func GetDedupInterval(timestamp int64) int64 {
dsp := getDownsamplingPeriod(timestamp)
return dsp.Interval
}
// getDownsamplingPeriod returns downsampling period, which must be used for the given timestamp
func getDownsamplingPeriod(timestamp int64) DownsamplingPeriod {
offset := int64(fasttime.UnixTimestamp())*1000 - timestamp
for _, dsp := range downsamplingPeriods {
if offset >= dsp.Offset {
return dsp
}
}
return DownsamplingPeriod{}
}
func parseDownsamplingPeriods(periods []string) ([]DownsamplingPeriod, error) {
if len(periods) == 0 {
return nil, nil
}
var dsps []DownsamplingPeriod
for _, period := range periods {
var dsp DownsamplingPeriod
if err := dsp.parse(period); err != nil {
return nil, fmt.Errorf("cannot parse downsampling period %q: %w", period, err)
}
dsps = append(dsps, dsp)
}
sort.Slice(dsps, func(i, j int) bool {
return dsps[i].Offset > dsps[j].Offset
})
dspPrev := dsps[0]
// sanity checks.
for _, dsp := range dsps[1:] {
if dspPrev.Interval <= dsp.Interval {
return nil, fmt.Errorf("prev downsampling interval %d must be bigger than the next interval %d", dspPrev.Interval, dsp.Interval)
}
if dspPrev.Offset == dsp.Offset {
return nil, fmt.Errorf("duplicate downsampling offset: %d", dsp.Offset)
}
if dspPrev.Interval%dsp.Interval != 0 {
return nil, fmt.Errorf("downsamping intervals must be multiples; prev: %d, current: %d", dspPrev.Interval, dsp.Interval)
}
dspPrev = dsp
}
return dsps, nil
}

View file

@ -0,0 +1,62 @@
package storage
import (
"strings"
"testing"
)
func TestParseDownsamplingPeriodsFailure(t *testing.T) {
f := func(name string, src []string) {
t.Helper()
t.Run(name, func(t *testing.T) {
if _, err := parseDownsamplingPeriods(src); err == nil {
t.Fatalf("want fail for input: %s", strings.Join(src, ","))
}
})
}
f("empty duration", []string{"15d"})
f("empty interval", []string{":1m"})
f("incorrect duration decrease", []string{"30d:15h", "60d:1h"})
f("duplicate offset", []string{"30d:15h", "30d:1h"})
f("duplicate interval", []string{"60d:1h", "30d:1h"})
f("not multiple intervals", []string{"90d:12h", "60:9h", "30d:7h"})
}
func TestParseDownsamplingPeriodsSuccess(t *testing.T) {
f := func(name string, src []string, expected []DownsamplingPeriod) {
t.Helper()
t.Run(name, func(t *testing.T) {
dsps, err := parseDownsamplingPeriods(src)
if err != nil {
t.Fatalf("cannot parse downsampling configuration for: %s, err: %s", strings.Join(src, ","), err)
}
assertDownsamplingPeriods(t, expected, dsps)
})
}
f("one period", []string{"30d:1m"}, []DownsamplingPeriod{
{Offset: 30 * 24 * 3600 * 1000, Interval: 60 * 1000},
})
f("three periods", []string{"15d:30s", "30d:1m", "60d:15m"}, []DownsamplingPeriod{
{Offset: 60 * 24 * 3600 * 1000, Interval: 15 * 60 * 1000},
{Offset: 30 * 24 * 3600 * 1000, Interval: 60 * 1000},
{Offset: 15 * 24 * 3600 * 1000, Interval: 30 * 1000},
})
f("with the same divider periods", []string{"15d:1m", "30d:7m", "60d:14m", "90d:28m"}, []DownsamplingPeriod{
{Offset: 90 * 24 * 3600 * 1000, Interval: 28 * 60 * 1000},
{Offset: 60 * 24 * 3600 * 1000, Interval: 14 * 60 * 1000},
{Offset: 30 * 24 * 3600 * 1000, Interval: 7 * 60 * 1000},
{Offset: 15 * 24 * 3600 * 1000, Interval: 60 * 1000},
})
}
func assertDownsamplingPeriods(t *testing.T, want, got []DownsamplingPeriod) {
t.Helper()
if len(want) != len(got) {
t.Fatalf("len mismatch, want: %d, got: %d", len(want), len(got))
}
for i := 0; i < len(want); i++ {
if want[i] != got[i] {
t.Fatalf("want period: %s, got period: %s, idx: %d", want[i], got[i], i)
}
}
}

View file

@ -1082,7 +1082,14 @@ func (pt *partition) runFinalDedup() error {
func isDedupNeeded(pt *partition) bool {
pws := pt.GetParts(nil)
defer pt.PutParts(pws)
dedupInterval := GetDedupInterval()
// Get dedup interval, which covers all the parts in the partition.
var maxTimestamp int64
for _, pw := range pws {
if maxTimestamp < pw.p.ph.MaxTimestamp {
maxTimestamp = pw.p.ph.MaxTimestamp
}
}
dedupInterval := GetDedupInterval(maxTimestamp)
if dedupInterval <= 0 {
// The deduplication isn't needed.
return false