mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-30 15:22:07 +00:00
app/vmstorage/promdb: code prettifying after a5583ddaff
This commit is contained in:
parent
a5583ddaff
commit
6e3cbae0b3
1 changed files with 38 additions and 47 deletions
|
@ -6,32 +6,28 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/ulid"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
"github.com/go-kit/kit/log"
|
||||
"github.com/oklog/ulid"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
promstorage "github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb"
|
||||
)
|
||||
|
||||
var (
|
||||
prometheusDataPath = flag.String("prometheusDataPath", "", "Optinal path to readonly historical Prometheus data")
|
||||
)
|
||||
var prometheusDataPath = flag.String("prometheusDataPath", "", "Optional path to readonly historical Prometheus data")
|
||||
|
||||
// filters prometheus db response.
|
||||
var maxRetentionTimeStampMsecs int64
|
||||
var prometheusRetentionMsecs int64
|
||||
|
||||
// Init must be called after flag.Parse and before using the package.
|
||||
//
|
||||
// See also MustClose.
|
||||
func Init(retentionMsecs int64) {
|
||||
maxRetentionTimeStampMsecs = time.Now().Unix()*1000 - retentionMsecs
|
||||
if promDB != nil {
|
||||
logger.Fatalf("BUG: it looks like MustOpenPromDB is called multiple times without MustClosePromDB call")
|
||||
}
|
||||
prometheusRetentionMsecs = retentionMsecs
|
||||
if *prometheusDataPath == "" {
|
||||
return
|
||||
}
|
||||
|
@ -40,40 +36,37 @@ func Init(retentionMsecs int64) {
|
|||
return nil
|
||||
})
|
||||
opts := tsdb.DefaultOptions()
|
||||
// set max block duration to the 10% of retention period or 31 day.
|
||||
// its common setting for promethues.
|
||||
// https://prometheus.io/docs/prometheus/latest/storage/#compaction
|
||||
maxBlockDuration := int64((31 * 24 * time.Hour) / time.Millisecond)
|
||||
|
||||
opts.MaxBlockDuration = maxBlockDuration
|
||||
opts.RetentionDuration = retentionMsecs
|
||||
if retentionMsecs/10 < maxBlockDuration {
|
||||
opts.MaxBlockDuration = retentionMsecs / 10
|
||||
}
|
||||
// its needed to make correct compaction ranges.
|
||||
// if minBlockDuration*3 > maxBlockDuration, no compaction will be made.
|
||||
// its case for retention less then 60 hours.
|
||||
if opts.MaxBlockDuration < opts.MinBlockDuration {
|
||||
opts.MinBlockDuration = opts.MaxBlockDuration / 3
|
||||
}
|
||||
|
||||
// custom delete function is needed, because prometheus uses BeyondTimeRetention func,
|
||||
// that calculates the difference between the first block and this block is larger than
|
||||
// the retention period so any blocks after that are added as deletable.
|
||||
// https://github.com/prometheus/prometheus/blob/997bb7134fcfd7279f250e183e78681e48a56aff/tsdb/db.go#L1116
|
||||
// Set max block duration to 10% of retention period or 31 days
|
||||
// according to https://prometheus.io/docs/prometheus/latest/storage/#compaction
|
||||
maxBlockDuration := int64((31 * 24 * time.Hour) / time.Millisecond)
|
||||
if maxBlockDuration > retentionMsecs/10 {
|
||||
maxBlockDuration = retentionMsecs / 10
|
||||
}
|
||||
if maxBlockDuration < opts.MinBlockDuration {
|
||||
maxBlockDuration = opts.MinBlockDuration
|
||||
}
|
||||
opts.MaxBlockDuration = maxBlockDuration
|
||||
|
||||
// Custom delete function is needed, because Prometheus by default doesn't delete
|
||||
// blocks outside the retention if no new blocks are created with samples with the current timestamps.
|
||||
// See https://github.com/prometheus/prometheus/blob/997bb7134fcfd7279f250e183e78681e48a56aff/tsdb/db.go#L1116
|
||||
opts.BlocksToDelete = func(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
|
||||
deletable := make(map[ulid.ULID]struct{})
|
||||
m := make(map[ulid.ULID]struct{})
|
||||
minRetentionTime := time.Now().Unix()*1000 - retentionMsecs
|
||||
for _, block := range blocks {
|
||||
// add block marked for deletion by compaction.
|
||||
if block.Meta().Compaction.Deletable {
|
||||
deletable[block.Meta().ULID] = struct{}{}
|
||||
meta := block.Meta()
|
||||
// delete block marked for deletion by compaction code.
|
||||
if meta.Compaction.Deletable {
|
||||
m[meta.ULID] = struct{}{}
|
||||
continue
|
||||
}
|
||||
if block.MaxTime() < maxRetentionTimeStampMsecs {
|
||||
deletable[block.Meta().ULID] = struct{}{}
|
||||
if block.MaxTime() < minRetentionTime {
|
||||
m[meta.ULID] = struct{}{}
|
||||
}
|
||||
}
|
||||
return deletable
|
||||
return m
|
||||
}
|
||||
pdb, err := tsdb.Open(*prometheusDataPath, l, nil, opts)
|
||||
if err != nil {
|
||||
|
@ -177,10 +170,8 @@ func VisitSeries(sq *storage.SearchQuery, fetchData bool, deadline searchutils.D
|
|||
d := time.Unix(int64(deadline.Deadline()), 0)
|
||||
ctx, cancel := context.WithDeadline(context.Background(), d)
|
||||
defer cancel()
|
||||
// adjust search query time range.
|
||||
// Prometheus keeps recent data at wal and it will not be deleted.
|
||||
min, max := adjustSearchQueryMinMaxTimestamps(sq, maxRetentionTimeStampMsecs)
|
||||
q, err := promDB.Querier(ctx, min, max)
|
||||
minTime, maxTime := getSearchTimeRange(sq)
|
||||
q, err := promDB.Querier(ctx, minTime, maxTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -221,17 +212,17 @@ func VisitSeries(sq *storage.SearchQuery, fetchData bool, deadline searchutils.D
|
|||
return ss.Err()
|
||||
}
|
||||
|
||||
// ensures, that search query is inside retention period.
|
||||
func adjustSearchQueryMinMaxTimestamps(sq *storage.SearchQuery, maxTimestamp int64) (int64, int64) {
|
||||
max := sq.MaxTimestamp
|
||||
min := sq.MinTimestamp
|
||||
if sq.MaxTimestamp < maxTimestamp {
|
||||
max = maxRetentionTimeStampMsecs
|
||||
func getSearchTimeRange(sq *storage.SearchQuery) (int64, int64) {
|
||||
maxTime := sq.MaxTimestamp
|
||||
minTime := sq.MinTimestamp
|
||||
minRetentionTime := time.Now().Unix()*1000 - prometheusRetentionMsecs
|
||||
if maxTime < minRetentionTime {
|
||||
maxTime = minRetentionTime
|
||||
}
|
||||
if sq.MinTimestamp < maxTimestamp {
|
||||
min = maxRetentionTimeStampMsecs
|
||||
if minTime < minRetentionTime {
|
||||
minTime = minRetentionTime
|
||||
}
|
||||
return min, max
|
||||
return minTime, maxTime
|
||||
}
|
||||
|
||||
func convertPromLabelsToMetricName(dst *storage.MetricName, labels []labels.Label) {
|
||||
|
|
Loading…
Reference in a new issue