diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index efd8dfc21..0a6c9b55a 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -39,13 +39,13 @@ func Init() { logger.Panicf("BUG: Init() has been already called") } - if retentionPeriod.Msecs < 24*3600*1000 { + if retentionPeriod.Duration() < 24*time.Hour { logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod) } cfg := &logstorage.StorageConfig{ - Retention: time.Millisecond * time.Duration(retentionPeriod.Msecs), + Retention: retentionPeriod.Duration(), FlushInterval: *inmemoryDataFlushInterval, - FutureRetention: time.Millisecond * time.Duration(futureRetention.Msecs), + FutureRetention: futureRetention.Duration(), LogNewStreams: *logNewStreams, LogIngestedRows: *logIngestedRows, } diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index d7381a9f9..eef298616 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -102,13 +102,13 @@ func Init(resetCacheIfNeeded func(mrs []storage.MetricRow)) { mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.IntN()) mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.IntN()) - if retentionPeriod.Msecs < 24*3600*1000 { + if retentionPeriod.Duration() < 24*time.Hour { logger.Fatalf("-retentionPeriod cannot be smaller than a day; got %s", retentionPeriod) } logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod) startTime := time.Now() WG = syncwg.WaitGroup{} - strg := storage.MustOpenStorage(*DataPath, retentionPeriod.Msecs, *maxHourlySeries, *maxDailySeries) + strg := storage.MustOpenStorage(*DataPath, retentionPeriod.Duration(), *maxHourlySeries, *maxDailySeries) Storage = strg initStaleSnapshotsRemover(strg) @@ -380,10 +380,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { func initStaleSnapshotsRemover(strg *storage.Storage) { staleSnapshotsRemoverCh = make(chan struct{}) - if snapshotsMaxAge.Msecs <= 0 { + if snapshotsMaxAge.Duration() <= 0 { return } - snapshotsMaxAgeDur := time.Duration(snapshotsMaxAge.Msecs) * time.Millisecond + snapshotsMaxAgeDur := snapshotsMaxAge.Duration() staleSnapshotsRemoverWG.Add(1) go func() { defer staleSnapshotsRemoverWG.Done() diff --git a/lib/flagutil/duration.go b/lib/flagutil/duration.go index 87d602c6e..d42e20831 100644 --- a/lib/flagutil/duration.go +++ b/lib/flagutil/duration.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" "strings" + "time" "github.com/VictoriaMetrics/metricsql" ) @@ -30,6 +31,11 @@ type Duration struct { valueString string } +// Duration convert to time.Duration. +func (d *Duration) Duration() time.Duration { + return time.Millisecond * time.Duration(d.Msecs) +} + // String implements flag.Value interface func (d *Duration) String() string { return d.valueString diff --git a/lib/flagutil/duration_test.go b/lib/flagutil/duration_test.go index bd26d47b8..b899c45a8 100644 --- a/lib/flagutil/duration_test.go +++ b/lib/flagutil/duration_test.go @@ -3,6 +3,7 @@ package flagutil import ( "strings" "testing" + "time" ) func TestDurationSetFailure(t *testing.T) { @@ -59,3 +60,22 @@ func TestDurationSetSuccess(t *testing.T) { f("1w", 7*24*3600*1000) f("0.25y", 0.25*365*24*3600*1000) } + +func TestDurationDuration(t *testing.T) { + f := func(value string, expected time.Duration) { + t.Helper() + var d Duration + if err := d.Set(value); err != nil { + t.Fatalf("unexpected error in d.Set(%q): %s", value, err) + } + if d.Duration() != expected { + t.Fatalf("unexpected result; got %v; want %v", d.Duration().String(), expected.String()) + } + } + f("0", 0) + f("1", 31*24*time.Hour) + f("1h", time.Hour) + f("1.5d", 1.5*24*time.Hour) + f("1w", 7*24*time.Hour) + f("0.25y", 0.25*365*24*time.Hour) +} diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 074613f33..742592b95 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -508,7 +508,7 @@ func TestIndexDB(t *testing.T) { t.Run("serial", func(t *testing.T) { const path = "TestIndexDB-serial" - s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + s := MustOpenStorage(path, retentionMax, 0, 0) db := s.idb() mns, tsids, err := testIndexDBGetOrCreateTSIDByName(db, metricGroups) @@ -521,7 +521,7 @@ func TestIndexDB(t *testing.T) { // Re-open the storage and verify it works as expected. s.MustClose() - s = MustOpenStorage(path, maxRetentionMsecs, 0, 0) + s = MustOpenStorage(path, retentionMax, 0, 0) db = s.idb() if err := testIndexDBCheckTSIDByName(db, mns, tsids, false); err != nil { @@ -534,7 +534,7 @@ func TestIndexDB(t *testing.T) { t.Run("concurrent", func(t *testing.T) { const path = "TestIndexDB-concurrent" - s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + s := MustOpenStorage(path, retentionMax, 0, 0) db := s.idb() ch := make(chan error, 3) @@ -1429,7 +1429,7 @@ func TestMatchTagFilters(t *testing.T) { func TestIndexDBRepopulateAfterRotation(t *testing.T) { r := rand.New(rand.NewSource(1)) path := "TestIndexRepopulateAfterRotation" - s := MustOpenStorage(path, msecsPerMonth, 1e5, 1e5) + s := MustOpenStorage(path, retentionMonth, 1e5, 1e5) db := s.idb() if db.generation == 0 { @@ -1516,7 +1516,7 @@ func TestIndexDBRepopulateAfterRotation(t *testing.T) { func TestSearchTSIDWithTimeRange(t *testing.T) { const path = "TestSearchTSIDWithTimeRange" - s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + s := MustOpenStorage(path, retentionMax, 0, 0) db := s.idb() is := db.getIndexSearch(noDeadline) @@ -1966,7 +1966,7 @@ func newTestStorage() *Storage { metricNameCache: workingsetcache.New(1234), tsidCache: workingsetcache.New(1234), dateMetricIDCache: newDateMetricIDCache(), - retentionMsecs: maxRetentionMsecs, + retentionMsecs: retentionMax.Milliseconds(), } s.setDeletedMetricIDs(&uint64set.Set{}) return s diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go index 0a2c48c19..d67bebaa7 100644 --- a/lib/storage/index_db_timing_test.go +++ b/lib/storage/index_db_timing_test.go @@ -40,7 +40,7 @@ func BenchmarkRegexpFilterMismatch(b *testing.B) { func BenchmarkIndexDBAddTSIDs(b *testing.B) { const path = "BenchmarkIndexDBAddTSIDs" - s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + s := MustOpenStorage(path, retentionMax, 0, 0) db := s.idb() const recordsPerLoop = 1e3 @@ -94,7 +94,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { // This benchmark is equivalent to https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L52 // See https://www.robustperception.io/evaluating-performance-and-correctness for more details. const path = "BenchmarkHeadPostingForMatchers" - s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + s := MustOpenStorage(path, retentionMax, 0, 0) db := s.idb() // Fill the db with data as in https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L66 @@ -261,7 +261,7 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) { func BenchmarkIndexDBGetTSIDs(b *testing.B) { const path = "BenchmarkIndexDBGetTSIDs" - s := MustOpenStorage(path, maxRetentionMsecs, 0, 0) + s := MustOpenStorage(path, retentionMax, 0, 0) db := s.idb() const recordsPerLoop = 1000 diff --git a/lib/storage/storage.go b/lib/storage/storage.go index e61c0f0c0..9514ea8c9 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -33,8 +33,8 @@ import ( ) const ( - msecsPerMonth = 31 * 24 * 3600 * 1000 - maxRetentionMsecs = 100 * 12 * msecsPerMonth + retentionMonth = 31 * 24 * time.Hour + retentionMax = 100 * 12 * retentionMonth ) // Storage represents TSDB storage. @@ -154,21 +154,18 @@ type Storage struct { } // MustOpenStorage opens storage on the given path with the given retentionMsecs. -func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySeries int) *Storage { +func MustOpenStorage(path string, retention time.Duration, maxHourlySeries, maxDailySeries int) *Storage { path, err := filepath.Abs(path) if err != nil { logger.Panicf("FATAL: cannot determine absolute path for %q: %s", path, err) } - if retentionMsecs <= 0 { - retentionMsecs = maxRetentionMsecs - } - if retentionMsecs > maxRetentionMsecs { - retentionMsecs = maxRetentionMsecs + if retention <= 0 || retention > retentionMax { + retention = retentionMax } s := &Storage{ path: path, cachePath: filepath.Join(path, cacheDirname), - retentionMsecs: retentionMsecs, + retentionMsecs: retention.Milliseconds(), stop: make(chan struct{}), } fs.MustMkdirIfNotExist(path) @@ -246,7 +243,8 @@ func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDail // Initialize nextRotationTimestamp nowSecs := time.Now().UnixNano() / 1e9 - nextRotationTimestamp := nextRetentionDeadlineSeconds(nowSecs, retentionMsecs/1000, retentionTimezoneOffsetSecs) + retentionSecs := retention.Milliseconds() / 1000 // not .Seconds() because unnecessary float64 conversion + nextRotationTimestamp := nextRetentionDeadlineSeconds(nowSecs, retentionSecs, retentionTimezoneOffsetSecs) atomic.StoreInt64(&s.nextRotationTimestamp, nextRotationTimestamp) // Load nextDayMetricIDs cache diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index a098fc72a..d2bfe3c71 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -478,15 +478,15 @@ func TestStorageOpenClose(t *testing.T) { func TestStorageRandTimestamps(t *testing.T) { path := "TestStorageRandTimestamps" - retentionMsecs := int64(10 * msecsPerMonth) - s := MustOpenStorage(path, retentionMsecs, 0, 0) + retention := 10 * retentionMonth + s := MustOpenStorage(path, retention, 0, 0) t.Run("serial", func(t *testing.T) { for i := 0; i < 3; i++ { if err := testStorageRandTimestamps(s); err != nil { t.Fatalf("error on iteration %d: %s", i, err) } s.MustClose() - s = MustOpenStorage(path, retentionMsecs, 0, 0) + s = MustOpenStorage(path, retention, 0, 0) } }) t.Run("concurrent", func(t *testing.T) { @@ -936,8 +936,8 @@ func testStorageRegisterMetricNames(s *Storage) error { func TestStorageAddRowsSerial(t *testing.T) { rng := rand.New(rand.NewSource(1)) path := "TestStorageAddRowsSerial" - retentionMsecs := int64(msecsPerMonth * 10) - s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) + retention := 10 * retentionMonth + s := MustOpenStorage(path, retention, 1e5, 1e5) if err := testStorageAddRows(rng, s); err != nil { t.Fatalf("unexpected error: %s", err) } @@ -949,8 +949,8 @@ func TestStorageAddRowsSerial(t *testing.T) { func TestStorageAddRowsConcurrent(t *testing.T) { path := "TestStorageAddRowsConcurrent" - retentionMsecs := int64(msecsPerMonth * 10) - s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) + retention := 10 * retentionMonth + s := MustOpenStorage(path, retention, 1e5, 1e5) ch := make(chan error, 3) for i := 0; i < cap(ch); i++ { go func(n int) { @@ -1164,8 +1164,8 @@ func testStorageAddMetrics(s *Storage, workerNum int) error { func TestStorageDeleteStaleSnapshots(t *testing.T) { rng := rand.New(rand.NewSource(1)) path := "TestStorageDeleteStaleSnapshots" - retentionMsecs := int64(msecsPerMonth * 10) - s := MustOpenStorage(path, retentionMsecs, 1e5, 1e5) + retention := 10 * retentionMonth + s := MustOpenStorage(path, retention, 1e5, 1e5) const rowsPerAdd = 1e3 const addsCount = 10 maxTimestamp := timestampFromTime(time.Now()) diff --git a/lib/storage/table_test.go b/lib/storage/table_test.go index c664a55fe..49b2881ee 100644 --- a/lib/storage/table_test.go +++ b/lib/storage/table_test.go @@ -7,7 +7,7 @@ import ( func TestTableOpenClose(t *testing.T) { const path = "TestTableOpenClose" - const retentionMsecs = 123 * msecsPerMonth + const retention = 123 * retentionMonth if err := os.RemoveAll(path); err != nil { t.Fatalf("cannot remove %q: %s", path, err) @@ -18,7 +18,7 @@ func TestTableOpenClose(t *testing.T) { // Create a new table strg := newTestStorage() - strg.retentionMsecs = retentionMsecs + strg.retentionMsecs = retention.Milliseconds() tb := mustOpenTable(path, strg) // Close it