diff --git a/app/vmstorage/promdb/promdb.go b/app/vmstorage/promdb/promdb.go index 8ec4fd0b6d..0bf37c650d 100644 --- a/app/vmstorage/promdb/promdb.go +++ b/app/vmstorage/promdb/promdb.go @@ -6,32 +6,28 @@ import ( "fmt" "time" - "github.com/oklog/ulid" - "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/pkg/labels" promstorage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" ) -var ( - prometheusDataPath = flag.String("prometheusDataPath", "", "Optinal path to readonly historical Prometheus data") -) +var prometheusDataPath = flag.String("prometheusDataPath", "", "Optional path to readonly historical Prometheus data") -// filters prometheus db response. -var maxRetentionTimeStampMsecs int64 +var prometheusRetentionMsecs int64 // Init must be called after flag.Parse and before using the package. // // See also MustClose. func Init(retentionMsecs int64) { - maxRetentionTimeStampMsecs = time.Now().Unix()*1000 - retentionMsecs if promDB != nil { logger.Fatalf("BUG: it looks like MustOpenPromDB is called multiple times without MustClosePromDB call") } + prometheusRetentionMsecs = retentionMsecs if *prometheusDataPath == "" { return } @@ -40,40 +36,37 @@ func Init(retentionMsecs int64) { return nil }) opts := tsdb.DefaultOptions() - // set max block duration to the 10% of retention period or 31 day. - // its common setting for promethues. - // https://prometheus.io/docs/prometheus/latest/storage/#compaction - maxBlockDuration := int64((31 * 24 * time.Hour) / time.Millisecond) - - opts.MaxBlockDuration = maxBlockDuration opts.RetentionDuration = retentionMsecs - if retentionMsecs/10 < maxBlockDuration { - opts.MaxBlockDuration = retentionMsecs / 10 - } - // its needed to make correct compaction ranges. - // if minBlockDuration*3 > maxBlockDuration, no compaction will be made. - // its case for retention less then 60 hours. - if opts.MaxBlockDuration < opts.MinBlockDuration { - opts.MinBlockDuration = opts.MaxBlockDuration / 3 - } - // custom delete function is needed, because prometheus uses BeyondTimeRetention func, - // that calculates the difference between the first block and this block is larger than - // the retention period so any blocks after that are added as deletable. - // https://github.com/prometheus/prometheus/blob/997bb7134fcfd7279f250e183e78681e48a56aff/tsdb/db.go#L1116 + // Set max block duration to 10% of retention period or 31 days + // according to https://prometheus.io/docs/prometheus/latest/storage/#compaction + maxBlockDuration := int64((31 * 24 * time.Hour) / time.Millisecond) + if maxBlockDuration > retentionMsecs/10 { + maxBlockDuration = retentionMsecs / 10 + } + if maxBlockDuration < opts.MinBlockDuration { + maxBlockDuration = opts.MinBlockDuration + } + opts.MaxBlockDuration = maxBlockDuration + + // Custom delete function is needed, because Prometheus by default doesn't delete + // blocks outside the retention if no new blocks are created with samples with the current timestamps. + // See https://github.com/prometheus/prometheus/blob/997bb7134fcfd7279f250e183e78681e48a56aff/tsdb/db.go#L1116 opts.BlocksToDelete = func(blocks []*tsdb.Block) map[ulid.ULID]struct{} { - deletable := make(map[ulid.ULID]struct{}) + m := make(map[ulid.ULID]struct{}) + minRetentionTime := time.Now().Unix()*1000 - retentionMsecs for _, block := range blocks { - // add block marked for deletion by compaction. - if block.Meta().Compaction.Deletable { - deletable[block.Meta().ULID] = struct{}{} + meta := block.Meta() + // delete block marked for deletion by compaction code. + if meta.Compaction.Deletable { + m[meta.ULID] = struct{}{} continue } - if block.MaxTime() < maxRetentionTimeStampMsecs { - deletable[block.Meta().ULID] = struct{}{} + if block.MaxTime() < minRetentionTime { + m[meta.ULID] = struct{}{} } } - return deletable + return m } pdb, err := tsdb.Open(*prometheusDataPath, l, nil, opts) if err != nil { @@ -177,10 +170,8 @@ func VisitSeries(sq *storage.SearchQuery, fetchData bool, deadline searchutils.D d := time.Unix(int64(deadline.Deadline()), 0) ctx, cancel := context.WithDeadline(context.Background(), d) defer cancel() - // adjust search query time range. - // Prometheus keeps recent data at wal and it will not be deleted. - min, max := adjustSearchQueryMinMaxTimestamps(sq, maxRetentionTimeStampMsecs) - q, err := promDB.Querier(ctx, min, max) + minTime, maxTime := getSearchTimeRange(sq) + q, err := promDB.Querier(ctx, minTime, maxTime) if err != nil { return err } @@ -221,17 +212,17 @@ func VisitSeries(sq *storage.SearchQuery, fetchData bool, deadline searchutils.D return ss.Err() } -// ensures, that search query is inside retention period. -func adjustSearchQueryMinMaxTimestamps(sq *storage.SearchQuery, maxTimestamp int64) (int64, int64) { - max := sq.MaxTimestamp - min := sq.MinTimestamp - if sq.MaxTimestamp < maxTimestamp { - max = maxRetentionTimeStampMsecs +func getSearchTimeRange(sq *storage.SearchQuery) (int64, int64) { + maxTime := sq.MaxTimestamp + minTime := sq.MinTimestamp + minRetentionTime := time.Now().Unix()*1000 - prometheusRetentionMsecs + if maxTime < minRetentionTime { + maxTime = minRetentionTime } - if sq.MinTimestamp < maxTimestamp { - min = maxRetentionTimeStampMsecs + if minTime < minRetentionTime { + minTime = minRetentionTime } - return min, max + return minTime, maxTime } func convertPromLabelsToMetricName(dst *storage.MetricName, labels []labels.Label) {