app/vmstorage: support for -retentionPeriod smaller than one month

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/173
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/17
Aliaksandr Valialkin 2020-10-20 14:29:26 +03:00
parent 920300643a
commit 5bfd4e6218
20 changed files with 219 additions and 67 deletions


@ -1,6 +1,8 @@
# tip
* FEATURE: allow setting `-retentionPeriod` smaller than one month. E.g. `-retentionPeriod=3d`, `-retentionPeriod=2w`, etc. are now supported.
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/173
* FEATURE: optimize more cases according to https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization . Now the following cases are optimized too:
* `rollup_func(foo{filters}[d]) op bar` -> `rollup_func(foo{filters}[d]) op bar{filters}`
* `transform_func(foo{filters}) op bar` -> `transform_func(foo{filters}) op bar{filters}`


@ -164,7 +164,7 @@ or [docker image](https://hub.docker.com/r/victoriametrics/victoria-metrics/) wi
The following command-line flags are used the most:
* `-storageDataPath` - path to data directory. VictoriaMetrics stores all the data in this directory. Default path is `victoria-metrics-data` in the current working directory.
* `-retentionPeriod` - retention period in months for stored data. Older data is automatically deleted. Default period is 1 month.
* `-retentionPeriod` - retention for stored data. Older data is automatically deleted. Default retention is 1 month. See [these docs](#retention) for more details.
Other flags have good enough default values, so set them only if you really need this. Pass `-help` to see all the available flags with description and default values.
@ -1048,6 +1048,7 @@ The de-duplication reduces disk space usage if multiple identically configured P
write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical
`external_labels` section in their configs, so they write data to the same time series.
### Retention
Retention is configured with `-retentionPeriod` command-line flag. For instance, `-retentionPeriod=3` means
@ -1059,6 +1060,10 @@ For example if `-retentionPeriod` is set to 1, data for January is deleted on Ma
It is safe to extend `-retentionPeriod` on existing data. If `-retentionPeriod` is set to lower
value than before then data outside the configured period will be eventually deleted.
VictoriaMetrics supports retention smaller than 1 month. For example, `-retentionPeriod=5d` would set data retention for 5 days.
Older data is eventually deleted during [background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
### Multiple retentions
Just start multiple VictoriaMetrics instances with distinct values for the following flags:


@ -10,6 +10,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -19,7 +20,7 @@ import (
)
var (
retentionPeriod = flag.Int("retentionPeriod", 1, "Retention period in months")
retentionPeriod = flagutil.NewDuration("retentionPeriod", 1, "Data with timestamps outside the retentionPeriod is automatically deleted")
snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages")
forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages")
@ -44,12 +45,12 @@ func CheckTimeRange(tr storage.TimeRange) error {
if !*denyQueriesOutsideRetention {
return nil
}
minAllowedTimestamp := (int64(fasttime.UnixTimestamp()) - int64(*retentionPeriod)*3600*24*30) * 1000
minAllowedTimestamp := int64(fasttime.UnixTimestamp()*1000) - retentionPeriod.Msecs
if tr.MinTimestamp > minAllowedTimestamp {
return nil
}
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("the given time range %s is outside the allowed retention of %d months according to -denyQueriesOutsideRetention", &tr, *retentionPeriod),
Err: fmt.Errorf("the given time range %s is outside the allowed -retentionPeriod=%s according to -denyQueriesOutsideRetention", &tr, retentionPeriod),
StatusCode: http.StatusServiceUnavailable,
}
}
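Note the unit change in CheckTimeRange: the old code converted months to milliseconds assuming a 30-day month (`*3600*24*30`), whereas flagutil.Duration pre-computes Msecs using a 31-day month (see msecsPerMonth in lib/flagutil/duration.go below), so month-based retentions now get a slightly wider allowed window.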
@ -72,12 +73,12 @@ func InitWithoutMetrics() {
storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
logger.Infof("opening storage at %q with retention period %d months", *DataPath, *retentionPeriod)
logger.Infof("opening storage at %q with -retentionPeriod=%s", *DataPath, retentionPeriod)
startTime := time.Now()
WG = syncwg.WaitGroup{}
strg, err := storage.OpenStorage(*DataPath, *retentionPeriod)
strg, err := storage.OpenStorage(*DataPath, retentionPeriod.Msecs)
if err != nil {
logger.Fatalf("cannot open a storage at %s with retention period %d months: %s", *DataPath, *retentionPeriod, err)
logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *DataPath, retentionPeriod, err)
}
Storage = strg


@ -8,7 +8,7 @@
and their default values. Default flag values should fit the majority of cases. The minimum required flags to configure are:
* `-storageDataPath` - path to directory where VictoriaMetrics stores all the data.
* `-retentionPeriod` - data retention in months.
* `-retentionPeriod` - data retention.
For instance:



lib/flagutil/duration.go Normal file

@ -0,0 +1,69 @@
package flagutil
import (
"flag"
"fmt"
"strconv"
"strings"
"github.com/VictoriaMetrics/metricsql"
)
// NewDuration returns new `duration` flag with the given name, defaultValue and description.
//
// DefaultValue is in months.
func NewDuration(name string, defaultValue float64, description string) *Duration {
description += "\nThe following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months"
d := Duration{
Msecs: int64(defaultValue * msecsPerMonth),
valueString: fmt.Sprintf("%g", defaultValue),
}
flag.Var(&d, name, description)
return &d
}
// Duration is a flag for holding duration.
type Duration struct {
// Msecs contains parsed duration in milliseconds.
Msecs int64
valueString string
}
// String implements flag.Value interface
func (d *Duration) String() string {
return d.valueString
}
// Set implements flag.Value interface
func (d *Duration) Set(value string) error {
// An attempt to parse value in months.
months, err := strconv.ParseFloat(value, 64)
if err == nil {
if months > maxMonths {
return fmt.Errorf("duration months must be smaller than %d; got %g", maxMonths, months)
}
if months < 0 {
return fmt.Errorf("duration months cannot be negative; got %g", months)
}
d.Msecs = int64(months * msecsPerMonth)
d.valueString = value
return nil
}
// Parse duration.
value = strings.ToLower(value)
if strings.HasSuffix(value, "m") {
return fmt.Errorf("duration in months must be set without `m` suffix due to ambiguity with duration in minutes; got %s", value)
}
msecs, err := metricsql.PositiveDurationValue(value, 0)
if err != nil {
return err
}
d.Msecs = msecs
d.valueString = value
return nil
}
const maxMonths = 12 * 100
const msecsPerMonth = 31 * 24 * 3600 * 1000
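A minimal sketch of how this flag type is consumed, mirroring the `retentionPeriod` declaration in app/vmstorage/main.go above (the import path is taken from that diff; this is an illustration, not part of the commit):

package main

import (
	"flag"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)

var retention = flagutil.NewDuration("retentionPeriod", 1, "Data with timestamps outside the retentionPeriod is automatically deleted")

func main() {
	flag.Parse() // e.g. -retentionPeriod=5d, or -retentionPeriod=2 for two months
	// String() echoes the raw flag value; Msecs holds the parsed duration.
	fmt.Printf("retention=%s (%d ms)\n", retention, retention.Msecs)
}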

lib/flagutil/duration_test.go Normal file

@ -0,0 +1,57 @@
package flagutil
import (
"strings"
"testing"
)
func TestDurationSetFailure(t *testing.T) {
f := func(value string) {
t.Helper()
var d Duration
if err := d.Set(value); err == nil {
t.Fatalf("expecting non-nil error in d.Set(%q)", value)
}
}
f("")
f("foobar")
f("5foobar")
f("ah")
f("134xd")
f("2.43sdfw")
// Too big value in months
f("12345")
// Negative duration
f("-1")
f("-34h")
// Duration in minutes is confused with duration in months
f("1m")
}
func TestDurationSetSuccess(t *testing.T) {
f := func(value string, expectedMsecs int64) {
t.Helper()
var d Duration
if err := d.Set(value); err != nil {
t.Fatalf("unexpected error in d.Set(%q): %s", value, err)
}
if d.Msecs != expectedMsecs {
t.Fatalf("unexpected result; got %d; want %d", d.Msecs, expectedMsecs)
}
valueString := d.String()
valueExpected := strings.ToLower(value)
if valueString != valueExpected {
t.Fatalf("unexpected valueString; got %q; want %q", valueString, valueExpected)
}
}
f("0", 0)
f("1", msecsPerMonth)
f("123.456", 123.456*msecsPerMonth)
f("1h", 3600*1000)
f("1.5d", 1.5*24*3600*1000)
f("2.3W", 2.3*7*24*3600*1000)
f("0.25y", 0.25*365*24*3600*1000)
}


@ -15,12 +15,12 @@ import (
//
// rowsMerged is atomically updated with the number of merged rows during the merge.
func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
ph.Reset()
bsm := bsmPool.Get().(*blockStreamMerger)
bsm.Init(bsrs)
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted)
bsm.reset()
bsmPool.Put(bsm)
bsw.MustClose()
@ -39,7 +39,7 @@ var bsmPool = &sync.Pool{
var errForciblyStopped = fmt.Errorf("forcibly stopped")
func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
// Search for the first block to merge
var pendingBlock *Block
for bsm.NextBlock() {
@ -53,6 +53,11 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
}
if bsm.Block.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
}
pendingBlock = getBlock()
pendingBlock.CopyFrom(bsm.Block)
break
@ -75,6 +80,11 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
}
if bsm.Block.bh.MaxTimestamp < retentionDeadline {
// Skip blocks out of the given retention.
*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
continue
}
// Verify whether pendingBlock may be merged with bsm.Block (the current block).
if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID {
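A self-contained sketch of the skip rule added in both loops above, using pared-down stand-ins for the real types (blockHeader and dropExpired here are hypothetical names for illustration):

package main

import "fmt"

// blockHeader is a simplified stand-in for the storage block header:
// only the fields the retention check looks at.
type blockHeader struct {
	MaxTimestamp int64 // newest sample in the block, in milliseconds
	RowsCount    uint32
}

// dropExpired applies the same rule as mergeBlockStreamsInternal: a block is
// dropped (and its rows counted as deleted) iff even its newest sample is
// older than retentionDeadline.
func dropExpired(bhs []blockHeader, retentionDeadline int64, rowsDeleted *uint64) []blockHeader {
	kept := bhs[:0]
	for _, bh := range bhs {
		if bh.MaxTimestamp < retentionDeadline {
			*rowsDeleted += uint64(bh.RowsCount)
			continue
		}
		kept = append(kept, bh)
	}
	return kept
}

func main() {
	var rowsDeleted uint64
	blocks := []blockHeader{{MaxTimestamp: 100, RowsCount: 10}, {MaxTimestamp: 500, RowsCount: 7}}
	kept := dropExpired(blocks, 200, &rowsDeleted)
	fmt.Println(len(kept), rowsDeleted) // 1 10
}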


@ -365,7 +365,7 @@ func TestMergeForciblyStop(t *testing.T) {
ch := make(chan struct{})
var rowsMerged, rowsDeleted uint64
close(ch)
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
}
if rowsMerged != 0 {
@ -385,7 +385,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
bsw.InitFromInmemoryPart(&mp)
var rowsMerged, rowsDeleted uint64
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil {
t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
}


@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
}
mpOut.Reset()
bsw.InitFromInmemoryPart(&mpOut)
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil {
panic(fmt.Errorf("cannot merge block streams: %w", err))
}
}


@ -134,6 +134,10 @@ type partition struct {
// The callback that returns deleted metric ids which must be skipped during merge.
getDeletedMetricIDs func() *uint64set.Set
// Data retention in milliseconds.
// Used for deleting data outside the retention during background merge.
retentionMsecs int64
// Name is the name of the partition in the form YYYY_MM.
name string
@ -206,7 +210,7 @@ func (pw *partWrapper) decRef() {
// createPartition creates new partition for the given timestamp and the given paths
// to small and big partitions.
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) {
name := timestampToPartitionName(timestamp)
smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@ -219,7 +223,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
}
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs)
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
pt.tr.fromPartitionTimestamp(timestamp)
pt.startMergeWorkers()
pt.startRawRowsFlusher()
@ -241,7 +245,7 @@ func (pt *partition) Drop() {
}
// openPartition opens the existing partition from the given paths.
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) {
smallPartsPath = filepath.Clean(smallPartsPath)
bigPartsPath = filepath.Clean(bigPartsPath)
@ -265,7 +269,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
}
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs)
pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
pt.smallParts = smallParts
pt.bigParts = bigParts
if err := pt.tr.fromPartitionName(name); err != nil {
@ -278,13 +282,14 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
return pt, nil
}
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition {
func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) *partition {
p := &partition{
name: name,
smallPartsPath: smallPartsPath,
bigPartsPath: bigPartsPath,
getDeletedMetricIDs: getDeletedMetricIDs,
retentionMsecs: retentionMsecs,
mergeIdx: uint64(time.Now().UnixNano()),
stopCh: make(chan struct{}),
@ -1129,7 +1134,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
atomic.AddUint64(&pt.smallMergesCount, 1)
atomic.AddUint64(&pt.activeSmallMerges, 1)
}
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted)
retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted)
if isBigPart {
atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
} else {
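The deadline passed to mergeBlockStreams is derived once per merge from the merge start time. A worked sketch of that arithmetic, assuming timestampFromTime converts a time.Time to Unix milliseconds as the lib/storage helper does:

package main

import (
	"fmt"
	"time"
)

// timestampFromTime is assumed to match the lib/storage helper:
// wall-clock time in Unix milliseconds.
func timestampFromTime(t time.Time) int64 {
	return t.UnixNano() / 1e6
}

func main() {
	retentionMsecs := int64(5 * 24 * 3600 * 1000) // -retentionPeriod=5d
	startTime := time.Now()
	// Blocks whose MaxTimestamp falls below this value are dropped
	// by mergeBlockStreams during the background merge.
	retentionDeadline := timestampFromTime(startTime) - retentionMsecs
	fmt.Println(retentionDeadline)
}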


@ -167,7 +167,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
})
// Create partition from rowss and test search on it.
pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs)
retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs)
if err != nil {
t.Fatalf("cannot create partition: %s", err)
}
@ -191,7 +192,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
pt.MustClose()
// Open the created partition and test search on it.
pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs)
pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs)
if err != nil {
t.Fatalf("cannot open partition: %s", err)
}


@ -27,7 +27,10 @@ import (
"github.com/VictoriaMetrics/fastcache"
)
const maxRetentionMonths = 12 * 100
const (
msecsPerMonth = 31 * 24 * 3600 * 1000
maxRetentionMsecs = 100 * 12 * msecsPerMonth
)
// Storage represents TSDB storage.
type Storage struct {
@ -106,23 +109,20 @@ type Storage struct {
snapshotLock sync.Mutex
}
// OpenStorage opens storage on the given path with the given number of retention months.
func OpenStorage(path string, retentionMonths int) (*Storage, error) {
if retentionMonths > maxRetentionMonths {
return nil, fmt.Errorf("too big retentionMonths=%d; cannot exceed %d", retentionMonths, maxRetentionMonths)
}
if retentionMonths <= 0 {
retentionMonths = maxRetentionMonths
}
// OpenStorage opens storage on the given path with the given retentionMsecs.
func OpenStorage(path string, retentionMsecs int64) (*Storage, error) {
path, err := filepath.Abs(path)
if err != nil {
return nil, fmt.Errorf("cannot determine absolute path for %q: %w", path, err)
}
if retentionMsecs <= 0 {
retentionMsecs = maxRetentionMsecs
}
retentionMonths := (retentionMsecs + (msecsPerMonth - 1)) / msecsPerMonth
s := &Storage{
path: path,
cachePath: path + "/cache",
retentionMonths: retentionMonths,
retentionMonths: int(retentionMonths),
stop: make(chan struct{}),
}
@ -178,7 +178,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
// Load data
tablePath := path + "/data"
tb, err := openTable(tablePath, retentionMonths, s.getDeletedMetricIDs)
tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs)
if err != nil {
s.idb().MustClose()
return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)


@ -353,8 +353,8 @@ func TestStorageOpenMultipleTimes(t *testing.T) {
func TestStorageRandTimestamps(t *testing.T) {
path := "TestStorageRandTimestamps"
retentionMonths := 60
s, err := OpenStorage(path, retentionMonths)
retentionMsecs := int64(60 * msecsPerMonth)
s, err := OpenStorage(path, retentionMsecs)
if err != nil {
t.Fatalf("cannot open storage: %s", err)
}
@ -364,7 +364,7 @@ func TestStorageRandTimestamps(t *testing.T) {
t.Fatal(err)
}
s.MustClose()
s, err = OpenStorage(path, retentionMonths)
s, err = OpenStorage(path, retentionMsecs)
}
})
t.Run("concurrent", func(t *testing.T) {


@ -22,6 +22,7 @@ type table struct {
bigPartitionsPath string
getDeletedMetricIDs func() *uint64set.Set
retentionMsecs int64
ptws []*partitionWrapper
ptwsLock sync.Mutex
@ -30,8 +31,7 @@ type table struct {
stop chan struct{}
retentionMilliseconds int64
retentionWatcherWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
}
// partitionWrapper provides refcounting mechanism for the partition.
@ -77,12 +77,12 @@ func (ptw *partitionWrapper) scheduleToDrop() {
atomic.AddUint64(&ptw.mustDrop, 1)
}
// openTable opens a table on the given path with the given retentionMonths.
// openTable opens a table on the given path with the given retentionMsecs.
//
// The table is created if it doesn't exist.
//
// Data older than the retentionMonths may be dropped at any time.
func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uint64set.Set) (*table, error) {
// Data older than the retentionMsecs may be dropped at any time.
func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*table, error) {
path = filepath.Clean(path)
// Create a directory for the table if it doesn't exist yet.
@ -115,7 +115,7 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin
}
// Open partitions.
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs)
pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs)
if err != nil {
return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
}
@ -125,6 +125,7 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin
smallPartitionsPath: smallPartitionsPath,
bigPartitionsPath: bigPartitionsPath,
getDeletedMetricIDs: getDeletedMetricIDs,
retentionMsecs: retentionMsecs,
flockF: flockF,
@ -133,11 +134,6 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin
for _, pt := range pts {
tb.addPartitionNolock(pt)
}
if retentionMonths <= 0 || retentionMonths > maxRetentionMonths {
retentionMonths = maxRetentionMonths
}
tb.retentionMilliseconds = int64(retentionMonths) * 31 * 24 * 3600 * 1e3
tb.startRetentionWatcher()
return tb, nil
}
@ -357,7 +353,7 @@ func (tb *table) AddRows(rows []rawRow) error {
continue
}
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs)
pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs)
if err != nil {
errors = append(errors, err)
continue
@ -376,7 +372,7 @@ func (tb *table) AddRows(rows []rawRow) error {
func (tb *table) getMinMaxTimestamps() (int64, int64) {
now := int64(fasttime.UnixTimestamp() * 1000)
minTimestamp := now - tb.retentionMilliseconds
minTimestamp := now - tb.retentionMsecs
maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
if minTimestamp < 0 {
// Negative timestamps aren't supported by the storage.
@ -406,7 +402,7 @@ func (tb *table) retentionWatcher() {
case <-ticker.C:
}
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMilliseconds
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMsecs
var ptwsDrop []*partitionWrapper
tb.ptwsLock.Lock()
dst := tb.ptws[:0]
@ -457,7 +453,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
}
}
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) ([]*partition, error) {
func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) ([]*partition, error) {
// Certain partition directories in either `big` or `small` dir may be missing
// after restoring from backup. So populate partition names from both dirs.
ptNames := make(map[string]bool)
@ -471,7 +467,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet
for ptName := range ptNames {
smallPartsPath := smallPartitionsPath + "/" + ptName
bigPartsPath := bigPartitionsPath + "/" + ptName
pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs)
pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
if err != nil {
mustClosePartitions(pts)
return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)


@ -66,7 +66,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
// Adjust tr.MinTimestamp, so it doesn't obtain data older
// than the tb retention.
now := int64(fasttime.UnixTimestamp() * 1000)
minTimestamp := now - tb.retentionMilliseconds
minTimestamp := now - tb.retentionMsecs
if tr.MinTimestamp < minTimestamp {
tr.MinTimestamp = minTimestamp
}


@ -181,7 +181,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
})
// Create a table from rowss and test search on it.
tb, err := openTable("./test-table", -1, nilGetDeletedMetricIDs)
tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs)
if err != nil {
t.Fatalf("cannot create table: %s", err)
}
@ -202,7 +202,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
tb.MustClose()
// Open the created table and test search on it.
tb, err = openTable("./test-table", -1, nilGetDeletedMetricIDs)
tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs)
if err != nil {
t.Fatalf("cannot open table: %s", err)
}


@ -47,7 +47,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
createdBenchTables[path] = true
}
tb, err := openTable(path, -1, nilGetDeletedMetricIDs)
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs)
if err != nil {
b.Fatalf("cnanot open table %q: %s", path, err)
}
@ -70,7 +70,7 @@ var createdBenchTables = make(map[string]bool)
func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
b.Helper()
tb, err := openTable(path, -1, nilGetDeletedMetricIDs)
tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs)
if err != nil {
b.Fatalf("cannot open table %q: %s", path, err)
}


@ -7,7 +7,7 @@ import (
func TestTableOpenClose(t *testing.T) {
const path = "TestTableOpenClose"
const retentionMonths = 123
const retentionMsecs = 123 * msecsPerMonth
if err := os.RemoveAll(path); err != nil {
t.Fatalf("cannot remove %q: %s", path, err)
@ -17,7 +17,7 @@ func TestTableOpenClose(t *testing.T) {
}()
// Create a new table
tb, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
if err != nil {
t.Fatalf("cannot create new table: %s", err)
}
@ -27,7 +27,7 @@ func TestTableOpenClose(t *testing.T) {
// Re-open created table multiple times.
for i := 0; i < 10; i++ {
tb, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
if err != nil {
t.Fatalf("cannot open created table: %s", err)
}
@ -37,20 +37,20 @@ func TestTableOpenClose(t *testing.T) {
func TestTableOpenMultipleTimes(t *testing.T) {
const path = "TestTableOpenMultipleTimes"
const retentionMonths = 123
const retentionMsecs = 123 * msecsPerMonth
defer func() {
_ = os.RemoveAll(path)
}()
tb1, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
if err != nil {
t.Fatalf("cannot open table the first time: %s", err)
}
defer tb1.MustClose()
for i := 0; i < 10; i++ {
tb2, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
if err == nil {
tb2.MustClose()
t.Fatalf("expecting non-nil error when opening already opened table")


@ -45,7 +45,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
b.SetBytes(int64(rowsCountExpected))
tablePath := "./benchmarkTableAddRows"
for i := 0; i < b.N; i++ {
tb, err := openTable(tablePath, -1, nilGetDeletedMetricIDs)
tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs)
if err != nil {
b.Fatalf("cannot open table %q: %s", tablePath, err)
}
@ -93,7 +93,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
tb.MustClose()
// Open the table from files and verify the rows count on it
tb, err = openTable(tablePath, -1, nilGetDeletedMetricIDs)
tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs)
if err != nil {
b.Fatalf("cannot open table %q: %s", tablePath, err)
}