From a5583ddaff5af255a3056659c9ce8bb5078886da Mon Sep 17 00:00:00 2001 From: Nikolay Date: Thu, 7 Jan 2021 23:55:35 +0300 Subject: [PATCH] adds period compaction to prometheus data (#105) * adds period compaction to prometheus data and filtering for datapoints outside retention period * lint fix * adds custom retention func * fixes compaction, fixes search query adjustment --- app/vmstorage/promdb/promdb.go | 62 ++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/app/vmstorage/promdb/promdb.go b/app/vmstorage/promdb/promdb.go index 5ced884be..8ec4fd0b6 100644 --- a/app/vmstorage/promdb/promdb.go +++ b/app/vmstorage/promdb/promdb.go @@ -6,6 +6,8 @@ import ( "fmt" "time" + "github.com/oklog/ulid" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" @@ -15,12 +17,18 @@ import ( "github.com/prometheus/prometheus/tsdb" ) -var prometheusDataPath = flag.String("prometheusDataPath", "", "Optinal path to readonly historical Prometheus data") +var ( + prometheusDataPath = flag.String("prometheusDataPath", "", "Optinal path to readonly historical Prometheus data") +) + +// filters prometheus db response. +var maxRetentionTimeStampMsecs int64 // Init must be called after flag.Parse and before using the package. // // See also MustClose. func Init(retentionMsecs int64) { + maxRetentionTimeStampMsecs = time.Now().Unix()*1000 - retentionMsecs if promDB != nil { logger.Fatalf("BUG: it looks like MustOpenPromDB is called multiple times without MustClosePromDB call") } @@ -32,7 +40,41 @@ func Init(retentionMsecs int64) { return nil }) opts := tsdb.DefaultOptions() + // set max block duration to 10% of the retention period or 31 days. + // it's a common setting for Prometheus.
+ // https://prometheus.io/docs/prometheus/latest/storage/#compaction + maxBlockDuration := int64((31 * 24 * time.Hour) / time.Millisecond) + + opts.MaxBlockDuration = maxBlockDuration opts.RetentionDuration = retentionMsecs + if retentionMsecs/10 < maxBlockDuration { + opts.MaxBlockDuration = retentionMsecs / 10 + } + // this is needed to produce correct compaction ranges. + // if minBlockDuration*3 > maxBlockDuration, no compaction will be made. + // this is the case for retention of less than 60 hours. + if opts.MaxBlockDuration < opts.MinBlockDuration { + opts.MinBlockDuration = opts.MaxBlockDuration / 3 + } + + // a custom delete function is needed, because Prometheus uses the BeyondTimeRetention func, + // which checks whether the difference between the first block and this block is larger than + // the retention period, so that any blocks after that are added as deletable. + // https://github.com/prometheus/prometheus/blob/997bb7134fcfd7279f250e183e78681e48a56aff/tsdb/db.go#L1116 + opts.BlocksToDelete = func(blocks []*tsdb.Block) map[ulid.ULID]struct{} { + deletable := make(map[ulid.ULID]struct{}) + for _, block := range blocks { + // add block marked for deletion by compaction. + if block.Meta().Compaction.Deletable { + deletable[block.Meta().ULID] = struct{}{} + continue + } + if block.MaxTime() < maxRetentionTimeStampMsecs { + deletable[block.Meta().ULID] = struct{}{} + } + } + return deletable + } pdb, err := tsdb.Open(*prometheusDataPath, l, nil, opts) if err != nil { logger.Panicf("FATAL: cannot open Prometheus data at -prometheusDataPath=%q: %s", *prometheusDataPath, err) @@ -135,7 +177,10 @@ func VisitSeries(sq *storage.SearchQuery, fetchData bool, deadline searchutils.D d := time.Unix(int64(deadline.Deadline()), 0) ctx, cancel := context.WithDeadline(context.Background(), d) defer cancel() - q, err := promDB.Querier(ctx, sq.MinTimestamp, sq.MaxTimestamp) + // adjust search query time range. + // Prometheus keeps recent data in the WAL, and it will not be deleted.
+	min, max := adjustSearchQueryMinMaxTimestamps(sq, maxRetentionTimeStampMsecs)
+	q, err := promDB.Querier(ctx, min, max)
 	if err != nil {
 		return err
 	}
@@ -176,6 +221,19 @@ func VisitSeries(sq *storage.SearchQuery, fetchData bool, deadline searchutils.D
 	return ss.Err()
 }
 
+// adjustSearchQueryMinMaxTimestamps returns the min and max timestamps of sq, raising each
+// one that lies before maxTimestamp up to maxTimestamp (the argument, not package state).
+func adjustSearchQueryMinMaxTimestamps(sq *storage.SearchQuery, maxTimestamp int64) (int64, int64) {
+	start, end := sq.MinTimestamp, sq.MaxTimestamp
+	if start < maxTimestamp {
+		start = maxTimestamp
+	}
+	if end < maxTimestamp {
+		end = maxTimestamp
+	}
+	return start, end
+}
+
 func convertPromLabelsToMetricName(dst *storage.MetricName, labels []labels.Label) {
 	dst.Reset()
 	for _, label := range labels {