Mirror of https://github.com/VictoriaMetrics/VictoriaMetrics.git, synced 2024-11-21 14:44:00 +00:00
lib/storage: pre-create timeseries before indexDB rotation (#4652)
* lib/storage: pre-create timeseries before indexDB rotation

  During the hour before an indexDB rotation, start creating records in the next indexDB.
  This should improve performance during the switch to the next indexDB and remove ingestion issues,
  since there is no need to create new index records for timeseries already ingested into the current indexDB.
  https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563

* lib/storage: further work on indexdb rotation optimization

  - Document the change at docs/CHANGELOG.md
  - Move back various caches from indexDB to Storage. This makes the change less intrusive.
    The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID) entries
    for both the current and the next indexDB.
  - Consolidate the code responsible for idbNext pre-filling into the prefillNextIndexDB() function.
    This improves code readability and maintainability a bit.
  - Rewrite and simplify the code responsible for calculating the next retention timestamp.
    Add various tests for corner cases of this code.
  - Remove indexdb pre-filling from the RegisterMetricNames() function, since this function is rarely called.
    It is OK to add indexdb entries on demand in this function. This simplifies the code.

  Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401

* docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
Parent: 44cfcfe62e
Commit: 544fba6826

6 changed files with 368 additions and 193 deletions
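
The gradual pre-fill described in the commit message works by hashing each series' MetricID into [0, 1) and comparing it against a threshold that shrinks as the rotation approaches, so every series is pre-created in idbNext at most once and the work spreads across the final hour. The standalone sketch below only illustrates that sampling decision: the 3600-second window and the p < pMin comparison mirror prefillNextIndexDB() in the diff further down, while the xorshift-style hash constants and the main() example values are made up for illustration.

package main

import "fmt"

// fastHashUint64 mixes the bits of x. The xorshift/multiply constants here are
// illustrative stand-ins, not the upstream implementation.
func fastHashUint64(x uint64) uint64 {
	x ^= x >> 12
	x ^= x << 25
	x ^= x >> 27
	return x * 2685821657736338717
}

// shouldPrefill reports whether the series with the given metricID should be
// pre-created in the next indexDB when secondsLeft seconds remain until rotation.
// The acceptance probability grows linearly from 0% (one hour left) to 100% (at rotation).
func shouldPrefill(metricID uint64, secondsLeft int64) bool {
	if secondsLeft >= 3600 {
		return false // too early: pre-filling starts during the last hour
	}
	pMin := float64(secondsLeft) / 3600                        // share of series to skip for now
	p := float64(uint32(fastHashUint64(metricID))) / (1 << 32) // deterministic per-series value in [0, 1)
	return p >= pMin
}

func main() {
	// Hypothetical metricIDs: the closer the rotation, the more series qualify for pre-fill.
	for _, left := range []int64{3500, 1800, 60} {
		n := 0
		for id := uint64(0); id < 1000; id++ {
			if shouldPrefill(id, left) {
				n++
			}
		}
		fmt.Printf("%4ds before rotation: %d of 1000 series pre-filled\n", left, n)
	}
}
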
@ -662,6 +662,9 @@ func registerStorageMetrics(strg *storage.Storage) {
	metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
		return float64(m().TimeseriesRepopulated)
	})
	metrics.NewGauge(`vm_timeseries_precreated_total`, func() float64 {
		return float64(m().TimeseriesPreCreated)
	})
	metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 {
		return float64(m().NewTimeseriesCreated)
	})

@ -27,7 +27,8 @@ The following `tip` changes can be tested by building VictoriaMetrics components
|
|||
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
|
||||
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
|
||||
|
||||
* FEATURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). See [description for this change](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7094fa38bc207c7bd7330ea8a834310a310ce5e3) for details.
|
||||
* FEATURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). See [the description for this change](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7094fa38bc207c7bd7330ea8a834310a310ce5e3) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563) for details.
|
||||
* FEATURE: reduce spikes in CPU and disk IO usage during `indexdb` rotation (aka inverted index), which is performed once per [`-retentionPeriod`](https://docs.victoriametrics.com/#retention). The new algorithm gradually pre-populates newly created `indexdb` during the last hour before the rotation. The number of pre-populated series in the newly created `indexdb` can be [monitored](https://docs.victoriametrics.com/#monitoring) via `vm_timeseries_precreated_total` metric. This should resolve [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401).
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): allow selecting time series matching at least one of multiple `or` filters. For example, `{env="prod",job="a" or env="dev",job="b"}` selects series with either `{env="prod",job="a"}` or `{env="dev",job="b"}` labels. This functionality allows passing the selected series to [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) without the need to use [subqueries](https://docs.victoriametrics.com/MetricsQL.html#subqueries). See [these docs](https://docs.victoriametrics.com/keyConcepts.html#filtering-by-multiple-or-filters).
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add ability to preserve metric names for binary operation results via `keep_metric_names` modifier. For example, `({__name__=~"foo|bar"} / 10) keep_metric_names` leaves `foo` and `bar` metric names in division results. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#keep_metric_names). This helps to address issues like [this one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3710).
|
||||
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add ability to copy all the labels from `one` side of [many-to-one operations](https://prometheus.io/docs/prometheus/latest/querying/operators/#many-to-one-and-one-to-many-vector-matches) by specifying `*` inside `group_left()` or `group_right()`. Also allow adding a prefix for copied label names via `group_left(*) prefix "..."` syntax. For example, the following query copies Kubernetes namespace labels to `kube_pod_info` series and adds `ns_` prefix for the copied label names: `kube_pod_info * on(namespace) group_left(*) prefix "ns_" kube_namespace_labels`. The labels from `on()` list aren't prefixed. This feature resolves [this](https://stackoverflow.com/questions/76661818/how-to-add-namespace-labels-to-pod-labels-in-prometheus) and [that](https://stackoverflow.com/questions/76653997/how-can-i-make-a-new-copy-of-kube-namespace-labels-metric-with-a-different-name) questions at StackOverflow.
|
||||
|
|
|
@ -96,9 +96,6 @@ type indexDB struct {
|
|||
// and is used for syncing items from different indexDBs
|
||||
generation uint64
|
||||
|
||||
// The unix timestamp in seconds for the indexDB rotation.
|
||||
rotationTimestamp uint64
|
||||
|
||||
name string
|
||||
tb *mergeset.Table
|
||||
|
||||
|
@ -136,10 +133,7 @@ func getTagFiltersCacheSize() int {
|
|||
//
|
||||
// The last segment of the path should contain unique hex value which
|
||||
// will be then used as indexDB.generation
|
||||
//
|
||||
// The rotationTimestamp must be set to the current unix timestamp when mustOpenIndexDB
|
||||
// is called when creating new indexdb during indexdb rotation.
|
||||
func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) *indexDB {
|
||||
func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
|
||||
if s == nil {
|
||||
logger.Panicf("BUG: Storage must be non-nil")
|
||||
}
|
||||
|
@ -157,11 +151,10 @@ func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOn
|
|||
tagFiltersCacheSize := getTagFiltersCacheSize()
|
||||
|
||||
db := &indexDB{
|
||||
refCount: 1,
|
||||
generation: gen,
|
||||
rotationTimestamp: rotationTimestamp,
|
||||
tb: tb,
|
||||
name: name,
|
||||
refCount: 1,
|
||||
generation: gen,
|
||||
tb: tb,
|
||||
name: name,
|
||||
|
||||
tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
|
||||
s: s,
|
||||
|
|
|
@ -495,7 +495,7 @@ func TestIndexDBOpenClose(t *testing.T) {
|
|||
tableName := nextIndexDBTableName()
|
||||
for i := 0; i < 5; i++ {
|
||||
var isReadOnly uint32
|
||||
db := mustOpenIndexDB(tableName, &s, 0, &isReadOnly)
|
||||
db := mustOpenIndexDB(tableName, &s, &isReadOnly)
|
||||
db.MustClose()
|
||||
}
|
||||
if err := os.RemoveAll(tableName); err != nil {
|
||||
|
|
|
@ -45,6 +45,7 @@ type Storage struct {
|
|||
tooBigTimestampRows uint64
|
||||
|
||||
timeseriesRepopulated uint64
|
||||
timeseriesPreCreated uint64
|
||||
newTimeseriesCreated uint64
|
||||
slowRowInserts uint64
|
||||
slowPerDayIndexInserts uint64
|
||||
|
@ -53,6 +54,13 @@ type Storage struct {
|
|||
hourlySeriesLimitRowsDropped uint64
|
||||
dailySeriesLimitRowsDropped uint64
|
||||
|
||||
// nextRotationTimestamp is a timestamp in seconds of the next indexdb rotation.
|
||||
//
|
||||
// It is used for gradual pre-population of the idbNext during the last hour before the indexdb rotation
|
||||
// in order to reduce spikes in CPU and disk IO usage just after the rotation.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
|
||||
nextRotationTimestamp int64
|
||||
|
||||
path string
|
||||
cachePath string
|
||||
retentionMsecs int64
|
||||
|
@ -60,8 +68,17 @@ type Storage struct {
|
|||
// lock file for exclusive access to the storage on the given path.
|
||||
flockF *os.File
|
||||
|
||||
// idbCurr contains the currently used indexdb.
|
||||
idbCurr atomic.Pointer[indexDB]
|
||||
|
||||
// idbNext is the next indexdb, which will become idbCurr at the next rotation.
|
||||
//
|
||||
// It starts to be gradually pre-populated with the data for active time series during the last hour
|
||||
// before nextRotationTimestamp.
|
||||
// This reduces spikes in CPU and disk IO usage just after the rotation.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
|
||||
idbNext atomic.Pointer[indexDB]
|
||||
|
||||
tb *table
|
||||
|
||||
// Series cardinality limiters.
|
||||
|
@ -77,7 +94,8 @@ type Storage struct {
|
|||
// metricNameCache is MetricID -> MetricName cache.
|
||||
metricNameCache *workingsetcache.Cache
|
||||
|
||||
// dateMetricIDCache is (Date, MetricID) cache.
|
||||
// dateMetricIDCache is (generation, Date, MetricID) cache, where generation is the indexdb generation.
|
||||
// See generationTSID for details.
|
||||
dateMetricIDCache *dateMetricIDCache
|
||||
|
||||
// Fast cache for MetricID values occurred during the current hour.
|
||||
|
@ -203,9 +221,6 @@ func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDail
|
|||
s.prevHourMetricIDs.Store(hmPrev)
|
||||
s.pendingHourEntries = &uint64set.Set{}
|
||||
|
||||
date := fasttime.UnixDate()
|
||||
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date)
|
||||
s.nextDayMetricIDs.Store(nextDayMetricIDs)
|
||||
s.pendingNextDayMetricIDs = &uint64set.Set{}
|
||||
|
||||
s.prefetchedMetricIDs.Store(&uint64set.Set{})
|
||||
|
@ -221,9 +236,23 @@ func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDail
|
|||
idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname)
|
||||
fs.MustMkdirIfNotExist(idbSnapshotsPath)
|
||||
fs.MustRemoveTemporaryDirs(idbSnapshotsPath)
|
||||
idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath)
|
||||
idbNext, idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath)
|
||||
|
||||
idbCurr.SetExtDB(idbPrev)
|
||||
idbNext.SetExtDB(idbCurr)
|
||||
|
||||
s.idbCurr.Store(idbCurr)
|
||||
s.idbNext.Store(idbNext)
|
||||
|
||||
// Initialize nextRotationTimestamp
|
||||
nowSecs := time.Now().UnixNano() / 1e9
|
||||
nextRotationTimestamp := nextRetentionDeadlineSeconds(nowSecs, retentionMsecs/1000, retentionTimezoneOffsetSecs)
|
||||
atomic.StoreInt64(&s.nextRotationTimestamp, nextRotationTimestamp)
|
||||
|
||||
// Load nextDayMetricIDs cache
|
||||
date := fasttime.UnixDate()
|
||||
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(idbCurr.generation, date)
|
||||
s.nextDayMetricIDs.Store(nextDayMetricIDs)
|
||||
|
||||
// Load deleted metricIDs from idbCurr and idbPrev
|
||||
dmisCurr, err := idbCurr.loadDeletedMetricIDs()
|
||||
|
@ -446,6 +475,7 @@ type Metrics struct {
|
|||
TooBigTimestampRows uint64
|
||||
|
||||
TimeseriesRepopulated uint64
|
||||
TimeseriesPreCreated uint64
|
||||
NewTimeseriesCreated uint64
|
||||
SlowRowInserts uint64
|
||||
SlowPerDayIndexInserts uint64
|
||||
|
@ -517,6 +547,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
|||
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
|
||||
|
||||
m.TimeseriesRepopulated += atomic.LoadUint64(&s.timeseriesRepopulated)
|
||||
m.TimeseriesPreCreated += atomic.LoadUint64(&s.timeseriesPreCreated)
|
||||
m.NewTimeseriesCreated += atomic.LoadUint64(&s.newTimeseriesCreated)
|
||||
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
|
||||
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
|
||||
|
@ -587,12 +618,20 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
|
|||
m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
|
||||
m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
|
||||
|
||||
m.NextRetentionSeconds = uint64(nextRetentionDuration(s.retentionMsecs).Seconds())
|
||||
d := s.nextRetentionSeconds()
|
||||
if d < 0 {
|
||||
d = 0
|
||||
}
|
||||
m.NextRetentionSeconds = uint64(d)
|
||||
|
||||
s.idb().UpdateMetrics(&m.IndexDBMetrics)
|
||||
s.tb.UpdateMetrics(&m.TableMetrics)
|
||||
}
|
||||
|
||||
func (s *Storage) nextRetentionSeconds() int64 {
|
||||
return atomic.LoadInt64(&s.nextRotationTimestamp) - int64(fasttime.UnixTimestamp())
|
||||
}
|
||||
|
||||
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
|
||||
//
|
||||
// The function must be called before opening or creating any storage.
|
||||
|
@ -649,11 +688,11 @@ func (s *Storage) startRetentionWatcher() {
|
|||
|
||||
func (s *Storage) retentionWatcher() {
|
||||
for {
|
||||
d := nextRetentionDuration(s.retentionMsecs)
|
||||
d := s.nextRetentionSeconds()
|
||||
select {
|
||||
case <-s.stop:
|
||||
return
|
||||
case <-time.After(d):
|
||||
case <-time.After(time.Second * time.Duration(d)):
|
||||
s.mustRotateIndexDB()
|
||||
}
|
||||
}
|
||||
|
@ -712,23 +751,29 @@ func (s *Storage) nextDayMetricIDsUpdater() {
|
|||
}
|
||||
|
||||
func (s *Storage) mustRotateIndexDB() {
|
||||
// Create new indexdb table.
|
||||
// Create new indexdb table, which will be used as idbNext
|
||||
newTableName := nextIndexDBTableName()
|
||||
idbNewPath := filepath.Join(s.path, indexdbDirname, newTableName)
|
||||
rotationTimestamp := fasttime.UnixTimestamp()
|
||||
idbNew := mustOpenIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly)
|
||||
idbNew := mustOpenIndexDB(idbNewPath, s, &s.isReadOnly)
|
||||
|
||||
// Drop extDB
|
||||
// Update nextRotationTimestamp
|
||||
atomic.AddInt64(&s.nextRotationTimestamp, s.retentionMsecs/1000)
|
||||
|
||||
// Set idbNext to idbNew
|
||||
idbNext := s.idbNext.Load()
|
||||
idbNew.SetExtDB(idbNext)
|
||||
s.idbNext.Store(idbNew)
|
||||
|
||||
// Set idbCurr to idbNext
|
||||
idbCurr := s.idb()
|
||||
s.idbCurr.Store(idbNext)
|
||||
|
||||
// Schedule data removal for idbPrev
|
||||
idbCurr.doExtDB(func(extDB *indexDB) {
|
||||
extDB.scheduleToDrop()
|
||||
})
|
||||
idbCurr.SetExtDB(nil)
|
||||
|
||||
// Start using idbNew
|
||||
idbNew.SetExtDB(idbCurr)
|
||||
s.idbCurr.Store(idbNew)
|
||||
|
||||
// Persist changes on the file system.
|
||||
fs.MustSyncPath(s.path)
|
||||
|
||||
|
@ -738,7 +783,7 @@ func (s *Storage) mustRotateIndexDB() {
|
|||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
|
||||
|
||||
// Flush metric id caches for the current and the previous hour,
|
||||
// since they may contain entries missing in idbNew.
|
||||
// since they may contain entries missing in idbCurr after the rotation.
|
||||
// This should prevent from missing data in queries when
|
||||
// the following steps are performed for short -retentionPeriod (e.g. 1 day):
|
||||
//
|
||||
|
@ -748,7 +793,7 @@ func (s *Storage) mustRotateIndexDB() {
|
|||
// These series are already registered in prevHourMetricIDs, so VM doesn't add per-day entries to the current indexdb.
|
||||
// 4. Stop adding new samples for these series just before 5 UTC.
|
||||
// 5. The next indexdb rotation is performed at 4 UTC next day.
|
||||
// The information about the series from step 3 disappears from indexdb, since the old indexdb from step 1 is deleted,
|
||||
// The information about the series added at step 3 disappears from indexdb, since the old indexdb from step 1 is deleted,
|
||||
// while the current indexdb doesn't contain information about the series.
|
||||
// So queries for the last 24 hours stop returning samples added at step 3.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698
|
||||
|
@ -758,13 +803,12 @@ func (s *Storage) mustRotateIndexDB() {
|
|||
s.currHourMetricIDs.Store(&hourMetricIDs{})
|
||||
s.prevHourMetricIDs.Store(&hourMetricIDs{})
|
||||
|
||||
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
|
||||
s.dateMetricIDCache.Reset()
|
||||
// Do not flush dateMetricIDCache, since it contains entries prefixed with idb generation.
|
||||
|
||||
// There is no need in resetting nextDayMetricIDs, since it contains entries prefixed with idb generation.
|
||||
|
||||
// Do not flush metricIDCache and metricNameCache, since all the metricIDs
|
||||
// from prev idb remain valid after the rotation.
|
||||
|
||||
// There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day.
|
||||
}
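
As the comments above explain, the rotation no longer flushes dateMetricIDCache because its entries are now keyed by the indexdb generation, so entries created for the old generation simply stop matching lookups made with the new generation. A toy, self-contained sketch of that behaviour, assuming made-up date and metricID values (only the generationDateKey shape mirrors this diff):

package main

import "fmt"

// generationDateKey mirrors the cache key introduced in this diff.
type generationDateKey struct {
	generation uint64
	date       uint64
}

func main() {
	// Toy stand-in for dateMetricIDCache: the set of metricIDs seen per (generation, date).
	cache := map[generationDateKey]map[uint64]bool{}
	set := func(generation, date, metricID uint64) {
		k := generationDateKey{generation, date}
		if cache[k] == nil {
			cache[k] = map[uint64]bool{}
		}
		cache[k][metricID] = true
	}
	has := func(generation, date, metricID uint64) bool {
		return cache[generationDateKey{generation, date}][metricID]
	}

	const date = 19560 // arbitrary "days since epoch" value for the example
	set(1, date, 42)   // entry registered while indexdb generation 1 was current

	fmt.Println(has(1, date, 42)) // true: still served from the cache for generation 1
	fmt.Println(has(2, date, 42)) // false after rotating to generation 2, so the per-day
	// index entry gets re-created in the new indexdb without flushing the whole cache
}
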
|
||||
|
||||
func (s *Storage) resetAndSaveTSIDCache() {
|
||||
|
@ -818,11 +862,14 @@ func (s *Storage) MustClose() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
|
||||
func (s *Storage) mustLoadNextDayMetricIDs(generation, date uint64) *byDateMetricIDEntry {
|
||||
e := &byDateMetricIDEntry{
|
||||
date: date,
|
||||
k: generationDateKey{
|
||||
generation: generation,
|
||||
date: date,
|
||||
},
|
||||
}
|
||||
name := "next_day_metric_ids"
|
||||
name := "next_day_metric_ids_v2"
|
||||
path := filepath.Join(s.cachePath, name)
|
||||
if !fs.IsPathExist(path) {
|
||||
return e
|
||||
|
@ -831,12 +878,17 @@ func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
|
|||
if err != nil {
|
||||
logger.Panicf("FATAL: cannot read %s: %s", path, err)
|
||||
}
|
||||
if len(src) < 16 {
|
||||
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 16)
|
||||
if len(src) < 24 {
|
||||
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
|
||||
return e
|
||||
}
|
||||
|
||||
// Unmarshal header
|
||||
generationLoaded := encoding.UnmarshalUint64(src)
|
||||
src = src[8:]
|
||||
if generationLoaded != generation {
|
||||
logger.Infof("discarding %s, since it contains data for stale generation; got %d; want %d", path, generationLoaded, generation)
|
||||
}
|
||||
dateLoaded := encoding.UnmarshalUint64(src)
|
||||
src = src[8:]
|
||||
if dateLoaded != date {
|
||||
|
@ -898,12 +950,13 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
|
|||
}
|
||||
|
||||
func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
|
||||
name := "next_day_metric_ids"
|
||||
name := "next_day_metric_ids_v2"
|
||||
path := filepath.Join(s.cachePath, name)
|
||||
dst := make([]byte, 0, e.v.Len()*8+16)
|
||||
|
||||
// Marshal header
|
||||
dst = encoding.MarshalUint64(dst, e.date)
|
||||
dst = encoding.MarshalUint64(dst, e.k.generation)
|
||||
dst = encoding.MarshalUint64(dst, e.k.date)
|
||||
|
||||
// Marshal e.v
|
||||
dst = marshalUint64Set(dst, &e.v)
|
||||
|
@ -1009,30 +1062,31 @@ var saveCacheLock sync.Mutex
|
|||
// SetRetentionTimezoneOffset sets the offset, which is used for calculating the time for indexdb rotation.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2574
|
||||
func SetRetentionTimezoneOffset(offset time.Duration) {
|
||||
retentionTimezoneOffsetMsecs = offset.Milliseconds()
|
||||
retentionTimezoneOffsetSecs = int64(offset.Seconds())
|
||||
}
|
||||
|
||||
var retentionTimezoneOffsetMsecs int64
|
||||
var retentionTimezoneOffsetSecs int64
|
||||
|
||||
func nextRetentionDuration(retentionMsecs int64) time.Duration {
|
||||
nowMsecs := time.Now().UnixNano() / 1e6
|
||||
return nextRetentionDurationAt(nowMsecs, retentionMsecs)
|
||||
}
|
||||
func nextRetentionDeadlineSeconds(atSecs, retentionSecs, offsetSecs int64) int64 {
|
||||
// Round retentionSecs to days. This guarantees that per-day inverted index works as expected
|
||||
const secsPerDay = 24 * 3600
|
||||
retentionSecs = ((retentionSecs + secsPerDay - 1) / secsPerDay) * secsPerDay
|
||||
|
||||
func nextRetentionDurationAt(atMsecs int64, retentionMsecs int64) time.Duration {
|
||||
// Round retentionMsecs to days. This guarantees that per-day inverted index works as expected
|
||||
retentionMsecs = ((retentionMsecs + msecPerDay - 1) / msecPerDay) * msecPerDay
|
||||
// Schedule the deadline to +4 hours from the next retention period start
|
||||
// because of historical reasons - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248
|
||||
offsetSecs -= 4 * 3600
|
||||
|
||||
// The effect of time zone on retention period is moved out.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2574
|
||||
deadline := ((atMsecs + retentionMsecs + retentionTimezoneOffsetMsecs - 1) / retentionMsecs) * retentionMsecs
|
||||
// Make sure that offsetSecs doesn't exceed retentionSecs
|
||||
offsetSecs %= retentionSecs
|
||||
|
||||
// Schedule the deadline to +4 hours from the next retention period start.
|
||||
// This should prevent from possible double deletion of indexdb
|
||||
// due to time drift - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248 .
|
||||
deadline += int64(4 * 3600 * 1000)
|
||||
deadline -= retentionTimezoneOffsetMsecs
|
||||
return time.Duration(deadline-atMsecs) * time.Millisecond
|
||||
// align the retention deadline to multiples of retentionSecs
|
||||
// This makes the deadline independent of atSecs.
|
||||
deadline := ((atSecs + offsetSecs + retentionSecs - 1) / retentionSecs) * retentionSecs
|
||||
|
||||
// Apply the provided offsetSecs
|
||||
deadline -= offsetSecs
|
||||
|
||||
return deadline
|
||||
}
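
A hand-worked pass through the new deadline calculation, using the first case from TestNextRetentionDeadlineSeconds further down (the intermediate numbers are computed manually and are for illustration only):

// nextRetentionDeadlineSeconds(atSecs=1690029875 /* 2023-07-22T12:44:35Z */, retentionSecs=86400, offsetSecs=0)
//   offsetSecs = (0 - 4*3600) % 86400 = -14400      // Go's % keeps the sign of the dividend
//   deadline   = ((1690029875 - 14400 + 86400 - 1) / 86400) * 86400
//              = 19561 * 86400 = 1690070400         // 2023-07-23T00:00:00Z, aligned to a multiple of retentionSecs
//   deadline  -= -14400  =>  1690084800              // 2023-07-23T04:00:00Z, matching the expected test result
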
|
||||
|
||||
// SearchMetricNames returns marshaled metric names matching the given tfss on the given tr.
|
||||
|
@ -1553,6 +1607,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
|||
var seriesRepopulated uint64
|
||||
|
||||
idb := s.idb()
|
||||
generation := idb.generation
|
||||
is := idb.getIndexSearch(noDeadline)
|
||||
defer idb.putIndexSearch(is)
|
||||
var firstWarn error
|
||||
|
@ -1565,7 +1620,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
|||
// Skip row, since it exceeds cardinality limit
|
||||
continue
|
||||
}
|
||||
if genTSID.generation != idb.generation {
|
||||
if genTSID.generation < generation {
|
||||
// The found TSID is from the previous indexdb. Create it in the current indexdb.
|
||||
|
||||
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
|
||||
|
@ -1580,7 +1635,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
|||
mn.sortTags()
|
||||
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
genTSID.generation = generation
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
seriesRepopulated++
|
||||
}
|
||||
|
@ -1610,10 +1665,10 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
|||
continue
|
||||
}
|
||||
|
||||
if genTSID.generation != idb.generation {
|
||||
if genTSID.generation < generation {
|
||||
// The found TSID is from the previous indexdb. Create it in the current indexdb.
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
genTSID.generation = generation
|
||||
seriesRepopulated++
|
||||
}
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
|
@ -1631,12 +1686,15 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
|||
// Schedule creating TSID indexes instead of creating them synchronously.
|
||||
// This should keep stable the ingestion rate when new time series are ingested.
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
genTSID.generation = generation
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
}
|
||||
|
||||
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
|
||||
|
||||
// There is no need in pre-filling idbNext here, since RegisterMetricNames() is rarely called.
|
||||
// So it is OK to register metric names in blocking manner after indexdb rotation.
|
||||
|
||||
if firstWarn != nil {
|
||||
logger.Warnf("cannot create some metrics: %s", firstWarn)
|
||||
}
|
||||
|
@ -1644,6 +1702,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
|
|||
|
||||
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
|
||||
idb := s.idb()
|
||||
generation := idb.generation
|
||||
is := idb.getIndexSearch(noDeadline)
|
||||
defer idb.putIndexSearch(is)
|
||||
|
||||
|
@ -1727,7 +1786,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
prevTSID = r.TSID
|
||||
prevMetricNameRaw = mr.MetricNameRaw
|
||||
|
||||
if genTSID.generation != idb.generation {
|
||||
if genTSID.generation < generation {
|
||||
// The found TSID is from the previous indexdb. Create it in the current indexdb.
|
||||
date := uint64(r.Timestamp) / msecPerDay
|
||||
|
||||
|
@ -1741,7 +1800,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
mn.sortTags()
|
||||
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
genTSID.generation = generation
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
seriesRepopulated++
|
||||
slowInsertsCount++
|
||||
|
@ -1775,10 +1834,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
continue
|
||||
}
|
||||
|
||||
if genTSID.generation != idb.generation {
|
||||
if genTSID.generation < generation {
|
||||
// The found TSID is from the previous indexdb. Create it in the current indexdb.
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
genTSID.generation = generation
|
||||
seriesRepopulated++
|
||||
}
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
|
@ -1799,7 +1858,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
}
|
||||
|
||||
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
|
||||
genTSID.generation = idb.generation
|
||||
genTSID.generation = generation
|
||||
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
|
||||
newSeriesCount++
|
||||
|
||||
|
@ -1816,11 +1875,17 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
|
|||
atomic.AddUint64(&s.newTimeseriesCreated, newSeriesCount)
|
||||
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
|
||||
|
||||
dstMrs = dstMrs[:j]
|
||||
rows = rows[:j]
|
||||
|
||||
if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
|
||||
if firstWarn == nil {
|
||||
firstWarn = err
|
||||
}
|
||||
}
|
||||
if firstWarn != nil {
|
||||
storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
|
||||
}
|
||||
dstMrs = dstMrs[:j]
|
||||
rows = rows[:j]
|
||||
|
||||
err := s.updatePerDateData(rows, dstMrs)
|
||||
if err != nil {
|
||||
|
@ -1855,9 +1920,9 @@ func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID
|
|||
// so future rows for that TSID are ingested via fast path.
|
||||
s.putTSIDToCache(genTSID, metricNameRaw)
|
||||
|
||||
// Register the (date, metricID) entry in the cache,
|
||||
// Register the (generation, date, metricID) entry in the cache,
|
||||
// so next time the entry is found there instead of searching for it in the indexdb.
|
||||
s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID)
|
||||
s.dateMetricIDCache.Set(genTSID.generation, date, genTSID.TSID.MetricID)
|
||||
}
|
||||
|
||||
func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) bool {
|
||||
|
@ -1896,6 +1961,77 @@ func getUserReadableMetricName(metricNameRaw []byte) string {
|
|||
return mn.String()
|
||||
}
|
||||
|
||||
func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error {
|
||||
d := s.nextRetentionSeconds()
|
||||
if d >= 3600 {
|
||||
// Fast path: nothing to pre-fill because it is too early.
|
||||
// The pre-fill is started during the last hour before the indexdb rotation.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Slower path: less than an hour is left until the next indexdb rotation.
|
||||
// Pre-populate idbNext with the increasing probability until the rotation.
|
||||
// The probability increases from 0% to 100% proportionally to d=[3600 .. 0].
|
||||
pMin := float64(d) / 3600
|
||||
|
||||
idbNext := s.idbNext.Load()
|
||||
generation := idbNext.generation
|
||||
isNext := idbNext.getIndexSearch(noDeadline)
|
||||
defer idbNext.putIndexSearch(isNext)
|
||||
|
||||
var firstError error
|
||||
var genTSID generationTSID
|
||||
mn := GetMetricName()
|
||||
defer PutMetricName(mn)
|
||||
|
||||
timeseriesPreCreated := uint64(0)
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
p := float64(uint32(fastHashUint64(r.TSID.MetricID))) / (1 << 32)
|
||||
if p < pMin {
|
||||
// Fast path: it is too early to pre-fill indexes for the given MetricID.
|
||||
continue
|
||||
}
|
||||
|
||||
// Check whether the given MetricID is already present in dateMetricIDCache.
|
||||
date := uint64(r.Timestamp) / msecPerDay
|
||||
metricID := r.TSID.MetricID
|
||||
if s.dateMetricIDCache.Has(generation, date, metricID) {
|
||||
// Indexes are already pre-filled.
|
||||
continue
|
||||
}
|
||||
|
||||
// Check whether the given (date, metricID) is already present in idbNext.
|
||||
if isNext.hasDateMetricIDNoExtDB(date, metricID) {
|
||||
// Indexes are already pre-filled at idbNext.
|
||||
//
|
||||
// Register the (generation, date, metricID) entry in the cache,
|
||||
// so next time the entry is found there instead of searching for it in the indexdb.
|
||||
s.dateMetricIDCache.Set(generation, date, metricID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Slow path: pre-fill indexes in idbNext.
|
||||
metricNameRaw := mrs[i].MetricNameRaw
|
||||
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
|
||||
if firstError == nil {
|
||||
firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", metricNameRaw, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
mn.sortTags()
|
||||
|
||||
createAllIndexesForMetricName(isNext, mn, &r.TSID, date)
|
||||
genTSID.TSID = r.TSID
|
||||
genTSID.generation = generation
|
||||
s.putSeriesToCache(metricNameRaw, &genTSID, date)
|
||||
timeseriesPreCreated++
|
||||
}
|
||||
atomic.AddUint64(&s.timeseriesPreCreated, timeseriesPreCreated)
|
||||
|
||||
return firstError
|
||||
}
|
||||
|
||||
func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
||||
var date uint64
|
||||
var hour uint64
|
||||
|
@ -1906,6 +2042,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
|||
prevDate uint64
|
||||
prevMetricID uint64
|
||||
)
|
||||
|
||||
idb := s.idb()
|
||||
generation := idb.generation
|
||||
|
||||
hm := s.currHourMetricIDs.Load()
|
||||
hmPrev := s.prevHourMetricIDs.Load()
|
||||
hmPrevDate := hmPrev.hour / 24
|
||||
|
@ -1966,8 +2106,8 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Slower path: check global cache for (date, metricID) entry.
|
||||
if s.dateMetricIDCache.Has(date, metricID) {
|
||||
// Slower path: check global cache for (generation, date, metricID) entry.
|
||||
if s.dateMetricIDCache.Has(generation, date, metricID) {
|
||||
continue
|
||||
}
|
||||
// Slow path: store the (date, metricID) entry in the indexDB.
|
||||
|
@ -2005,7 +2145,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
|||
return a.tsid.MetricID < b.tsid.MetricID
|
||||
})
|
||||
|
||||
idb := s.idb()
|
||||
is := idb.getIndexSearch(noDeadline)
|
||||
defer idb.putIndexSearch(is)
|
||||
|
||||
|
@ -2035,7 +2174,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
|
|||
}
|
||||
PutMetricName(mn)
|
||||
// The (date, metricID) entries must be added to cache only after they have been successfully added to indexDB.
|
||||
s.dateMetricIDCache.Store(dateMetricIDsForCache)
|
||||
s.dateMetricIDCache.Store(generation, dateMetricIDsForCache)
|
||||
return firstError
|
||||
}
|
||||
|
||||
|
@ -2069,12 +2208,6 @@ func newDateMetricIDCache() *dateMetricIDCache {
|
|||
return &dmc
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Reset() {
|
||||
dmc.mu.Lock()
|
||||
dmc.resetLocked()
|
||||
dmc.mu.Unlock()
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) resetLocked() {
|
||||
// Do not reset syncsCount and resetsCount
|
||||
dmc.byDate.Store(newByDateMetricIDMap())
|
||||
|
@ -2102,9 +2235,9 @@ func (dmc *dateMetricIDCache) SizeBytes() uint64 {
|
|||
return n
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
|
||||
func (dmc *dateMetricIDCache) Has(generation, date, metricID uint64) bool {
|
||||
byDate := dmc.byDate.Load()
|
||||
v := byDate.get(date)
|
||||
v := byDate.get(generation, date)
|
||||
if v.Has(metricID) {
|
||||
// Fast path.
|
||||
// The majority of calls must go here.
|
||||
|
@ -2113,7 +2246,7 @@ func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
|
|||
|
||||
// Slow path. Check mutable map.
|
||||
dmc.mu.Lock()
|
||||
v = dmc.byDateMutable.get(date)
|
||||
v = dmc.byDateMutable.get(generation, date)
|
||||
ok := v.Has(metricID)
|
||||
dmc.syncLockedIfNeeded()
|
||||
dmc.mu.Unlock()
|
||||
|
@ -2126,7 +2259,7 @@ type dateMetricID struct {
|
|||
metricID uint64
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Store(dmids []dateMetricID) {
|
||||
func (dmc *dateMetricIDCache) Store(generation uint64, dmids []dateMetricID) {
|
||||
var prevDate uint64
|
||||
metricIDs := make([]uint64, 0, len(dmids))
|
||||
dmc.mu.Lock()
|
||||
|
@ -2136,22 +2269,22 @@ func (dmc *dateMetricIDCache) Store(dmids []dateMetricID) {
|
|||
continue
|
||||
}
|
||||
if len(metricIDs) > 0 {
|
||||
v := dmc.byDateMutable.getOrCreate(prevDate)
|
||||
v := dmc.byDateMutable.getOrCreate(generation, prevDate)
|
||||
v.AddMulti(metricIDs)
|
||||
}
|
||||
metricIDs = append(metricIDs[:0], dmid.metricID)
|
||||
prevDate = dmid.date
|
||||
}
|
||||
if len(metricIDs) > 0 {
|
||||
v := dmc.byDateMutable.getOrCreate(prevDate)
|
||||
v := dmc.byDateMutable.getOrCreate(generation, prevDate)
|
||||
v.AddMulti(metricIDs)
|
||||
}
|
||||
dmc.mu.Unlock()
|
||||
}
|
||||
|
||||
func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
|
||||
func (dmc *dateMetricIDCache) Set(generation, date, metricID uint64) {
|
||||
dmc.mu.Lock()
|
||||
v := dmc.byDateMutable.getOrCreate(date)
|
||||
v := dmc.byDateMutable.getOrCreate(generation, date)
|
||||
v.Add(metricID)
|
||||
dmc.mu.Unlock()
|
||||
}
|
||||
|
@ -2169,31 +2302,38 @@ func (dmc *dateMetricIDCache) syncLocked() {
|
|||
// Nothing to sync.
|
||||
return
|
||||
}
|
||||
|
||||
// Merge data from byDate into byDateMutable and then atomically replace byDate with the merged data.
|
||||
byDate := dmc.byDate.Load()
|
||||
byDateMutable := dmc.byDateMutable
|
||||
for date, e := range byDateMutable.m {
|
||||
v := byDate.get(date)
|
||||
for k, e := range byDateMutable.m {
|
||||
v := byDate.get(k.generation, k.date)
|
||||
if v == nil {
|
||||
// Nothing to merge
|
||||
continue
|
||||
}
|
||||
v = v.Clone()
|
||||
v.Union(&e.v)
|
||||
dme := &byDateMetricIDEntry{
|
||||
date: date,
|
||||
v: *v,
|
||||
k: k,
|
||||
v: *v,
|
||||
}
|
||||
if date == byDateMutable.hotEntry.Load().date {
|
||||
byDateMutable.m[k] = dme
|
||||
he := byDateMutable.hotEntry.Load()
|
||||
if he.k == k {
|
||||
byDateMutable.hotEntry.Store(dme)
|
||||
}
|
||||
byDateMutable.m[date] = dme
|
||||
}
|
||||
for date, e := range byDate.m {
|
||||
v := byDateMutable.get(date)
|
||||
// Copy entries from byDate, which are missing in byDateMutable
|
||||
for k, e := range byDate.m {
|
||||
v := byDateMutable.get(k.generation, k.date)
|
||||
if v != nil {
|
||||
continue
|
||||
}
|
||||
byDateMutable.m[date] = e
|
||||
byDateMutable.m[k] = e
|
||||
}
|
||||
|
||||
// Atomically replace byDate with byDateMutable
|
||||
dmc.byDate.Store(dmc.byDateMutable)
|
||||
dmc.byDateMutable = newByDateMetricIDMap()
|
||||
|
||||
|
@ -2206,25 +2346,34 @@ func (dmc *dateMetricIDCache) syncLocked() {
|
|||
|
||||
type byDateMetricIDMap struct {
|
||||
hotEntry atomic.Pointer[byDateMetricIDEntry]
|
||||
m map[uint64]*byDateMetricIDEntry
|
||||
m map[generationDateKey]*byDateMetricIDEntry
|
||||
}
|
||||
|
||||
type generationDateKey struct {
|
||||
generation uint64
|
||||
date uint64
|
||||
}
|
||||
|
||||
func newByDateMetricIDMap() *byDateMetricIDMap {
|
||||
dmm := &byDateMetricIDMap{
|
||||
m: make(map[uint64]*byDateMetricIDEntry),
|
||||
m: make(map[generationDateKey]*byDateMetricIDEntry),
|
||||
}
|
||||
dmm.hotEntry.Store(&byDateMetricIDEntry{})
|
||||
return dmm
|
||||
}
|
||||
|
||||
func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
|
||||
func (dmm *byDateMetricIDMap) get(generation, date uint64) *uint64set.Set {
|
||||
hotEntry := dmm.hotEntry.Load()
|
||||
if hotEntry.date == date {
|
||||
if hotEntry.k.generation == generation && hotEntry.k.date == date {
|
||||
// Fast path
|
||||
return &hotEntry.v
|
||||
}
|
||||
// Slow path
|
||||
e := dmm.m[date]
|
||||
k := generationDateKey{
|
||||
generation: generation,
|
||||
date: date,
|
||||
}
|
||||
e := dmm.m[k]
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -2232,36 +2381,41 @@ func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
|
|||
return &e.v
|
||||
}
|
||||
|
||||
func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set {
|
||||
v := dmm.get(date)
|
||||
func (dmm *byDateMetricIDMap) getOrCreate(generation, date uint64) *uint64set.Set {
|
||||
v := dmm.get(generation, date)
|
||||
if v != nil {
|
||||
return v
|
||||
}
|
||||
e := &byDateMetricIDEntry{
|
||||
date: date,
|
||||
k := generationDateKey{
|
||||
generation: generation,
|
||||
date: date,
|
||||
}
|
||||
dmm.m[date] = e
|
||||
e := &byDateMetricIDEntry{
|
||||
k: k,
|
||||
}
|
||||
dmm.m[k] = e
|
||||
return &e.v
|
||||
}
|
||||
|
||||
type byDateMetricIDEntry struct {
|
||||
date uint64
|
||||
v uint64set.Set
|
||||
k generationDateKey
|
||||
v uint64set.Set
|
||||
}
|
||||
|
||||
func (s *Storage) updateNextDayMetricIDs(date uint64) {
|
||||
generation := s.idb().generation
|
||||
e := s.nextDayMetricIDs.Load()
|
||||
s.pendingNextDayMetricIDsLock.Lock()
|
||||
pendingMetricIDs := s.pendingNextDayMetricIDs
|
||||
s.pendingNextDayMetricIDs = &uint64set.Set{}
|
||||
s.pendingNextDayMetricIDsLock.Unlock()
|
||||
if pendingMetricIDs.Len() == 0 && e.date == date {
|
||||
if pendingMetricIDs.Len() == 0 && e.k.generation == generation && e.k.date == date {
|
||||
// Fast path: nothing to update.
|
||||
return
|
||||
}
|
||||
|
||||
// Slow path: union pendingMetricIDs with e.v
|
||||
if e.date == date {
|
||||
if e.k.generation == generation && e.k.date == date {
|
||||
pendingMetricIDs.Union(&e.v)
|
||||
} else {
|
||||
// Do not add pendingMetricIDs from the previous day to the current day,
|
||||
|
@ -2269,9 +2423,13 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) {
|
|||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
|
||||
pendingMetricIDs = &uint64set.Set{}
|
||||
}
|
||||
k := generationDateKey{
|
||||
generation: generation,
|
||||
date: date,
|
||||
}
|
||||
eNew := &byDateMetricIDEntry{
|
||||
date: date,
|
||||
v: *pendingMetricIDs,
|
||||
k: k,
|
||||
v: *pendingMetricIDs,
|
||||
}
|
||||
s.nextDayMetricIDs.Store(eNew)
|
||||
}
|
||||
|
@ -2335,12 +2493,11 @@ func (s *Storage) putTSIDToCache(tsid *generationTSID, metricName []byte) {
|
|||
s.tsidCache.Set(metricName, buf)
|
||||
}
|
||||
|
||||
func (s *Storage) mustOpenIndexDBTables(path string) (curr, prev *indexDB) {
|
||||
func (s *Storage) mustOpenIndexDBTables(path string) (next, curr, prev *indexDB) {
|
||||
fs.MustMkdirIfNotExist(path)
|
||||
fs.MustRemoveTemporaryDirs(path)
|
||||
|
||||
// Search for the two most recent tables - the last one is active,
|
||||
// the previous one contains backup data.
|
||||
// Search for the three most recent tables - the prev, curr and next.
|
||||
des := fs.MustReadDir(path)
|
||||
var tableNames []string
|
||||
for _, de := range des {
|
||||
|
@ -2358,37 +2515,42 @@ func (s *Storage) mustOpenIndexDBTables(path string) (curr, prev *indexDB) {
|
|||
sort.Slice(tableNames, func(i, j int) bool {
|
||||
return tableNames[i] < tableNames[j]
|
||||
})
|
||||
if len(tableNames) < 2 {
|
||||
// Create missing tables
|
||||
if len(tableNames) == 0 {
|
||||
prevName := nextIndexDBTableName()
|
||||
tableNames = append(tableNames, prevName)
|
||||
}
|
||||
switch len(tableNames) {
|
||||
case 0:
|
||||
prevName := nextIndexDBTableName()
|
||||
currName := nextIndexDBTableName()
|
||||
tableNames = append(tableNames, currName)
|
||||
nextName := nextIndexDBTableName()
|
||||
tableNames = append(tableNames, prevName, currName, nextName)
|
||||
case 1:
|
||||
currName := nextIndexDBTableName()
|
||||
nextName := nextIndexDBTableName()
|
||||
tableNames = append(tableNames, currName, nextName)
|
||||
case 2:
|
||||
nextName := nextIndexDBTableName()
|
||||
tableNames = append(tableNames, nextName)
|
||||
default:
|
||||
// Remove all the tables except the last three tables.
|
||||
for _, tn := range tableNames[:len(tableNames)-3] {
|
||||
pathToRemove := filepath.Join(path, tn)
|
||||
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
|
||||
fs.MustRemoveAll(pathToRemove)
|
||||
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
|
||||
}
|
||||
fs.MustSyncPath(path)
|
||||
|
||||
tableNames = tableNames[len(tableNames)-3:]
|
||||
}
|
||||
|
||||
// Invariant: len(tableNames) >= 2
|
||||
// Open tables
|
||||
nextPath := filepath.Join(path, tableNames[2])
|
||||
currPath := filepath.Join(path, tableNames[1])
|
||||
prevPath := filepath.Join(path, tableNames[0])
|
||||
|
||||
// Remove all the tables except two last tables.
|
||||
for _, tn := range tableNames[:len(tableNames)-2] {
|
||||
pathToRemove := filepath.Join(path, tn)
|
||||
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
|
||||
fs.MustRemoveAll(pathToRemove)
|
||||
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
|
||||
}
|
||||
next = mustOpenIndexDB(nextPath, s, &s.isReadOnly)
|
||||
curr = mustOpenIndexDB(currPath, s, &s.isReadOnly)
|
||||
prev = mustOpenIndexDB(prevPath, s, &s.isReadOnly)
|
||||
|
||||
// Persist changes on the file system.
|
||||
fs.MustSyncPath(path)
|
||||
|
||||
// Open the last two tables.
|
||||
currPath := filepath.Join(path, tableNames[len(tableNames)-1])
|
||||
|
||||
curr = mustOpenIndexDB(currPath, s, 0, &s.isReadOnly)
|
||||
prevPath := filepath.Join(path, tableNames[len(tableNames)-2])
|
||||
prev = mustOpenIndexDB(prevPath, s, 0, &s.isReadOnly)
|
||||
|
||||
return curr, prev
|
||||
return next, curr, prev
|
||||
}
|
||||
|
||||
var indexDBTableNameRegexp = regexp.MustCompile("^[0-9A-F]{16}$")
|
||||
|
|
|
@ -90,23 +90,25 @@ func TestDateMetricIDCacheConcurrent(t *testing.T) {
|
|||
|
||||
func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
||||
type dmk struct {
|
||||
date uint64
|
||||
metricID uint64
|
||||
generation uint64
|
||||
date uint64
|
||||
metricID uint64
|
||||
}
|
||||
m := make(map[dmk]bool)
|
||||
for i := 0; i < 1e5; i++ {
|
||||
generation := uint64(i) % 2
|
||||
date := uint64(i) % 3
|
||||
metricID := uint64(i) % 1237
|
||||
if !concurrent && c.Has(date, metricID) {
|
||||
if !m[dmk{date, metricID}] {
|
||||
return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID)
|
||||
if !concurrent && c.Has(generation, date, metricID) {
|
||||
if !m[dmk{generation, date, metricID}] {
|
||||
return fmt.Errorf("c.Has(%d, %d, %d) must return false, but returned true", generation, date, metricID)
|
||||
}
|
||||
continue
|
||||
}
|
||||
c.Set(date, metricID)
|
||||
m[dmk{date, metricID}] = true
|
||||
if !concurrent && !c.Has(date, metricID) {
|
||||
return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID)
|
||||
c.Set(generation, date, metricID)
|
||||
m[dmk{generation, date, metricID}] = true
|
||||
if !concurrent && !c.Has(generation, date, metricID) {
|
||||
return fmt.Errorf("c.Has(%d, %d, %d) must return true, but returned false", generation, date, metricID)
|
||||
}
|
||||
if i%11234 == 0 {
|
||||
c.mu.Lock()
|
||||
|
@ -114,25 +116,29 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
|||
c.mu.Unlock()
|
||||
}
|
||||
if i%34323 == 0 {
|
||||
c.Reset()
|
||||
c.mu.Lock()
|
||||
c.resetLocked()
|
||||
c.mu.Unlock()
|
||||
m = make(map[dmk]bool)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify fast path after sync.
|
||||
for i := 0; i < 1e5; i++ {
|
||||
generation := uint64(i) % 2
|
||||
date := uint64(i) % 3
|
||||
metricID := uint64(i) % 123
|
||||
c.Set(date, metricID)
|
||||
c.Set(generation, date, metricID)
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.syncLocked()
|
||||
c.mu.Unlock()
|
||||
for i := 0; i < 1e5; i++ {
|
||||
generation := uint64(i) % 2
|
||||
date := uint64(i) % 3
|
||||
metricID := uint64(i) % 123
|
||||
if !concurrent && !c.Has(date, metricID) {
|
||||
return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID)
|
||||
if !concurrent && !c.Has(generation, date, metricID) {
|
||||
return fmt.Errorf("c.Has(%d, %d, %d) must return true after sync", generation, date, metricID)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,7 +146,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
|
|||
if n := c.EntriesCount(); !concurrent && n < 123 {
|
||||
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
|
||||
}
|
||||
c.Reset()
|
||||
c.mu.Lock()
|
||||
c.resetLocked()
|
||||
c.mu.Unlock()
|
||||
if n := c.EntriesCount(); !concurrent && n > 0 {
|
||||
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
|
||||
}
|
||||
|
@ -408,45 +416,53 @@ func TestMetricRowMarshalUnmarshal(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestNextRetentionDuration(t *testing.T) {
|
||||
validateRetention := func(retention int64) {
|
||||
func TestNextRetentionDeadlineSeconds(t *testing.T) {
|
||||
f := func(currentTime string, retention, offset time.Duration, deadlineExpected string) {
|
||||
t.Helper()
|
||||
validateRetentionAt := func(now time.Time, retention int64) {
|
||||
nowMsecs := now.UnixMilli()
|
||||
d := nextRetentionDurationAt(nowMsecs, retention)
|
||||
if d <= 0 {
|
||||
nextTime := now.Add(d)
|
||||
retentionHuman := time.Duration(retention) * time.Millisecond
|
||||
t.Errorf("unexpected retention duration for retention=%s; got %s(%s); must be %s + %s; offset: %s", retentionHuman, nextTime, d, now, retentionHuman, time.Duration(retentionTimezoneOffsetMsecs)*time.Millisecond)
|
||||
}
|
||||
|
||||
now, err := time.Parse(time.RFC3339, currentTime)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse currentTime=%q: %s", currentTime, err)
|
||||
}
|
||||
|
||||
// UTC offsets are in range [-12 hours, +14 hours].
|
||||
// Verify that any legit combination of retention timezone and local time
|
||||
// will return valid retention duration.
|
||||
// See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4207
|
||||
for retentionOffset := -12; retentionOffset <= 14; retentionOffset++ {
|
||||
SetRetentionTimezoneOffset(time.Duration(retentionOffset) * time.Hour)
|
||||
validateRetentionAt(time.Now().UTC(), retention)
|
||||
|
||||
now := time.Date(2023, 4, 27, 23, 58, 0, 0, time.UTC)
|
||||
validateRetentionAt(now, retention)
|
||||
|
||||
now = time.Date(2023, 4, 27, 0, 1, 0, 0, time.UTC)
|
||||
validateRetentionAt(now, retention)
|
||||
|
||||
now = time.Date(2023, 4, 27, 0, 0, 0, 0, time.UTC)
|
||||
validateRetentionAt(now, retention)
|
||||
d := nextRetentionDeadlineSeconds(now.Unix(), int64(retention.Seconds()), int64(offset.Seconds()))
|
||||
deadline := time.Unix(d, 0).UTC().Format(time.RFC3339)
|
||||
if deadline != deadlineExpected {
|
||||
t.Fatalf("unexpected deadline; got %s; want %s", deadline, deadlineExpected)
|
||||
}
|
||||
}
|
||||
|
||||
for retentionDays := 0.3; retentionDays < 3; retentionDays += 0.3 {
|
||||
validateRetention(int64(retentionDays * msecPerDay))
|
||||
}
|
||||
f("2023-07-22T12:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
|
||||
f("2023-07-22T03:44:35Z", 24*time.Hour, 0, "2023-07-22T04:00:00Z")
|
||||
f("2023-07-22T04:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
|
||||
f("2023-07-22T23:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
|
||||
f("2023-07-23T03:59:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
|
||||
|
||||
for retentionMonths := float64(0.1); retentionMonths < 120; retentionMonths += 0.3 {
|
||||
validateRetention(int64(retentionMonths * msecsPerMonth))
|
||||
}
|
||||
f("2023-07-22T12:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
|
||||
f("2023-07-22T01:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-22T02:00:00Z")
|
||||
f("2023-07-22T02:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
|
||||
f("2023-07-22T23:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
|
||||
f("2023-07-23T01:59:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
|
||||
|
||||
f("2023-07-22T12:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z")
|
||||
f("2023-07-22T08:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-22T09:00:00Z")
|
||||
f("2023-07-22T09:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z")
|
||||
|
||||
f("2023-07-22T12:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z")
|
||||
f("2023-07-22T15:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z")
|
||||
f("2023-07-22T16:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-23T16:00:00Z")
|
||||
|
||||
f("2023-07-22T12:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z")
|
||||
f("2023-07-22T21:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z")
|
||||
f("2023-07-22T22:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-23T22:00:00Z")
|
||||
|
||||
f("2023-07-22T12:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z")
|
||||
f("2023-07-22T09:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-22T10:00:00Z")
|
||||
f("2023-07-22T10:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z")
|
||||
|
||||
f("2023-07-22T12:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z")
|
||||
f("2023-07-22T14:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z")
|
||||
f("2023-07-22T15:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-23T15:00:00Z")
|
||||
}
|
||||
|
||||
func TestStorageOpenClose(t *testing.T) {
|