diff --git a/CHANGELOG.md b/CHANGELOG.md index 7deeeafad..e3671e5dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ # tip +* FEATURE: allow setting `-retentionPeriod` smaller than one month. I.e. `-retentionPeriod=3d`, `-retentionPeriod=2w`, etc. is supported now. + See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/173 * FEATURE: optimize more cases according to https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization . Now the following cases are optimized too: * `rollup_func(foo{filters}[d]) op bar` -> `rollup_func(foo{filters}[d]) op bar{filters}` * `transform_func(foo{filters}) op bar` -> `transform_func(foo{filters}) op bar{filters}` diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index aa3235e38..8c0209a77 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -22,8 +23,8 @@ import ( ) var ( + retentionPeriod = flagutil.NewDuration("retentionPeriod", 1, "Data with timestamps outside the retentionPeriod is automatically deleted") httpListenAddr = flag.String("httpListenAddr", ":8482", "Address to listen for http connections") - retentionPeriod = flag.Int("retentionPeriod", 1, "Retention period in months") storageDataPath = flag.String("storageDataPath", "vmstorage-data", "Path to storage data") vminsertAddr = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services") vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") @@ -53,11 +54,11 @@ func main() { storage.SetBigMergeWorkersCount(*bigMergeConcurrency) storage.SetSmallMergeWorkersCount(*smallMergeConcurrency) - logger.Infof("opening storage at %q with retention period %d months", *storageDataPath, *retentionPeriod) + logger.Infof("opening storage at %q with -retentionPeriod=%s", *storageDataPath, retentionPeriod) startTime := time.Now() - strg, err := storage.OpenStorage(*storageDataPath, *retentionPeriod) + strg, err := storage.OpenStorage(*storageDataPath, retentionPeriod.Msecs) if err != nil { - logger.Fatalf("cannot open a storage at %s with retention period %d months: %s", *storageDataPath, *retentionPeriod, err) + logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *storageDataPath, retentionPeriod, err) } var m storage.Metrics diff --git a/docs/Quick-Start.md b/docs/Quick-Start.md index 80f1218e8..05d52ebea 100644 --- a/docs/Quick-Start.md +++ b/docs/Quick-Start.md @@ -8,7 +8,7 @@ and their default values. Default flag values should fit the majoirty of cases. The minimum required flags to configure are: * `-storageDataPath` - path to directory where VictoriaMetrics stores all the data. - * `-retentionPeriod` - data retention in months. + * `-retentionPeriod` - data retention. For instance: diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 81b10537f..16b85aa26 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -164,7 +164,7 @@ or [docker image](https://hub.docker.com/r/victoriametrics/victoria-metrics/) wi The following command-line flags are used the most: * `-storageDataPath` - path to data directory. VictoriaMetrics stores all the data in this directory. Default path is `victoria-metrics-data` in the current working directory. -* `-retentionPeriod` - retention period in months for stored data. Older data is automatically deleted. Default period is 1 month. +* `-retentionPeriod` - retention for stored data. Older data is automatically deleted. Default retention is 1 month. See [these docs](#retention) for more details. Other flags have good enough default values, so set them only if you really need this. Pass `-help` to see all the available flags with description and default values. @@ -1048,6 +1048,7 @@ The de-duplication reduces disk space usage if multiple identically configured P write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical `external_labels` section in their configs, so they write data to the same time series. + ### Retention Retention is configured with `-retentionPeriod` command-line flag. For instance, `-retentionPeriod=3` means @@ -1059,6 +1060,10 @@ For example if `-retentionPeriod` is set to 1, data for January is deleted on Ma It is safe to extend `-retentionPeriod` on existing data. If `-retentionPeriod` is set to lower value than before then data outside the configured period will be eventually deleted. +VictoriaMetrics supports retention smaller than 1 month. For example, `-retentionPeriod=5d` would set data retention for 5 days. +Older data is eventually deleted during [background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282). + + ### Multiple retentions Just start multiple VictoriaMetrics instances with distinct values for the following flags: diff --git a/lib/flagutil/duration.go b/lib/flagutil/duration.go new file mode 100644 index 000000000..8dd920f67 --- /dev/null +++ b/lib/flagutil/duration.go @@ -0,0 +1,69 @@ +package flagutil + +import ( + "flag" + "fmt" + "strconv" + "strings" + + "github.com/VictoriaMetrics/metricsql" +) + +// NewDuration returns new `duration` flag with the given name, defaultValue and description. +// +// DefaultValue is in months. +func NewDuration(name string, defaultValue float64, description string) *Duration { + description += "\nThe following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months" + d := Duration{ + Msecs: int64(defaultValue * msecsPerMonth), + valueString: fmt.Sprintf("%g", defaultValue), + } + flag.Var(&d, name, description) + return &d +} + +// Duration is a flag for holding duration. +type Duration struct { + // Msecs contains parsed duration in milliseconds. + Msecs int64 + + valueString string +} + +// String implements flag.Value interface +func (d *Duration) String() string { + return d.valueString +} + +// Set implements flag.Value interface +func (d *Duration) Set(value string) error { + // An attempt to parse value in months. + months, err := strconv.ParseFloat(value, 64) + if err == nil { + if months > maxMonths { + return fmt.Errorf("duration months must be smaller than %d; got %g", maxMonths, months) + } + if months < 0 { + return fmt.Errorf("duration months cannot be negative; got %g", months) + } + d.Msecs = int64(months * msecsPerMonth) + d.valueString = value + return nil + } + // Parse duration. + value = strings.ToLower(value) + if strings.HasSuffix(value, "m") { + return fmt.Errorf("duration in months must be set without `m` suffix due to ambiguity with duration in minutes; got %s", value) + } + msecs, err := metricsql.PositiveDurationValue(value, 0) + if err != nil { + return err + } + d.Msecs = msecs + d.valueString = value + return nil +} + +const maxMonths = 12 * 100 + +const msecsPerMonth = 31 * 24 * 3600 * 1000 diff --git a/lib/flagutil/duration_test.go b/lib/flagutil/duration_test.go new file mode 100644 index 000000000..cdb1b5945 --- /dev/null +++ b/lib/flagutil/duration_test.go @@ -0,0 +1,57 @@ +package flagutil + +import ( + "strings" + "testing" +) + +func TestDurationSetFailure(t *testing.T) { + f := func(value string) { + t.Helper() + var d Duration + if err := d.Set(value); err == nil { + t.Fatalf("expecting non-nil error in d.Set(%q)", value) + } + } + f("") + f("foobar") + f("5foobar") + f("ah") + f("134xd") + f("2.43sdfw") + + // Too big value in months + f("12345") + + // Negative duration + f("-1") + f("-34h") + + // Duration in minutes is confused with duration in months + f("1m") +} + +func TestDurationSetSuccess(t *testing.T) { + f := func(value string, expectedMsecs int64) { + t.Helper() + var d Duration + if err := d.Set(value); err != nil { + t.Fatalf("unexpected error in d.Set(%q): %s", value, err) + } + if d.Msecs != expectedMsecs { + t.Fatalf("unexpected result; got %d; want %d", d.Msecs, expectedMsecs) + } + valueString := d.String() + valueExpected := strings.ToLower(value) + if valueString != valueExpected { + t.Fatalf("unexpected valueString; got %q; want %q", valueString, valueExpected) + } + } + f("0", 0) + f("1", msecsPerMonth) + f("123.456", 123.456*msecsPerMonth) + f("1h", 3600*1000) + f("1.5d", 1.5*24*3600*1000) + f("2.3W", 2.3*7*24*3600*1000) + f("0.25y", 0.25*365*24*3600*1000) +} diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 6f3463f91..6a1845ce9 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -15,12 +15,12 @@ import ( // // rowsMerged is atomically updated with the number of merged rows during the merge. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{}, - dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { + dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error { ph.Reset() bsm := bsmPool.Get().(*blockStreamMerger) bsm.Init(bsrs) - err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted) + err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted) bsm.reset() bsmPool.Put(bsm) bsw.MustClose() @@ -39,7 +39,7 @@ var bsmPool = &sync.Pool{ var errForciblyStopped = fmt.Errorf("forcibly stopped") func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{}, - dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error { + dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error { // Search for the first block to merge var pendingBlock *Block for bsm.NextBlock() { @@ -53,6 +53,11 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc *rowsDeleted += uint64(bsm.Block.bh.RowsCount) continue } + if bsm.Block.bh.MaxTimestamp < retentionDeadline { + // Skip blocks out of the given retention. + *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + continue + } pendingBlock = getBlock() pendingBlock.CopyFrom(bsm.Block) break @@ -75,6 +80,11 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc *rowsDeleted += uint64(bsm.Block.bh.RowsCount) continue } + if bsm.Block.bh.MaxTimestamp < retentionDeadline { + // skip blocks out of the given retention. + *rowsDeleted += uint64(bsm.Block.bh.RowsCount) + continue + } // Verify whether pendingBlock may be merged with bsm.Block (the current block). if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID { diff --git a/lib/storage/merge_test.go b/lib/storage/merge_test.go index 215af357d..dd12ccaff 100644 --- a/lib/storage/merge_test.go +++ b/lib/storage/merge_test.go @@ -365,7 +365,7 @@ func TestMergeForciblyStop(t *testing.T) { ch := make(chan struct{}) var rowsMerged, rowsDeleted uint64 close(ch) - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) { t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped) } if rowsMerged != 0 { @@ -385,7 +385,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc bsw.InitFromInmemoryPart(&mp) var rowsMerged, rowsDeleted uint64 - if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil { t.Fatalf("unexpected error in mergeBlockStreams: %s", err) } diff --git a/lib/storage/merge_timing_test.go b/lib/storage/merge_timing_test.go index 73a03ea29..38d2f5ba6 100644 --- a/lib/storage/merge_timing_test.go +++ b/lib/storage/merge_timing_test.go @@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i } mpOut.Reset() bsw.InitFromInmemoryPart(&mpOut) - if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil { + if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil { panic(fmt.Errorf("cannot merge block streams: %w", err)) } } diff --git a/lib/storage/partition.go b/lib/storage/partition.go index f159c0b98..ad5aaee2e 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -134,6 +134,10 @@ type partition struct { // The callack that returns deleted metric ids which must be skipped during merge. getDeletedMetricIDs func() *uint64set.Set + // data retention in milliseconds. + // Used for deleting data outside the retention during background merge. + retentionMsecs int64 + // Name is the name of the partition in the form YYYY_MM. name string @@ -206,7 +210,7 @@ func (pw *partWrapper) decRef() { // createPartition creates new partition for the given timestamp and the given paths // to small and big partitions. -func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) { +func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) { name := timestampToPartitionName(timestamp) smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name @@ -219,7 +223,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err) } - pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs) + pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) pt.tr.fromPartitionTimestamp(timestamp) pt.startMergeWorkers() pt.startRawRowsFlusher() @@ -241,7 +245,7 @@ func (pt *partition) Drop() { } // openPartition opens the existing partition from the given paths. -func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) { +func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) { smallPartsPath = filepath.Clean(smallPartsPath) bigPartsPath = filepath.Clean(bigPartsPath) @@ -265,7 +269,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err) } - pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs) + pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) pt.smallParts = smallParts pt.bigParts = bigParts if err := pt.tr.fromPartitionName(name); err != nil { @@ -278,13 +282,14 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func return pt, nil } -func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition { +func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) *partition { p := &partition{ name: name, smallPartsPath: smallPartsPath, bigPartsPath: bigPartsPath, getDeletedMetricIDs: getDeletedMetricIDs, + retentionMsecs: retentionMsecs, mergeIdx: uint64(time.Now().UnixNano()), stopCh: make(chan struct{}), @@ -1129,7 +1134,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro atomic.AddUint64(&pt.smallMergesCount, 1) atomic.AddUint64(&pt.activeSmallMerges, 1) } - err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted) + retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs + err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted) if isBigPart { atomic.AddUint64(&pt.activeBigMerges, ^uint64(0)) } else { diff --git a/lib/storage/partition_search_test.go b/lib/storage/partition_search_test.go index 9342a9cbc..42c90fe83 100644 --- a/lib/storage/partition_search_test.go +++ b/lib/storage/partition_search_test.go @@ -167,7 +167,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma }) // Create partition from rowss and test search on it. - pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs) + retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000 + pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs) if err != nil { t.Fatalf("cannot create partition: %s", err) } @@ -191,7 +192,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma pt.MustClose() // Open the created partition and test search on it. - pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs) + pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs) if err != nil { t.Fatalf("cannot open partition: %s", err) } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 16bc2133b..95410313e 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -27,7 +27,10 @@ import ( "github.com/VictoriaMetrics/fastcache" ) -const maxRetentionMonths = 12 * 100 +const ( + msecsPerMonth = 31 * 24 * 3600 * 1000 + maxRetentionMsecs = 100 * 12 * msecsPerMonth +) // Storage represents TSDB storage. type Storage struct { @@ -117,23 +120,20 @@ type accountProjectKey struct { ProjectID uint32 } -// OpenStorage opens storage on the given path with the given number of retention months. -func OpenStorage(path string, retentionMonths int) (*Storage, error) { - if retentionMonths > maxRetentionMonths { - return nil, fmt.Errorf("too big retentionMonths=%d; cannot exceed %d", retentionMonths, maxRetentionMonths) - } - if retentionMonths <= 0 { - retentionMonths = maxRetentionMonths - } +// OpenStorage opens storage on the given path with the given retentionMsecs. +func OpenStorage(path string, retentionMsecs int64) (*Storage, error) { path, err := filepath.Abs(path) if err != nil { return nil, fmt.Errorf("cannot determine absolute path for %q: %w", path, err) } - + if retentionMsecs <= 0 { + retentionMsecs = maxRetentionMsecs + } + retentionMonths := (retentionMsecs + (msecsPerMonth - 1)) / msecsPerMonth s := &Storage{ path: path, cachePath: path + "/cache", - retentionMonths: retentionMonths, + retentionMonths: int(retentionMonths), stop: make(chan struct{}), } @@ -188,7 +188,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) { // Load data tablePath := path + "/data" - tb, err := openTable(tablePath, retentionMonths, s.getDeletedMetricIDs) + tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs) if err != nil { s.idb().MustClose() return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err) diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 60caf19a6..73c744d70 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -389,8 +389,8 @@ func TestStorageOpenMultipleTimes(t *testing.T) { func TestStorageRandTimestamps(t *testing.T) { path := "TestStorageRandTimestamps" - retentionMonths := 60 - s, err := OpenStorage(path, retentionMonths) + retentionMsecs := int64(60 * msecsPerMonth) + s, err := OpenStorage(path, retentionMsecs) if err != nil { t.Fatalf("cannot open storage: %s", err) } @@ -400,7 +400,7 @@ func TestStorageRandTimestamps(t *testing.T) { t.Fatal(err) } s.MustClose() - s, err = OpenStorage(path, retentionMonths) + s, err = OpenStorage(path, retentionMsecs) } }) t.Run("concurrent", func(t *testing.T) { diff --git a/lib/storage/table.go b/lib/storage/table.go index ed2780361..faea8c1a0 100644 --- a/lib/storage/table.go +++ b/lib/storage/table.go @@ -22,6 +22,7 @@ type table struct { bigPartitionsPath string getDeletedMetricIDs func() *uint64set.Set + retentionMsecs int64 ptws []*partitionWrapper ptwsLock sync.Mutex @@ -30,8 +31,7 @@ type table struct { stop chan struct{} - retentionMilliseconds int64 - retentionWatcherWG sync.WaitGroup + retentionWatcherWG sync.WaitGroup } // partitionWrapper provides refcounting mechanism for the partition. @@ -77,12 +77,12 @@ func (ptw *partitionWrapper) scheduleToDrop() { atomic.AddUint64(&ptw.mustDrop, 1) } -// openTable opens a table on the given path with the given retentionMonths. +// openTable opens a table on the given path with the given retentionMsecs. // // The table is created if it doesn't exist. // -// Data older than the retentionMonths may be dropped at any time. -func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uint64set.Set) (*table, error) { +// Data older than the retentionMsecs may be dropped at any time. +func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*table, error) { path = filepath.Clean(path) // Create a directory for the table if it doesn't exist yet. @@ -115,7 +115,7 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin } // Open partitions. - pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs) + pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs) if err != nil { return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err) } @@ -125,6 +125,7 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin smallPartitionsPath: smallPartitionsPath, bigPartitionsPath: bigPartitionsPath, getDeletedMetricIDs: getDeletedMetricIDs, + retentionMsecs: retentionMsecs, flockF: flockF, @@ -133,11 +134,6 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin for _, pt := range pts { tb.addPartitionNolock(pt) } - if retentionMonths <= 0 || retentionMonths > maxRetentionMonths { - retentionMonths = maxRetentionMonths - } - tb.retentionMilliseconds = int64(retentionMonths) * 31 * 24 * 3600 * 1e3 - tb.startRetentionWatcher() return tb, nil } @@ -357,7 +353,7 @@ func (tb *table) AddRows(rows []rawRow) error { continue } - pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs) + pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs) if err != nil { errors = append(errors, err) continue @@ -376,7 +372,7 @@ func (tb *table) AddRows(rows []rawRow) error { func (tb *table) getMinMaxTimestamps() (int64, int64) { now := int64(fasttime.UnixTimestamp() * 1000) - minTimestamp := now - tb.retentionMilliseconds + minTimestamp := now - tb.retentionMsecs maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :) if minTimestamp < 0 { // Negative timestamps aren't supported by the storage. @@ -406,7 +402,7 @@ func (tb *table) retentionWatcher() { case <-ticker.C: } - minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMilliseconds + minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMsecs var ptwsDrop []*partitionWrapper tb.ptwsLock.Lock() dst := tb.ptws[:0] @@ -457,7 +453,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) { } } -func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) ([]*partition, error) { +func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) ([]*partition, error) { // Certain partition directories in either `big` or `small` dir may be missing // after restoring from backup. So populate partition names from both dirs. ptNames := make(map[string]bool) @@ -471,7 +467,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet for ptName := range ptNames { smallPartsPath := smallPartitionsPath + "/" + ptName bigPartsPath := bigPartitionsPath + "/" + ptName - pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs) + pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs) if err != nil { mustClosePartitions(pts) return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err) diff --git a/lib/storage/table_search.go b/lib/storage/table_search.go index 50b5ea35b..f81dc2136 100644 --- a/lib/storage/table_search.go +++ b/lib/storage/table_search.go @@ -66,7 +66,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) { // Adjust tr.MinTimestamp, so it doesn't obtain data older // than the tb retention. now := int64(fasttime.UnixTimestamp() * 1000) - minTimestamp := now - tb.retentionMilliseconds + minTimestamp := now - tb.retentionMsecs if tr.MinTimestamp < minTimestamp { tr.MinTimestamp = minTimestamp } diff --git a/lib/storage/table_search_test.go b/lib/storage/table_search_test.go index d2f81e813..cda14c8b8 100644 --- a/lib/storage/table_search_test.go +++ b/lib/storage/table_search_test.go @@ -181,7 +181,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount }) // Create a table from rowss and test search on it. - tb, err := openTable("./test-table", -1, nilGetDeletedMetricIDs) + tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs) if err != nil { t.Fatalf("cannot create table: %s", err) } @@ -202,7 +202,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount tb.MustClose() // Open the created table and test search on it. - tb, err = openTable("./test-table", -1, nilGetDeletedMetricIDs) + tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs) if err != nil { t.Fatalf("cannot open table: %s", err) } diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go index 42b450681..c45c9b334 100644 --- a/lib/storage/table_search_timing_test.go +++ b/lib/storage/table_search_timing_test.go @@ -47,7 +47,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount) createdBenchTables[path] = true } - tb, err := openTable(path, -1, nilGetDeletedMetricIDs) + tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs) if err != nil { b.Fatalf("cnanot open table %q: %s", path, err) } @@ -70,7 +70,7 @@ var createdBenchTables = make(map[string]bool) func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) { b.Helper() - tb, err := openTable(path, -1, nilGetDeletedMetricIDs) + tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs) if err != nil { b.Fatalf("cannot open table %q: %s", path, err) } diff --git a/lib/storage/table_test.go b/lib/storage/table_test.go index daeef21ee..68a76dbc9 100644 --- a/lib/storage/table_test.go +++ b/lib/storage/table_test.go @@ -7,7 +7,7 @@ import ( func TestTableOpenClose(t *testing.T) { const path = "TestTableOpenClose" - const retentionMonths = 123 + const retentionMsecs = 123 * msecsPerMonth if err := os.RemoveAll(path); err != nil { t.Fatalf("cannot remove %q: %s", path, err) @@ -17,7 +17,7 @@ func TestTableOpenClose(t *testing.T) { }() // Create a new table - tb, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs) + tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) if err != nil { t.Fatalf("cannot create new table: %s", err) } @@ -27,7 +27,7 @@ func TestTableOpenClose(t *testing.T) { // Re-open created table multiple times. for i := 0; i < 10; i++ { - tb, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs) + tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) if err != nil { t.Fatalf("cannot open created table: %s", err) } @@ -37,20 +37,20 @@ func TestTableOpenClose(t *testing.T) { func TestTableOpenMultipleTimes(t *testing.T) { const path = "TestTableOpenMultipleTimes" - const retentionMonths = 123 + const retentionMsecs = 123 * msecsPerMonth defer func() { _ = os.RemoveAll(path) }() - tb1, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs) + tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) if err != nil { t.Fatalf("cannot open table the first time: %s", err) } defer tb1.MustClose() for i := 0; i < 10; i++ { - tb2, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs) + tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs) if err == nil { tb2.MustClose() t.Fatalf("expecting non-nil error when opening already opened table") diff --git a/lib/storage/table_timing_test.go b/lib/storage/table_timing_test.go index 2a20add41..ed1ca8bde 100644 --- a/lib/storage/table_timing_test.go +++ b/lib/storage/table_timing_test.go @@ -45,7 +45,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { b.SetBytes(int64(rowsCountExpected)) tablePath := "./benchmarkTableAddRows" for i := 0; i < b.N; i++ { - tb, err := openTable(tablePath, -1, nilGetDeletedMetricIDs) + tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs) if err != nil { b.Fatalf("cannot open table %q: %s", tablePath, err) } @@ -93,7 +93,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) { tb.MustClose() // Open the table from files and verify the rows count on it - tb, err = openTable(tablePath, -1, nilGetDeletedMetricIDs) + tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs) if err != nil { b.Fatalf("cannot open table %q: %s", tablePath, err) }