app/vmstorage: support for -retentionPeriod smaller than one month

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/173
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/17
Aliaksandr Valialkin committed 2020-10-20 14:29:26 +03:00
parent b915bf01e4 | commit 0db7c2b500
19 changed files with 211 additions and 64 deletions


@@ -1,6 +1,8 @@
 # tip
+* FEATURE: allow setting `-retentionPeriod` smaller than one month. E.g. `-retentionPeriod=3d`, `-retentionPeriod=2w`, etc. are now supported.
+  See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/173
 * FEATURE: optimize more cases according to https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization . Now the following cases are optimized too:
   * `rollup_func(foo{filters}[d]) op bar` -> `rollup_func(foo{filters}[d]) op bar{filters}`
   * `transform_func(foo{filters}) op bar` -> `transform_func(foo{filters}) op bar{filters}`


@@ -13,6 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -22,8 +23,8 @@ import (
 )

 var (
+	retentionPeriod = flagutil.NewDuration("retentionPeriod", 1, "Data with timestamps outside the retentionPeriod is automatically deleted")
 	httpListenAddr  = flag.String("httpListenAddr", ":8482", "Address to listen for http connections")
-	retentionPeriod = flag.Int("retentionPeriod", 1, "Retention period in months")
 	storageDataPath = flag.String("storageDataPath", "vmstorage-data", "Path to storage data")
 	vminsertAddr    = flag.String("vminsertAddr", ":8400", "TCP address to accept connections from vminsert services")
 	vmselectAddr    = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services")
@@ -53,11 +54,11 @@ func main() {
 	storage.SetBigMergeWorkersCount(*bigMergeConcurrency)
 	storage.SetSmallMergeWorkersCount(*smallMergeConcurrency)
-	logger.Infof("opening storage at %q with retention period %d months", *storageDataPath, *retentionPeriod)
+	logger.Infof("opening storage at %q with -retentionPeriod=%s", *storageDataPath, retentionPeriod)
 	startTime := time.Now()
-	strg, err := storage.OpenStorage(*storageDataPath, *retentionPeriod)
+	strg, err := storage.OpenStorage(*storageDataPath, retentionPeriod.Msecs)
 	if err != nil {
-		logger.Fatalf("cannot open a storage at %s with retention period %d months: %s", *storageDataPath, *retentionPeriod, err)
+		logger.Fatalf("cannot open a storage at %s with -retentionPeriod=%s: %s", *storageDataPath, retentionPeriod, err)
 	}

 	var m storage.Metrics


@@ -8,7 +8,7 @@
 and their default values. Default flag values should fit the majority of cases. The minimum required flags to configure are:
 * `-storageDataPath` - path to directory where VictoriaMetrics stores all the data.
-* `-retentionPeriod` - data retention in months.
+* `-retentionPeriod` - data retention.
 For instance:


@@ -164,7 +164,7 @@ or [docker image](https://hub.docker.com/r/victoriametrics/victoria-metrics/) wi
 The following command-line flags are used the most:
 * `-storageDataPath` - path to data directory. VictoriaMetrics stores all the data in this directory. Default path is `victoria-metrics-data` in the current working directory.
-* `-retentionPeriod` - retention period in months for stored data. Older data is automatically deleted. Default period is 1 month.
+* `-retentionPeriod` - retention for stored data. Older data is automatically deleted. Default retention is 1 month. See [these docs](#retention) for more details.
 Other flags have good enough default values, so set them only if you really need this. Pass `-help` to see all the available flags with description and default values.
@@ -1048,6 +1048,7 @@ The de-duplication reduces disk space usage if multiple identically configured P
 write data to the same VictoriaMetrics instance. Note that these Prometheus instances must have identical
 `external_labels` section in their configs, so they write data to the same time series.
 ### Retention
 Retention is configured with `-retentionPeriod` command-line flag. For instance, `-retentionPeriod=3` means
@@ -1059,6 +1060,10 @@ For example if `-retentionPeriod` is set to 1, data for January is deleted on Ma
 It is safe to extend `-retentionPeriod` on existing data. If `-retentionPeriod` is set to a lower
 value than before then data outside the configured period will be eventually deleted.
+
+VictoriaMetrics supports retention smaller than 1 month. For example, `-retentionPeriod=5d` sets data retention to 5 days.
+Older data is eventually deleted during [background merge](https://medium.com/@valyala/how-victoriametrics-makes-instant-snapshots-for-multi-terabyte-time-series-data-e1f3fb0e0282).
+
 ### Multiple retentions
 Just start multiple VictoriaMetrics instances with distinct values for the following flags:
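To make the retention rule above concrete: a sample becomes eligible for deletion once its timestamp falls behind `now - retentionPeriod`. A minimal, self-contained Go sketch of that rule (illustrative only; `isOutsideRetention` is a made-up helper, not a VictoriaMetrics API):

```go
package main

import (
	"fmt"
	"time"
)

// isOutsideRetention is a hypothetical helper; VictoriaMetrics performs the
// equivalent check with millisecond timestamps during background merges.
func isOutsideRetention(sampleTsMsecs, retentionMsecs int64) bool {
	nowMsecs := time.Now().UnixNano() / int64(time.Millisecond)
	return sampleTsMsecs < nowMsecs-retentionMsecs
}

func main() {
	const day = int64(24 * 3600 * 1000)
	retention := 5 * day // corresponds to -retentionPeriod=5d
	sixDaysAgo := time.Now().Add(-6*24*time.Hour).UnixNano() / int64(time.Millisecond)
	fmt.Println(isOutsideRetention(sixDaysAgo, retention)) // true: older than 5 days
}
```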

lib/flagutil/duration.go (new file, 69 lines)

@@ -0,0 +1,69 @@
package flagutil

import (
	"flag"
	"fmt"
	"strconv"
	"strings"

	"github.com/VictoriaMetrics/metricsql"
)

// NewDuration returns new `duration` flag with the given name, defaultValue and description.
//
// DefaultValue is in months.
func NewDuration(name string, defaultValue float64, description string) *Duration {
	description += "\nThe following optional suffixes are supported: h (hour), d (day), w (week), y (year). If suffix isn't set, then the duration is counted in months"
	d := Duration{
		Msecs:       int64(defaultValue * msecsPerMonth),
		valueString: fmt.Sprintf("%g", defaultValue),
	}
	flag.Var(&d, name, description)
	return &d
}

// Duration is a flag for holding duration.
type Duration struct {
	// Msecs contains parsed duration in milliseconds.
	Msecs int64

	valueString string
}

// String implements flag.Value interface
func (d *Duration) String() string {
	return d.valueString
}

// Set implements flag.Value interface
func (d *Duration) Set(value string) error {
	// An attempt to parse value in months.
	months, err := strconv.ParseFloat(value, 64)
	if err == nil {
		if months > maxMonths {
			return fmt.Errorf("duration months must be smaller than %d; got %g", maxMonths, months)
		}
		if months < 0 {
			return fmt.Errorf("duration months cannot be negative; got %g", months)
		}
		d.Msecs = int64(months * msecsPerMonth)
		d.valueString = value
		return nil
	}
	// Parse duration.
	value = strings.ToLower(value)
	if strings.HasSuffix(value, "m") {
		return fmt.Errorf("duration in months must be set without `m` suffix due to ambiguity with duration in minutes; got %s", value)
	}
	msecs, err := metricsql.PositiveDurationValue(value, 0)
	if err != nil {
		return err
	}
	d.Msecs = msecs
	d.valueString = value
	return nil
}

const maxMonths = 12 * 100

const msecsPerMonth = 31 * 24 * 3600 * 1000
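Since `Duration` implements `flag.Value`, callers register it once at package level and read the parsed value from `Msecs`, which is exactly how vmstorage wires it in `main.go` above. A minimal usage sketch (the `main` wrapper is illustrative):

```go
package main

import (
	"flag"
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
)

// Registered at package level, mirroring the vmstorage wiring in this commit.
var retentionPeriod = flagutil.NewDuration("retentionPeriod", 1,
	"Data with timestamps outside the retentionPeriod is automatically deleted")

func main() {
	flag.Parse()
	// -retentionPeriod=3d yields Msecs = 3*24*3600*1000;
	// a bare -retentionPeriod=2 is interpreted as 2 months.
	fmt.Printf("-retentionPeriod=%s (%d ms)\n", retentionPeriod, retentionPeriod.Msecs)
}
```

Note that `-retentionPeriod=1m` fails by design: the `m` suffix is rejected in `Set` to avoid the months-vs-minutes ambiguity.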


@@ -0,0 +1,57 @@
package flagutil

import (
	"strings"
	"testing"
)

func TestDurationSetFailure(t *testing.T) {
	f := func(value string) {
		t.Helper()
		var d Duration
		if err := d.Set(value); err == nil {
			t.Fatalf("expecting non-nil error in d.Set(%q)", value)
		}
	}
	f("")
	f("foobar")
	f("5foobar")
	f("ah")
	f("134xd")
	f("2.43sdfw")

	// Too big value in months
	f("12345")

	// Negative duration
	f("-1")
	f("-34h")

	// Duration in minutes is confused with duration in months
	f("1m")
}

func TestDurationSetSuccess(t *testing.T) {
	f := func(value string, expectedMsecs int64) {
		t.Helper()
		var d Duration
		if err := d.Set(value); err != nil {
			t.Fatalf("unexpected error in d.Set(%q): %s", value, err)
		}
		if d.Msecs != expectedMsecs {
			t.Fatalf("unexpected result; got %d; want %d", d.Msecs, expectedMsecs)
		}
		valueString := d.String()
		valueExpected := strings.ToLower(value)
		if valueString != valueExpected {
			t.Fatalf("unexpected valueString; got %q; want %q", valueString, valueExpected)
		}
	}
	f("0", 0)
	f("1", msecsPerMonth)
	f("123.456", 123.456*msecsPerMonth)
	f("1h", 3600*1000)
	f("1.5d", 1.5*24*3600*1000)
	f("2.3W", 2.3*7*24*3600*1000)
	f("0.25y", 0.25*365*24*3600*1000)
}


@@ -15,12 +15,12 @@ import (
 //
 // rowsMerged is atomically updated with the number of merged rows during the merge.
 func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, stopCh <-chan struct{},
-	dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
+	dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
 	ph.Reset()
 	bsm := bsmPool.Get().(*blockStreamMerger)
 	bsm.Init(bsrs)
-	err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, rowsMerged, rowsDeleted)
+	err := mergeBlockStreamsInternal(ph, bsw, bsm, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted)
 	bsm.reset()
 	bsmPool.Put(bsm)
 	bsw.MustClose()
@@ -39,7 +39,7 @@ var bsmPool = &sync.Pool{
 var errForciblyStopped = fmt.Errorf("forcibly stopped")

 func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *blockStreamMerger, stopCh <-chan struct{},
-	dmis *uint64set.Set, rowsMerged, rowsDeleted *uint64) error {
+	dmis *uint64set.Set, retentionDeadline int64, rowsMerged, rowsDeleted *uint64) error {
 	// Search for the first block to merge
 	var pendingBlock *Block
 	for bsm.NextBlock() {
@@ -53,6 +53,11 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
 			*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
 			continue
 		}
+		if bsm.Block.bh.MaxTimestamp < retentionDeadline {
+			// Skip blocks out of the given retention.
+			*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
+			continue
+		}
 		pendingBlock = getBlock()
 		pendingBlock.CopyFrom(bsm.Block)
 		break
@@ -75,6 +80,11 @@ func mergeBlockStreamsInternal(ph *partHeader, bsw *blockStreamWriter, bsm *bloc
 			*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
 			continue
 		}
+		if bsm.Block.bh.MaxTimestamp < retentionDeadline {
+			// Skip blocks out of the given retention.
+			*rowsDeleted += uint64(bsm.Block.bh.RowsCount)
+			continue
+		}
 		// Verify whether pendingBlock may be merged with bsm.Block (the current block).
 		if pendingBlock.bh.TSID.MetricID != bsm.Block.bh.TSID.MetricID {
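Condensed to its essence, the check added in both merge loops drops a whole block when even its newest sample (`MaxTimestamp`) predates the retention deadline, counting the block's rows as deleted. A sketch with simplified stand-in types (the real code operates on `bsm.Block.bh`):

```go
package main

import "fmt"

// blockHeader is a simplified stand-in for the storage blockHeader.
type blockHeader struct {
	MaxTimestamp int64  // newest sample in the block, in milliseconds
	RowsCount    uint32 // number of samples in the block
}

// skipIfOutsideRetention reports whether the block should be skipped during
// merge, accounting its rows as deleted, as in the hunks above.
func skipIfOutsideRetention(bh *blockHeader, retentionDeadline int64, rowsDeleted *uint64) bool {
	if bh.MaxTimestamp >= retentionDeadline {
		return false // the block still holds data within retention
	}
	*rowsDeleted += uint64(bh.RowsCount)
	return true
}

func main() {
	var rowsDeleted uint64
	bh := &blockHeader{MaxTimestamp: 1000, RowsCount: 42}
	fmt.Println(skipIfOutsideRetention(bh, 2000, &rowsDeleted), rowsDeleted) // true 42
}
```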


@@ -365,7 +365,7 @@ func TestMergeForciblyStop(t *testing.T) {
 	ch := make(chan struct{})
 	var rowsMerged, rowsDeleted uint64
 	close(ch)
-	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
+	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, 0, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
 		t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
 	}
 	if rowsMerged != 0 {
@@ -385,7 +385,7 @@ func testMergeBlockStreams(t *testing.T, bsrs []*blockStreamReader, expectedBloc
 	bsw.InitFromInmemoryPart(&mp)
 	var rowsMerged, rowsDeleted uint64
-	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
+	if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil {
 		t.Fatalf("unexpected error in mergeBlockStreams: %s", err)
 	}


@@ -41,7 +41,7 @@ func benchmarkMergeBlockStreams(b *testing.B, mps []*inmemoryPart, rowsPerLoop i
 		}
 		mpOut.Reset()
 		bsw.InitFromInmemoryPart(&mpOut)
-		if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, &rowsMerged, &rowsDeleted); err != nil {
+		if err := mergeBlockStreams(&mpOut.ph, &bsw, bsrs, nil, nil, 0, &rowsMerged, &rowsDeleted); err != nil {
 			panic(fmt.Errorf("cannot merge block streams: %w", err))
 		}
 	}


@@ -134,6 +134,10 @@ type partition struct {
 	// The callback that returns deleted metric ids which must be skipped during merge.
 	getDeletedMetricIDs func() *uint64set.Set

+	// data retention in milliseconds.
+	// Used for deleting data outside the retention during background merge.
+	retentionMsecs int64
+
 	// Name is the name of the partition in the form YYYY_MM.
 	name string
@@ -206,7 +210,7 @@ func (pw *partWrapper) decRef() {
 // createPartition creates new partition for the given timestamp and the given paths
 // to small and big partitions.
-func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
+func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) {
 	name := timestampToPartitionName(timestamp)
 	smallPartsPath := filepath.Clean(smallPartitionsPath) + "/" + name
 	bigPartsPath := filepath.Clean(bigPartitionsPath) + "/" + name
@@ -219,7 +223,7 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
 		return nil, fmt.Errorf("cannot create directories for big parts %q: %w", bigPartsPath, err)
 	}
-	pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs)
+	pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
 	pt.tr.fromPartitionTimestamp(timestamp)
 	pt.startMergeWorkers()
 	pt.startRawRowsFlusher()
@@ -241,7 +245,7 @@ func (pt *partition) Drop() {
 }

 // openPartition opens the existing partition from the given paths.
-func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) (*partition, error) {
+func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*partition, error) {
 	smallPartsPath = filepath.Clean(smallPartsPath)
 	bigPartsPath = filepath.Clean(bigPartsPath)
@@ -265,7 +269,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
 		return nil, fmt.Errorf("cannot open big parts from %q: %w", bigPartsPath, err)
 	}
-	pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs)
+	pt := newPartition(name, smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
 	pt.smallParts = smallParts
 	pt.bigParts = bigParts
 	if err := pt.tr.fromPartitionName(name); err != nil {
@@ -278,13 +282,14 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
 	return pt, nil
 }

-func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set) *partition {
+func newPartition(name, smallPartsPath, bigPartsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) *partition {
 	p := &partition{
 		name:           name,
 		smallPartsPath: smallPartsPath,
 		bigPartsPath:   bigPartsPath,

 		getDeletedMetricIDs: getDeletedMetricIDs,
+		retentionMsecs:      retentionMsecs,

 		mergeIdx: uint64(time.Now().UnixNano()),
 		stopCh:   make(chan struct{}),
@@ -1129,7 +1134,8 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
 		atomic.AddUint64(&pt.smallMergesCount, 1)
 		atomic.AddUint64(&pt.activeSmallMerges, 1)
 	}
-	err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, rowsMerged, rowsDeleted)
+	retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
+	err := mergeBlockStreams(&ph, bsw, bsrs, stopCh, dmis, retentionDeadline, rowsMerged, rowsDeleted)
 	if isBigPart {
 		atomic.AddUint64(&pt.activeBigMerges, ^uint64(0))
 	} else {


@@ -167,7 +167,8 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
 	})

 	// Create partition from rowss and test search on it.
-	pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs)
+	retentionMsecs := timestampFromTime(time.Now()) - ptr.MinTimestamp + 3600*1000
+	pt, err := createPartition(ptt, "./small-table", "./big-table", nilGetDeletedMetricIDs, retentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot create partition: %s", err)
 	}
@@ -191,7 +192,7 @@ func testPartitionSearchEx(t *testing.T, ptt int64, tr TimeRange, partsCount, ma
 	pt.MustClose()

 	// Open the created partition and test search on it.
-	pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs)
+	pt, err = openPartition(smallPartsPath, bigPartsPath, nilGetDeletedMetricIDs, retentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot open partition: %s", err)
 	}


@@ -27,7 +27,10 @@ import (
 	"github.com/VictoriaMetrics/fastcache"
 )

-const maxRetentionMonths = 12 * 100
+const (
+	msecsPerMonth     = 31 * 24 * 3600 * 1000
+	maxRetentionMsecs = 100 * 12 * msecsPerMonth
+)

 // Storage represents TSDB storage.
 type Storage struct {
@@ -117,23 +120,20 @@ type accountProjectKey struct {
 	ProjectID uint32
 }

-// OpenStorage opens storage on the given path with the given number of retention months.
-func OpenStorage(path string, retentionMonths int) (*Storage, error) {
-	if retentionMonths > maxRetentionMonths {
-		return nil, fmt.Errorf("too big retentionMonths=%d; cannot exceed %d", retentionMonths, maxRetentionMonths)
-	}
-	if retentionMonths <= 0 {
-		retentionMonths = maxRetentionMonths
-	}
+// OpenStorage opens storage on the given path with the given retentionMsecs.
+func OpenStorage(path string, retentionMsecs int64) (*Storage, error) {
 	path, err := filepath.Abs(path)
 	if err != nil {
 		return nil, fmt.Errorf("cannot determine absolute path for %q: %w", path, err)
 	}
+	if retentionMsecs <= 0 {
+		retentionMsecs = maxRetentionMsecs
+	}
+	retentionMonths := (retentionMsecs + (msecsPerMonth - 1)) / msecsPerMonth
 	s := &Storage{
 		path:      path,
 		cachePath: path + "/cache",
-		retentionMonths: retentionMonths,
+		retentionMonths: int(retentionMonths),
 		stop:      make(chan struct{}),
 	}
@@ -188,7 +188,7 @@ func OpenStorage(path string, retentionMonths int) (*Storage, error) {
 	// Load data
 	tablePath := path + "/data"
-	tb, err := openTable(tablePath, retentionMonths, s.getDeletedMetricIDs)
+	tb, err := openTable(tablePath, s.getDeletedMetricIDs, retentionMsecs)
 	if err != nil {
 		s.idb().MustClose()
 		return nil, fmt.Errorf("cannot open table at %q: %w", tablePath, err)


@@ -389,8 +389,8 @@ func TestStorageOpenMultipleTimes(t *testing.T) {
 func TestStorageRandTimestamps(t *testing.T) {
 	path := "TestStorageRandTimestamps"
-	retentionMonths := 60
-	s, err := OpenStorage(path, retentionMonths)
+	retentionMsecs := int64(60 * msecsPerMonth)
+	s, err := OpenStorage(path, retentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot open storage: %s", err)
 	}
@@ -400,7 +400,7 @@ func TestStorageRandTimestamps(t *testing.T) {
 			t.Fatal(err)
 		}
 		s.MustClose()
-		s, err = OpenStorage(path, retentionMonths)
+		s, err = OpenStorage(path, retentionMsecs)
 	}
 	})
 	t.Run("concurrent", func(t *testing.T) {


@@ -22,6 +22,7 @@ type table struct {
 	bigPartitionsPath string

 	getDeletedMetricIDs func() *uint64set.Set
+	retentionMsecs      int64

 	ptws     []*partitionWrapper
 	ptwsLock sync.Mutex
@@ -30,8 +31,7 @@ type table struct {
 	stop chan struct{}

-	retentionMilliseconds int64
-	retentionWatcherWG    sync.WaitGroup
+	retentionWatcherWG sync.WaitGroup
 }

 // partitionWrapper provides refcounting mechanism for the partition.
@@ -77,12 +77,12 @@ func (ptw *partitionWrapper) scheduleToDrop() {
 	atomic.AddUint64(&ptw.mustDrop, 1)
 }

-// openTable opens a table on the given path with the given retentionMonths.
+// openTable opens a table on the given path with the given retentionMsecs.
 //
 // The table is created if it doesn't exist.
 //
-// Data older than the retentionMonths may be dropped at any time.
-func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uint64set.Set) (*table, error) {
+// Data older than the retentionMsecs may be dropped at any time.
+func openTable(path string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) (*table, error) {
 	path = filepath.Clean(path)

 	// Create a directory for the table if it doesn't exist yet.
@@ -115,7 +115,7 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin
 	}

 	// Open partitions.
-	pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs)
+	pts, err := openPartitions(smallPartitionsPath, bigPartitionsPath, getDeletedMetricIDs, retentionMsecs)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open partitions in the table %q: %w", path, err)
 	}
@@ -125,6 +125,7 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin
 		smallPartitionsPath: smallPartitionsPath,
 		bigPartitionsPath:   bigPartitionsPath,
 		getDeletedMetricIDs: getDeletedMetricIDs,
+		retentionMsecs:      retentionMsecs,
 		flockF:              flockF,
@@ -133,11 +134,6 @@ func openTable(path string, retentionMonths int, getDeletedMetricIDs func() *uin
 	for _, pt := range pts {
 		tb.addPartitionNolock(pt)
 	}
-	if retentionMonths <= 0 || retentionMonths > maxRetentionMonths {
-		retentionMonths = maxRetentionMonths
-	}
-	tb.retentionMilliseconds = int64(retentionMonths) * 31 * 24 * 3600 * 1e3
 	tb.startRetentionWatcher()
 	return tb, nil
 }
@@ -357,7 +353,7 @@ func (tb *table) AddRows(rows []rawRow) error {
 			continue
 		}
-		pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs)
+		pt, err := createPartition(r.Timestamp, tb.smallPartitionsPath, tb.bigPartitionsPath, tb.getDeletedMetricIDs, tb.retentionMsecs)
 		if err != nil {
 			errors = append(errors, err)
 			continue
@@ -376,7 +372,7 @@ func (tb *table) AddRows(rows []rawRow) error {
 func (tb *table) getMinMaxTimestamps() (int64, int64) {
 	now := int64(fasttime.UnixTimestamp() * 1000)
-	minTimestamp := now - tb.retentionMilliseconds
+	minTimestamp := now - tb.retentionMsecs
 	maxTimestamp := now + 2*24*3600*1000 // allow max +2 days from now due to timezones shit :)
 	if minTimestamp < 0 {
 		// Negative timestamps aren't supported by the storage.
@@ -406,7 +402,7 @@ func (tb *table) retentionWatcher() {
 		case <-ticker.C:
 		}

-		minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMilliseconds
+		minTimestamp := int64(fasttime.UnixTimestamp()*1000) - tb.retentionMsecs
 		var ptwsDrop []*partitionWrapper
 		tb.ptwsLock.Lock()
 		dst := tb.ptws[:0]
@@ -457,7 +453,7 @@ func (tb *table) PutPartitions(ptws []*partitionWrapper) {
 	}
 }

-func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set) ([]*partition, error) {
+func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMetricIDs func() *uint64set.Set, retentionMsecs int64) ([]*partition, error) {
 	// Certain partition directories in either `big` or `small` dir may be missing
 	// after restoring from backup. So populate partition names from both dirs.
 	ptNames := make(map[string]bool)
@@ -471,7 +467,7 @@ func openPartitions(smallPartitionsPath, bigPartitionsPath string, getDeletedMet
 	for ptName := range ptNames {
 		smallPartsPath := smallPartitionsPath + "/" + ptName
 		bigPartsPath := bigPartitionsPath + "/" + ptName
-		pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs)
+		pt, err := openPartition(smallPartsPath, bigPartsPath, getDeletedMetricIDs, retentionMsecs)
 		if err != nil {
 			mustClosePartitions(pts)
 			return nil, fmt.Errorf("cannot open partition %q: %w", ptName, err)


@@ -66,7 +66,7 @@ func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange) {
 	// Adjust tr.MinTimestamp, so it doesn't obtain data older
 	// than the tb retention.
 	now := int64(fasttime.UnixTimestamp() * 1000)
-	minTimestamp := now - tb.retentionMilliseconds
+	minTimestamp := now - tb.retentionMsecs
 	if tr.MinTimestamp < minTimestamp {
 		tr.MinTimestamp = minTimestamp
 	}


@@ -181,7 +181,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
 	})

 	// Create a table from rowss and test search on it.
-	tb, err := openTable("./test-table", -1, nilGetDeletedMetricIDs)
+	tb, err := openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot create table: %s", err)
 	}
@@ -202,7 +202,7 @@ func testTableSearchEx(t *testing.T, trData, trSearch TimeRange, partitionsCount
 	tb.MustClose()

 	// Open the created table and test search on it.
-	tb, err = openTable("./test-table", -1, nilGetDeletedMetricIDs)
+	tb, err = openTable("./test-table", nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot open table: %s", err)
 	}


@@ -47,7 +47,7 @@ func openBenchTable(b *testing.B, startTimestamp int64, rowsPerInsert, rowsCount
 		createBenchTable(b, path, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
 		createdBenchTables[path] = true
 	}
-	tb, err := openTable(path, -1, nilGetDeletedMetricIDs)
+	tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		b.Fatalf("cannot open table %q: %s", path, err)
 	}
@@ -70,7 +70,7 @@ var createdBenchTables = make(map[string]bool)
 func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerInsert, rowsCount, tsidsCount int) {
 	b.Helper()

-	tb, err := openTable(path, -1, nilGetDeletedMetricIDs)
+	tb, err := openTable(path, nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		b.Fatalf("cannot open table %q: %s", path, err)
 	}


@@ -7,7 +7,7 @@ import (
 func TestTableOpenClose(t *testing.T) {
 	const path = "TestTableOpenClose"
-	const retentionMonths = 123
+	const retentionMsecs = 123 * msecsPerMonth

 	if err := os.RemoveAll(path); err != nil {
 		t.Fatalf("cannot remove %q: %s", path, err)
@@ -17,7 +17,7 @@ func TestTableOpenClose(t *testing.T) {
 	}()

 	// Create a new table
-	tb, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
+	tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot create new table: %s", err)
 	}
@@ -27,7 +27,7 @@ func TestTableOpenClose(t *testing.T) {
 	// Re-open created table multiple times.
 	for i := 0; i < 10; i++ {
-		tb, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
+		tb, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
 		if err != nil {
 			t.Fatalf("cannot open created table: %s", err)
 		}
@@ -37,20 +37,20 @@ func TestTableOpenClose(t *testing.T) {
 func TestTableOpenMultipleTimes(t *testing.T) {
 	const path = "TestTableOpenMultipleTimes"
-	const retentionMonths = 123
+	const retentionMsecs = 123 * msecsPerMonth

 	defer func() {
 		_ = os.RemoveAll(path)
 	}()

-	tb1, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
+	tb1, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
 	if err != nil {
 		t.Fatalf("cannot open table the first time: %s", err)
 	}
 	defer tb1.MustClose()

 	for i := 0; i < 10; i++ {
-		tb2, err := openTable(path, retentionMonths, nilGetDeletedMetricIDs)
+		tb2, err := openTable(path, nilGetDeletedMetricIDs, retentionMsecs)
 		if err == nil {
 			tb2.MustClose()
 			t.Fatalf("expecting non-nil error when opening already opened table")


@@ -45,7 +45,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 	b.SetBytes(int64(rowsCountExpected))
 	tablePath := "./benchmarkTableAddRows"
 	for i := 0; i < b.N; i++ {
-		tb, err := openTable(tablePath, -1, nilGetDeletedMetricIDs)
+		tb, err := openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs)
 		if err != nil {
 			b.Fatalf("cannot open table %q: %s", tablePath, err)
 		}
@@ -93,7 +93,7 @@ func benchmarkTableAddRows(b *testing.B, rowsPerInsert, tsidsCount int) {
 	tb.MustClose()

 	// Open the table from files and verify the rows count on it
-	tb, err = openTable(tablePath, -1, nilGetDeletedMetricIDs)
+	tb, err = openTable(tablePath, nilGetDeletedMetricIDs, maxRetentionMsecs)
 	if err != nil {
 		b.Fatalf("cannot open table %q: %s", tablePath, err)
 	}