lib/storage: pre-create timeseries before indexDB rotation (#4652)

* lib/storage: pre-create timeseries before indexDB rotation
During the hour before an indexDB rotation, start creating index records in the next indexDB.
This should improve performance during the switch to the next indexDB and remove ingestion issues,
since there is no need to create new index records for timeseries already ingested into the current indexDB.
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563
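A minimal sketch of the idea (illustrative only; the real logic lives in the prefillNextIndexDB() function shown in the diff below, and the shouldPrefill/hashMetricID helpers here are made-up stand-ins for it):

package main

import (
    "fmt"
    "time"
)

// shouldPrefill reports whether indexes for the given metricID should be
// pre-created in the next indexDB right now. The probability of pre-creation
// ramps from 0% to 100% during the last hour before nextRotation.
func shouldPrefill(metricID uint64, now, nextRotation time.Time) bool {
    d := nextRotation.Sub(now)
    if d >= time.Hour {
        // Too early: pre-fill runs only during the last hour before the rotation.
        return false
    }
    if d <= 0 {
        // The rotation already happened; the regular slow path populates the new indexDB.
        return true
    }
    // pMin shrinks from 1 to 0 as the rotation approaches,
    // so an ever larger share of metricIDs passes the check below.
    pMin := d.Seconds() / 3600
    p := float64(uint32(hashMetricID(metricID))) / (1 << 32)
    return p >= pMin
}

// hashMetricID is a cheap integer mix (splitmix64 finalizer), a stand-in
// for the fast hash used in the actual implementation.
func hashMetricID(metricID uint64) uint64 {
    h := metricID
    h ^= h >> 30
    h *= 0xbf58476d1ce4e5b9
    h ^= h >> 27
    h *= 0x94d049bb133111eb
    h ^= h >> 31
    return h
}

func main() {
    rotation := time.Now().Add(30 * time.Minute)
    pre := 0
    for id := uint64(0); id < 1000; id++ {
        if shouldPrefill(id, time.Now(), rotation) {
            pre++
        }
    }
    // With ~30 minutes left before the rotation, roughly half of the series should be pre-created.
    fmt.Printf("pre-created %d of 1000 series\n", pre)
}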

* lib/storage: further work on indexdb rotation optimization

- Document the change at docs/CHANGELOG.md
- Move back various caches from indexDB to Storage. This makes the change less intrusive.
  The dateMetricIDCache now takes into account indexDB generation, so it stores (date, metricID)
  entries for both the current and the next indexDB.
- Consolidate the code responsible for idbNext pre-filling into the prefillNextIndexDB() function.
  This improves code readability and maintainability a bit.
- Rewrite and simplify the code responsible for calculating the next retention timestamp
  (a sketch of the resulting calculation follows this list).
  Add various tests for corner cases of this code.
- Remove indexdb pre-filling from the RegisterMetricNames() function, since this function is rarely called.
  It is OK to add indexdb entries on demand in this function. This simplifies the code.
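
For reference, a minimal sketch of the simplified deadline calculation (it mirrors the nextRetentionDeadlineSeconds() helper from the diff below; the nextRetentionDeadline wrapper name and the example values are illustrative):

package main

import (
    "fmt"
    "time"
)

// nextRetentionDeadline returns the unix timestamp (in seconds) of the next indexdb rotation:
// the retention is rounded up to whole days, the deadline is aligned to multiples of the
// retention period, shifted +4 hours from the period start for historical reasons,
// and adjusted by the configured timezone offset (offsetSecs).
func nextRetentionDeadline(atSecs, retentionSecs, offsetSecs int64) int64 {
    const secsPerDay = 24 * 3600
    // Round the retention up to whole days, so the per-day inverted index works as expected.
    retentionSecs = ((retentionSecs + secsPerDay - 1) / secsPerDay) * secsPerDay

    // Apply the historical +4 hours shift and keep the offset within a single retention period.
    offsetSecs -= 4 * 3600
    offsetSecs %= retentionSecs

    // Align the deadline to a multiple of retentionSecs, then apply the offset back.
    deadline := ((atSecs+offsetSecs+retentionSecs-1)/retentionSecs)*retentionSecs - offsetSecs
    return deadline
}

func main() {
    now, _ := time.Parse(time.RFC3339, "2023-07-22T12:44:35Z")
    d := nextRetentionDeadline(now.Unix(), 24*3600, 0)
    // With a 24h retention and no timezone offset, the next rotation lands at 04:00 UTC the next day.
    fmt.Println(time.Unix(d, 0).UTC().Format(time.RFC3339)) // 2023-07-23T04:00:00Z
}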

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401

* docs/CHANGELOG.md: refer to https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
Nikolay 2023-07-23 00:20:21 +02:00 committed by Aliaksandr Valialkin
parent 7750c5a553
commit 30b32583f4
6 changed files with 368 additions and 193 deletions


@ -584,6 +584,9 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_timeseries_repopulated_total`, func() float64 {
return float64(m().TimeseriesRepopulated)
})
metrics.NewGauge(`vm_timeseries_precreated_total`, func() float64 {
return float64(m().TimeseriesPreCreated)
})
metrics.NewGauge(`vm_new_timeseries_created_total`, func() float64 {
return float64(m().NewTimeseriesCreated)
})


@ -27,7 +27,8 @@ The following `tip` changes can be tested by building VictoriaMetrics components
* SECURITY: upgrade base docker image (alpine) from 3.18.0 to 3.18.2. See [alpine 3.18.2 release notes](https://alpinelinux.org/posts/Alpine-3.15.9-3.16.6-3.17.4-3.18.2-released.html).
* SECURITY: upgrade Go builder from Go1.20.5 to Go1.20.6. See [the list of issues addressed in Go1.20.6](https://github.com/golang/go/issues?q=milestone%3AGo1.20.6+label%3ACherryPickApproved).
* FEATURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). See [description for this change](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7094fa38bc207c7bd7330ea8a834310a310ce5e3) for details.
* FEATURE: reduce memory usage by up to 5x for setups with [high churn rate](https://docs.victoriametrics.com/FAQ.html#what-is-high-churn-rate) and long [retention](https://docs.victoriametrics.com/#retention). See [the description for this change](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/7094fa38bc207c7bd7330ea8a834310a310ce5e3) and [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4563) for details.
* FEATURE: reduce spikes in CPU and disk IO usage during `indexdb` rotation (aka inverted index), which is performed once per [`-retentionPeriod`](https://docs.victoriametrics.com/#retention). The new algorithm gradually pre-populates newly created `indexdb` during the last hour before the rotation. The number of pre-populated series in the newly created `indexdb` can be [monitored](https://docs.victoriametrics.com/#monitoring) via `vm_timeseries_precreated_total` metric. This should resolve [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): allow selecting time series matching at least one of multiple `or` filters. For example, `{env="prod",job="a" or env="dev",job="b"}` selects series with either `{env="prod",job="a"}` or `{env="dev",job="b"}` labels. This functionality allows passing the selected series to [rollup functions](https://docs.victoriametrics.com/MetricsQL.html#rollup-functions) without the need to use [subqueries](https://docs.victoriametrics.com/MetricsQL.html#subqueries). See [these docs](https://docs.victoriametrics.com/keyConcepts.html#filtering-by-multiple-or-filters).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add ability to preserve metric names for binary operation results via `keep_metric_names` modifier. For example, `({__name__=~"foo|bar"} / 10) keep_metric_names` leaves `foo` and `bar` metric names in division results. See [these docs](https://docs.victoriametrics.com/MetricsQL.html#keep_metric_names). This helps to address issues like [this one](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3710).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): add ability to copy all the labels from `one` side of [many-to-one operations](https://prometheus.io/docs/prometheus/latest/querying/operators/#many-to-one-and-one-to-many-vector-matches) by specifying `*` inside `group_left()` or `group_right()`. Also allow adding a prefix for copied label names via `group_left(*) prefix "..."` syntax. For example, the following query copies Kubernetes namespace labels to `kube_pod_info` series and adds `ns_` prefix for the copied label names: `kube_pod_info * on(namespace) group_left(*) prefix "ns_" kube_namespace_labels`. The labels from `on()` list aren't prefixed. This feature resolves [this](https://stackoverflow.com/questions/76661818/how-to-add-namespace-labels-to-pod-labels-in-prometheus) and [that](https://stackoverflow.com/questions/76653997/how-can-i-make-a-new-copy-of-kube-namespace-labels-metric-with-a-different-name) questions at StackOverflow.


@ -96,9 +96,6 @@ type indexDB struct {
// and is used for syncing items from different indexDBs
generation uint64
// The unix timestamp in seconds for the indexDB rotation.
rotationTimestamp uint64
name string
tb *mergeset.Table
@ -136,10 +133,7 @@ func getTagFiltersCacheSize() int {
//
// The last segment of the path should contain unique hex value which
// will be then used as indexDB.generation
//
// The rotationTimestamp must be set to the current unix timestamp when mustOpenIndexDB
// is called when creating new indexdb during indexdb rotation.
func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) *indexDB {
func mustOpenIndexDB(path string, s *Storage, isReadOnly *uint32) *indexDB {
if s == nil {
logger.Panicf("BUG: Storage must be nin-nil")
}
@ -157,11 +151,10 @@ func mustOpenIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOn
tagFiltersCacheSize := getTagFiltersCacheSize()
db := &indexDB{
refCount: 1,
generation: gen,
rotationTimestamp: rotationTimestamp,
tb: tb,
name: name,
refCount: 1,
generation: gen,
tb: tb,
name: name,
tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
s: s,


@ -511,7 +511,7 @@ func TestIndexDBOpenClose(t *testing.T) {
tableName := nextIndexDBTableName()
for i := 0; i < 5; i++ {
var isReadOnly uint32
db := mustOpenIndexDB(tableName, &s, 0, &isReadOnly)
db := mustOpenIndexDB(tableName, &s, &isReadOnly)
db.MustClose()
}
if err := os.RemoveAll(tableName); err != nil {


@ -45,6 +45,7 @@ type Storage struct {
tooBigTimestampRows uint64
timeseriesRepopulated uint64
timeseriesPreCreated uint64
newTimeseriesCreated uint64
slowRowInserts uint64
slowPerDayIndexInserts uint64
@ -53,6 +54,13 @@ type Storage struct {
hourlySeriesLimitRowsDropped uint64
dailySeriesLimitRowsDropped uint64
// nextRotationTimestamp is a timestamp in seconds of the next indexdb rotation.
//
// It is used for gradual pre-population of the idbNext during the last hour before the indexdb rotation
// in order to reduce spikes in CPU and disk IO usage just after the rotation.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
nextRotationTimestamp int64
path string
cachePath string
retentionMsecs int64
@ -60,8 +68,17 @@ type Storage struct {
// lock file for exclusive access to the storage on the given path.
flockF *os.File
// idbCurr contains the currently used indexdb.
idbCurr atomic.Pointer[indexDB]
// idbNext is the next indexdb, which will become idbCurr at the next rotation.
//
// It starts being gradually pre-populated with data for active time series during the last hour
// before nextRotationTimestamp.
// This reduces spikes in CPU and disk IO usage just after the rotation.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
idbNext atomic.Pointer[indexDB]
tb *table
// Series cardinality limiters.
@ -77,7 +94,8 @@ type Storage struct {
// metricNameCache is MetricID -> MetricName cache.
metricNameCache *workingsetcache.Cache
// dateMetricIDCache is (Date, MetricID) cache.
// dateMetricIDCache is (generation, Date, MetricID) cache, where generation is the indexdb generation.
// See generationTSID for details.
dateMetricIDCache *dateMetricIDCache
// Fast cache for MetricID values occurred during the current hour.
@ -213,9 +231,6 @@ func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDail
s.currHourMetricIDs.Store(hmCurr)
s.prevHourMetricIDs.Store(hmPrev)
date := fasttime.UnixDate()
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(date)
s.nextDayMetricIDs.Store(nextDayMetricIDs)
s.pendingNextDayMetricIDs = &uint64set.Set{}
s.prefetchedMetricIDs.Store(&uint64set.Set{})
@ -231,9 +246,23 @@ func MustOpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDail
idbSnapshotsPath := filepath.Join(idbPath, snapshotsDirname)
fs.MustMkdirIfNotExist(idbSnapshotsPath)
fs.MustRemoveTemporaryDirs(idbSnapshotsPath)
idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath)
idbNext, idbCurr, idbPrev := s.mustOpenIndexDBTables(idbPath)
idbCurr.SetExtDB(idbPrev)
idbNext.SetExtDB(idbCurr)
s.idbCurr.Store(idbCurr)
s.idbNext.Store(idbNext)
// Initialize nextRotationTimestamp
nowSecs := time.Now().UnixNano() / 1e9
nextRotationTimestamp := nextRetentionDeadlineSeconds(nowSecs, retentionMsecs/1000, retentionTimezoneOffsetSecs)
atomic.StoreInt64(&s.nextRotationTimestamp, nextRotationTimestamp)
// Load nextDayMetricIDs cache
date := fasttime.UnixDate()
nextDayMetricIDs := s.mustLoadNextDayMetricIDs(idbCurr.generation, date)
s.nextDayMetricIDs.Store(nextDayMetricIDs)
// Load deleted metricIDs from idbCurr and idbPrev
dmisCurr, err := idbCurr.loadDeletedMetricIDs()
@ -461,6 +490,7 @@ type Metrics struct {
TooBigTimestampRows uint64
TimeseriesRepopulated uint64
TimeseriesPreCreated uint64
NewTimeseriesCreated uint64
SlowRowInserts uint64
SlowPerDayIndexInserts uint64
@ -532,6 +562,7 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.TooBigTimestampRows += atomic.LoadUint64(&s.tooBigTimestampRows)
m.TimeseriesRepopulated += atomic.LoadUint64(&s.timeseriesRepopulated)
m.TimeseriesPreCreated += atomic.LoadUint64(&s.timeseriesPreCreated)
m.NewTimeseriesCreated += atomic.LoadUint64(&s.newTimeseriesCreated)
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
m.SlowPerDayIndexInserts += atomic.LoadUint64(&s.slowPerDayIndexInserts)
@ -602,12 +633,20 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.PrefetchedMetricIDsSize += uint64(prefetchedMetricIDs.Len())
m.PrefetchedMetricIDsSizeBytes += uint64(prefetchedMetricIDs.SizeBytes())
m.NextRetentionSeconds = uint64(nextRetentionDuration(s.retentionMsecs).Seconds())
d := s.nextRetentionSeconds()
if d < 0 {
d = 0
}
m.NextRetentionSeconds = uint64(d)
s.idb().UpdateMetrics(&m.IndexDBMetrics)
s.tb.UpdateMetrics(&m.TableMetrics)
}
func (s *Storage) nextRetentionSeconds() int64 {
return atomic.LoadInt64(&s.nextRotationTimestamp) - int64(fasttime.UnixTimestamp())
}
// SetFreeDiskSpaceLimit sets the minimum free disk space size of current storage path
//
// The function must be called before opening or creating any storage.
@ -664,11 +703,11 @@ func (s *Storage) startRetentionWatcher() {
func (s *Storage) retentionWatcher() {
for {
d := nextRetentionDuration(s.retentionMsecs)
d := s.nextRetentionSeconds()
select {
case <-s.stop:
return
case <-time.After(d):
case <-time.After(time.Second * time.Duration(d)):
s.mustRotateIndexDB()
}
}
@ -727,23 +766,29 @@ func (s *Storage) nextDayMetricIDsUpdater() {
}
func (s *Storage) mustRotateIndexDB() {
// Create new indexdb table.
// Create new indexdb table, which will be used as idbNext
newTableName := nextIndexDBTableName()
idbNewPath := filepath.Join(s.path, indexdbDirname, newTableName)
rotationTimestamp := fasttime.UnixTimestamp()
idbNew := mustOpenIndexDB(idbNewPath, s, rotationTimestamp, &s.isReadOnly)
idbNew := mustOpenIndexDB(idbNewPath, s, &s.isReadOnly)
// Drop extDB
// Update nextRotationTimestamp
atomic.AddInt64(&s.nextRotationTimestamp, s.retentionMsecs/1000)
// Set idbNext to idbNew
idbNext := s.idbNext.Load()
idbNew.SetExtDB(idbNext)
s.idbNext.Store(idbNew)
// Set idbCurr to idbNext
idbCurr := s.idb()
s.idbCurr.Store(idbNext)
// Schedule data removal for idbPrev
idbCurr.doExtDB(func(extDB *indexDB) {
extDB.scheduleToDrop()
})
idbCurr.SetExtDB(nil)
// Start using idbNew
idbNew.SetExtDB(idbCurr)
s.idbCurr.Store(idbNew)
// Persist changes on the file system.
fs.MustSyncPath(s.path)
@ -753,7 +798,7 @@ func (s *Storage) mustRotateIndexDB() {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401
// Flush metric id caches for the current and the previous hour,
// since they may contain entries missing in idbNew.
// since they may contain entries missing in idbCurr after the rotation.
// This should prevent from missing data in queries when
// the following steps are performed for short -retentionPeriod (e.g. 1 day):
//
@ -763,7 +808,7 @@ func (s *Storage) mustRotateIndexDB() {
// These series are already registered in prevHourMetricIDs, so VM doesn't add per-day entries to the current indexdb.
// 4. Stop adding new samples for these series just before 5 UTC.
// 5. The next indexdb rotation is performed at 4 UTC next day.
// The information about the series from step 3 disappears from indexdb, since the old indexdb from step 1 is deleted,
// The information about the series added at step 3 disappears from indexdb, since the old indexdb from step 1 is deleted,
// while the current indexdb doesn't contain information about the series.
// So queries for the last 24 hours stop returning samples added at step 3.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2698
@ -773,13 +818,12 @@ func (s *Storage) mustRotateIndexDB() {
s.currHourMetricIDs.Store(&hourMetricIDs{})
s.prevHourMetricIDs.Store(&hourMetricIDs{})
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
s.dateMetricIDCache.Reset()
// Do not flush dateMetricIDCache, since it contains entries prefixed with idb generation.
// There is no need in resetting nextDayMetricIDs, since it contains entries prefixed with idb generation.
// Do not flush metricIDCache and metricNameCache, since all the metricIDs
// from prev idb remain valid after the rotation.
// There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day.
}
func (s *Storage) resetAndSaveTSIDCache() {
@ -833,11 +877,14 @@ func (s *Storage) MustClose() {
}
}
func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
func (s *Storage) mustLoadNextDayMetricIDs(generation, date uint64) *byDateMetricIDEntry {
e := &byDateMetricIDEntry{
date: date,
k: generationDateKey{
generation: generation,
date: date,
},
}
name := "next_day_metric_ids"
name := "next_day_metric_ids_v2"
path := filepath.Join(s.cachePath, name)
if !fs.IsPathExist(path) {
return e
@ -846,12 +893,17 @@ func (s *Storage) mustLoadNextDayMetricIDs(date uint64) *byDateMetricIDEntry {
if err != nil {
logger.Panicf("FATAL: cannot read %s: %s", path, err)
}
if len(src) < 16 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 16)
if len(src) < 24 {
logger.Errorf("discarding %s, since it has broken header; got %d bytes; want %d bytes", path, len(src), 24)
return e
}
// Unmarshal header
generationLoaded := encoding.UnmarshalUint64(src)
src = src[8:]
if generationLoaded != generation {
logger.Infof("discarding %s, since it contains data for stale generation; got %d; want %d", path, generationLoaded, generation)
}
dateLoaded := encoding.UnmarshalUint64(src)
src = src[8:]
if dateLoaded != date {
@ -948,12 +1000,13 @@ func (s *Storage) mustLoadHourMetricIDs(hour uint64, name string) *hourMetricIDs
}
func (s *Storage) mustSaveNextDayMetricIDs(e *byDateMetricIDEntry) {
name := "next_day_metric_ids"
name := "next_day_metric_ids_v2"
path := filepath.Join(s.cachePath, name)
dst := make([]byte, 0, e.v.Len()*8+16)
// Marshal header
dst = encoding.MarshalUint64(dst, e.date)
dst = encoding.MarshalUint64(dst, e.k.generation)
dst = encoding.MarshalUint64(dst, e.k.date)
// Marshal e.v
dst = marshalUint64Set(dst, &e.v)
@ -1072,30 +1125,31 @@ var saveCacheLock sync.Mutex
// SetRetentionTimezoneOffset sets the offset, which is used for calculating the time for indexdb rotation.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2574
func SetRetentionTimezoneOffset(offset time.Duration) {
retentionTimezoneOffsetMsecs = offset.Milliseconds()
retentionTimezoneOffsetSecs = int64(offset.Seconds())
}
var retentionTimezoneOffsetMsecs int64
var retentionTimezoneOffsetSecs int64
func nextRetentionDuration(retentionMsecs int64) time.Duration {
nowMsecs := time.Now().UnixNano() / 1e6
return nextRetentionDurationAt(nowMsecs, retentionMsecs)
}
func nextRetentionDeadlineSeconds(atSecs, retentionSecs, offsetSecs int64) int64 {
// Round retentionSecs to days. This guarantees that per-day inverted index works as expected
const secsPerDay = 24 * 3600
retentionSecs = ((retentionSecs + secsPerDay - 1) / secsPerDay) * secsPerDay
func nextRetentionDurationAt(atMsecs int64, retentionMsecs int64) time.Duration {
// Round retentionMsecs to days. This guarantees that per-day inverted index works as expected
retentionMsecs = ((retentionMsecs + msecPerDay - 1) / msecPerDay) * msecPerDay
// Schedule the deadline to +4 hours from the next retention period start
// because of historical reasons - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248
offsetSecs -= 4 * 3600
// The effect of time zone on retention period is moved out.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2574
deadline := ((atMsecs + retentionMsecs + retentionTimezoneOffsetMsecs - 1) / retentionMsecs) * retentionMsecs
// Make sure that offsetSecs doesn't exceed retentionSecs
offsetSecs %= retentionSecs
// Schedule the deadline to +4 hours from the next retention period start.
// This should prevent from possible double deletion of indexdb
// due to time drift - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/248 .
deadline += int64(4 * 3600 * 1000)
deadline -= retentionTimezoneOffsetMsecs
return time.Duration(deadline-atMsecs) * time.Millisecond
// align the retention deadline to multiples of retentionSecs
// This makes the deadline independent of atSecs.
deadline := ((atSecs + offsetSecs + retentionSecs - 1) / retentionSecs) * retentionSecs
// Apply the provided offsetSecs
deadline -= offsetSecs
return deadline
}
// SearchMetricNames returns marshaled metric names matching the given tfss on the given tr.
@ -1661,6 +1715,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
var seriesRepopulated uint64
idb := s.idb()
generation := idb.generation
is := idb.getIndexSearch(0, 0, noDeadline)
defer idb.putIndexSearch(is)
var firstWarn error
@ -1673,7 +1728,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
// Skip row, since it exceeds cardinality limit
continue
}
if genTSID.generation != idb.generation {
if genTSID.generation < generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
if err := mn.UnmarshalRaw(mr.MetricNameRaw); err != nil {
@ -1688,7 +1743,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
mn.sortTags()
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation
genTSID.generation = generation
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
}
@ -1718,10 +1773,10 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
continue
}
if genTSID.generation != idb.generation {
if genTSID.generation < generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation
genTSID.generation = generation
seriesRepopulated++
}
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
@ -1739,12 +1794,15 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
// Schedule creating TSID indexes instead of creating them synchronously.
// This should keep stable the ingestion rate when new time series are ingested.
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation
genTSID.generation = generation
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
}
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
// There is no need in pre-filling idbNext here, since RegisterMetricNames() is rarely called.
// So it is OK to register metric names in blocking manner after indexdb rotation.
if firstWarn != nil {
logger.Warnf("cannot create some metrics: %s", firstWarn)
}
@ -1752,6 +1810,7 @@ func (s *Storage) RegisterMetricNames(qt *querytracer.Tracer, mrs []MetricRow) {
func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {
idb := s.idb()
generation := idb.generation
is := idb.getIndexSearch(0, 0, noDeadline)
defer idb.putIndexSearch(is)
@ -1835,7 +1894,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
prevTSID = r.TSID
prevMetricNameRaw = mr.MetricNameRaw
if genTSID.generation != idb.generation {
if genTSID.generation < generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
date := uint64(r.Timestamp) / msecPerDay
@ -1849,7 +1908,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
mn.sortTags()
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation
genTSID.generation = generation
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
seriesRepopulated++
slowInsertsCount++
@ -1883,10 +1942,10 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
continue
}
if genTSID.generation != idb.generation {
if genTSID.generation < generation {
// The found TSID is from the previous indexdb. Create it in the current indexdb.
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation
genTSID.generation = generation
seriesRepopulated++
}
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
@ -1907,7 +1966,7 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
}
createAllIndexesForMetricName(is, mn, &genTSID.TSID, date)
genTSID.generation = idb.generation
genTSID.generation = generation
s.putSeriesToCache(mr.MetricNameRaw, &genTSID, date)
newSeriesCount++
@ -1924,11 +1983,17 @@ func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, preci
atomic.AddUint64(&s.newTimeseriesCreated, newSeriesCount)
atomic.AddUint64(&s.timeseriesRepopulated, seriesRepopulated)
dstMrs = dstMrs[:j]
rows = rows[:j]
if err := s.prefillNextIndexDB(rows, dstMrs); err != nil {
if firstWarn == nil {
firstWarn = err
}
}
if firstWarn != nil {
storageAddRowsLogger.Warnf("warn occurred during rows addition: %s", firstWarn)
}
dstMrs = dstMrs[:j]
rows = rows[:j]
err := s.updatePerDateData(rows, dstMrs)
if err != nil {
@ -1963,9 +2028,9 @@ func (s *Storage) putSeriesToCache(metricNameRaw []byte, genTSID *generationTSID
// so future rows for that TSID are ingested via fast path.
s.putTSIDToCache(genTSID, metricNameRaw)
// Register the (date, metricID) entry in the cache,
// Register the (generation, date, metricID) entry in the cache,
// so next time the entry is found there instead of searching for it in the indexdb.
s.dateMetricIDCache.Set(date, genTSID.TSID.MetricID)
s.dateMetricIDCache.Set(genTSID.generation, date, genTSID.TSID.MetricID)
}
func (s *Storage) registerSeriesCardinality(metricID uint64, metricNameRaw []byte) bool {
@ -2004,6 +2069,77 @@ func getUserReadableMetricName(metricNameRaw []byte) string {
return mn.String()
}
func (s *Storage) prefillNextIndexDB(rows []rawRow, mrs []*MetricRow) error {
d := s.nextRetentionSeconds()
if d >= 3600 {
// Fast path: nothing to pre-fill because it is too early.
// The pre-fill is started during the last hour before the indexdb rotation.
return nil
}
// Slower path: less than an hour is left until the next indexdb rotation.
// Pre-populate idbNext with increasing probability until the rotation.
// The probability increases from 0% to 100% proportionally as d goes from 3600 down to 0.
pMin := float64(d) / 3600
idbNext := s.idbNext.Load()
generation := idbNext.generation
isNext := idbNext.getIndexSearch(0, 0, noDeadline)
defer idbNext.putIndexSearch(isNext)
var firstError error
var genTSID generationTSID
mn := GetMetricName()
defer PutMetricName(mn)
timeseriesPreCreated := uint64(0)
for i := range rows {
r := &rows[i]
p := float64(uint32(fastHashUint64(r.TSID.MetricID))) / (1 << 32)
if p < pMin {
// Fast path: it is too early to pre-fill indexes for the given MetricID.
continue
}
// Check whether the given MetricID is already present in dateMetricIDCache.
date := uint64(r.Timestamp) / msecPerDay
metricID := r.TSID.MetricID
if s.dateMetricIDCache.Has(generation, date, metricID) {
// Indexes are already pre-filled.
continue
}
// Check whether the given (date, metricID) is already present in idbNext.
if isNext.hasDateMetricIDNoExtDB(date, metricID, r.TSID.AccountID, r.TSID.ProjectID) {
// Indexes are already pre-filled at idbNext.
//
// Register the (generation, date, metricID) entry in the cache,
// so next time the entry is found there instead of searching for it in the indexdb.
s.dateMetricIDCache.Set(generation, date, metricID)
continue
}
// Slow path: pre-fill indexes in idbNext.
metricNameRaw := mrs[i].MetricNameRaw
if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
if firstError == nil {
firstError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %w", metricNameRaw, err)
}
continue
}
mn.sortTags()
createAllIndexesForMetricName(isNext, mn, &r.TSID, date)
genTSID.TSID = r.TSID
genTSID.generation = generation
s.putSeriesToCache(metricNameRaw, &genTSID, date)
timeseriesPreCreated++
}
atomic.AddUint64(&s.timeseriesPreCreated, timeseriesPreCreated)
return firstError
}
func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
var date uint64
var hour uint64
@ -2014,6 +2150,10 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
prevDate uint64
prevMetricID uint64
)
idb := s.idb()
generation := idb.generation
hm := s.currHourMetricIDs.Load()
hmPrev := s.prevHourMetricIDs.Load()
hmPrevDate := hmPrev.hour / 24
@ -2079,8 +2219,8 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
}
}
// Slower path: check global cache for (date, metricID) entry.
if s.dateMetricIDCache.Has(date, metricID) {
// Slower path: check global cache for (generation, date, metricID) entry.
if s.dateMetricIDCache.Has(generation, date, metricID) {
continue
}
// Slow path: store the (date, metricID) entry in the indexDB.
@ -2124,7 +2264,6 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
return a.tsid.MetricID < b.tsid.MetricID
})
idb := s.idb()
is := idb.getIndexSearch(0, 0, noDeadline)
defer idb.putIndexSearch(is)
@ -2154,7 +2293,7 @@ func (s *Storage) updatePerDateData(rows []rawRow, mrs []*MetricRow) error {
}
PutMetricName(mn)
// The (date, metricID) entries must be added to cache only after they have been successfully added to indexDB.
s.dateMetricIDCache.Store(dateMetricIDsForCache)
s.dateMetricIDCache.Store(generation, dateMetricIDsForCache)
return firstError
}
@ -2188,12 +2327,6 @@ func newDateMetricIDCache() *dateMetricIDCache {
return &dmc
}
func (dmc *dateMetricIDCache) Reset() {
dmc.mu.Lock()
dmc.resetLocked()
dmc.mu.Unlock()
}
func (dmc *dateMetricIDCache) resetLocked() {
// Do not reset syncsCount and resetsCount
dmc.byDate.Store(newByDateMetricIDMap())
@ -2221,9 +2354,9 @@ func (dmc *dateMetricIDCache) SizeBytes() uint64 {
return n
}
func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
func (dmc *dateMetricIDCache) Has(generation, date, metricID uint64) bool {
byDate := dmc.byDate.Load()
v := byDate.get(date)
v := byDate.get(generation, date)
if v.Has(metricID) {
// Fast path.
// The majority of calls must go here.
@ -2232,7 +2365,7 @@ func (dmc *dateMetricIDCache) Has(date, metricID uint64) bool {
// Slow path. Check mutable map.
dmc.mu.Lock()
v = dmc.byDateMutable.get(date)
v = dmc.byDateMutable.get(generation, date)
ok := v.Has(metricID)
dmc.syncLockedIfNeeded()
dmc.mu.Unlock()
@ -2245,7 +2378,7 @@ type dateMetricID struct {
metricID uint64
}
func (dmc *dateMetricIDCache) Store(dmids []dateMetricID) {
func (dmc *dateMetricIDCache) Store(generation uint64, dmids []dateMetricID) {
var prevDate uint64
metricIDs := make([]uint64, 0, len(dmids))
dmc.mu.Lock()
@ -2255,22 +2388,22 @@ func (dmc *dateMetricIDCache) Store(dmids []dateMetricID) {
continue
}
if len(metricIDs) > 0 {
v := dmc.byDateMutable.getOrCreate(prevDate)
v := dmc.byDateMutable.getOrCreate(generation, prevDate)
v.AddMulti(metricIDs)
}
metricIDs = append(metricIDs[:0], dmid.metricID)
prevDate = dmid.date
}
if len(metricIDs) > 0 {
v := dmc.byDateMutable.getOrCreate(prevDate)
v := dmc.byDateMutable.getOrCreate(generation, prevDate)
v.AddMulti(metricIDs)
}
dmc.mu.Unlock()
}
func (dmc *dateMetricIDCache) Set(date, metricID uint64) {
func (dmc *dateMetricIDCache) Set(generation, date, metricID uint64) {
dmc.mu.Lock()
v := dmc.byDateMutable.getOrCreate(date)
v := dmc.byDateMutable.getOrCreate(generation, date)
v.Add(metricID)
dmc.mu.Unlock()
}
@ -2288,31 +2421,38 @@ func (dmc *dateMetricIDCache) syncLocked() {
// Nothing to sync.
return
}
// Merge data from byDate into byDateMutable and then atomically replace byDate with the merged data.
byDate := dmc.byDate.Load()
byDateMutable := dmc.byDateMutable
for date, e := range byDateMutable.m {
v := byDate.get(date)
for k, e := range byDateMutable.m {
v := byDate.get(k.generation, k.date)
if v == nil {
// Nothing to merge
continue
}
v = v.Clone()
v.Union(&e.v)
dme := &byDateMetricIDEntry{
date: date,
v: *v,
k: k,
v: *v,
}
if date == byDateMutable.hotEntry.Load().date {
byDateMutable.m[k] = dme
he := byDateMutable.hotEntry.Load()
if he.k == k {
byDateMutable.hotEntry.Store(dme)
}
byDateMutable.m[date] = dme
}
for date, e := range byDate.m {
v := byDateMutable.get(date)
// Copy entries from byDate, which are missing in byDateMutable
for k, e := range byDate.m {
v := byDateMutable.get(k.generation, k.date)
if v != nil {
continue
}
byDateMutable.m[date] = e
byDateMutable.m[k] = e
}
// Atomically replace byDate with byDateMutable
dmc.byDate.Store(dmc.byDateMutable)
dmc.byDateMutable = newByDateMetricIDMap()
@ -2325,25 +2465,34 @@ func (dmc *dateMetricIDCache) syncLocked() {
type byDateMetricIDMap struct {
hotEntry atomic.Pointer[byDateMetricIDEntry]
m map[uint64]*byDateMetricIDEntry
m map[generationDateKey]*byDateMetricIDEntry
}
type generationDateKey struct {
generation uint64
date uint64
}
func newByDateMetricIDMap() *byDateMetricIDMap {
dmm := &byDateMetricIDMap{
m: make(map[uint64]*byDateMetricIDEntry),
m: make(map[generationDateKey]*byDateMetricIDEntry),
}
dmm.hotEntry.Store(&byDateMetricIDEntry{})
return dmm
}
func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
func (dmm *byDateMetricIDMap) get(generation, date uint64) *uint64set.Set {
hotEntry := dmm.hotEntry.Load()
if hotEntry.date == date {
if hotEntry.k.generation == generation && hotEntry.k.date == date {
// Fast path
return &hotEntry.v
}
// Slow path
e := dmm.m[date]
k := generationDateKey{
generation: generation,
date: date,
}
e := dmm.m[k]
if e == nil {
return nil
}
@ -2351,36 +2500,41 @@ func (dmm *byDateMetricIDMap) get(date uint64) *uint64set.Set {
return &e.v
}
func (dmm *byDateMetricIDMap) getOrCreate(date uint64) *uint64set.Set {
v := dmm.get(date)
func (dmm *byDateMetricIDMap) getOrCreate(generation, date uint64) *uint64set.Set {
v := dmm.get(generation, date)
if v != nil {
return v
}
e := &byDateMetricIDEntry{
date: date,
k := generationDateKey{
generation: generation,
date: date,
}
dmm.m[date] = e
e := &byDateMetricIDEntry{
k: k,
}
dmm.m[k] = e
return &e.v
}
type byDateMetricIDEntry struct {
date uint64
v uint64set.Set
k generationDateKey
v uint64set.Set
}
func (s *Storage) updateNextDayMetricIDs(date uint64) {
generation := s.idb().generation
e := s.nextDayMetricIDs.Load()
s.pendingNextDayMetricIDsLock.Lock()
pendingMetricIDs := s.pendingNextDayMetricIDs
s.pendingNextDayMetricIDs = &uint64set.Set{}
s.pendingNextDayMetricIDsLock.Unlock()
if pendingMetricIDs.Len() == 0 && e.date == date {
if pendingMetricIDs.Len() == 0 && e.k.generation == generation && e.k.date == date {
// Fast path: nothing to update.
return
}
// Slow path: union pendingMetricIDs with e.v
if e.date == date {
if e.k.generation == generation && e.k.date == date {
pendingMetricIDs.Union(&e.v)
} else {
// Do not add pendingMetricIDs from the previous day to the current day,
@ -2388,9 +2542,13 @@ func (s *Storage) updateNextDayMetricIDs(date uint64) {
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3309
pendingMetricIDs = &uint64set.Set{}
}
k := generationDateKey{
generation: generation,
date: date,
}
eNew := &byDateMetricIDEntry{
date: date,
v: *pendingMetricIDs,
k: k,
v: *pendingMetricIDs,
}
s.nextDayMetricIDs.Store(eNew)
}
@ -2486,12 +2644,11 @@ func (s *Storage) putTSIDToCache(tsid *generationTSID, metricName []byte) {
s.tsidCache.Set(metricName, buf)
}
func (s *Storage) mustOpenIndexDBTables(path string) (curr, prev *indexDB) {
func (s *Storage) mustOpenIndexDBTables(path string) (next, curr, prev *indexDB) {
fs.MustMkdirIfNotExist(path)
fs.MustRemoveTemporaryDirs(path)
// Search for the two most recent tables - the last one is active,
// the previous one contains backup data.
// Search for the three most recent tables - the prev, curr and next.
des := fs.MustReadDir(path)
var tableNames []string
for _, de := range des {
@ -2509,37 +2666,42 @@ func (s *Storage) mustOpenIndexDBTables(path string) (curr, prev *indexDB) {
sort.Slice(tableNames, func(i, j int) bool {
return tableNames[i] < tableNames[j]
})
if len(tableNames) < 2 {
// Create missing tables
if len(tableNames) == 0 {
prevName := nextIndexDBTableName()
tableNames = append(tableNames, prevName)
}
switch len(tableNames) {
case 0:
prevName := nextIndexDBTableName()
currName := nextIndexDBTableName()
tableNames = append(tableNames, currName)
nextName := nextIndexDBTableName()
tableNames = append(tableNames, prevName, currName, nextName)
case 1:
currName := nextIndexDBTableName()
nextName := nextIndexDBTableName()
tableNames = append(tableNames, currName, nextName)
case 2:
nextName := nextIndexDBTableName()
tableNames = append(tableNames, nextName)
default:
// Remove all the tables except the last three tables.
for _, tn := range tableNames[:len(tableNames)-3] {
pathToRemove := filepath.Join(path, tn)
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
fs.MustRemoveAll(pathToRemove)
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
}
fs.MustSyncPath(path)
tableNames = tableNames[len(tableNames)-3:]
}
// Invariant: len(tableNames) >= 2
// Open tables
nextPath := filepath.Join(path, tableNames[2])
currPath := filepath.Join(path, tableNames[1])
prevPath := filepath.Join(path, tableNames[0])
// Remove all the tables except two last tables.
for _, tn := range tableNames[:len(tableNames)-2] {
pathToRemove := filepath.Join(path, tn)
logger.Infof("removing obsolete indexdb dir %q...", pathToRemove)
fs.MustRemoveAll(pathToRemove)
logger.Infof("removed obsolete indexdb dir %q", pathToRemove)
}
next = mustOpenIndexDB(nextPath, s, &s.isReadOnly)
curr = mustOpenIndexDB(currPath, s, &s.isReadOnly)
prev = mustOpenIndexDB(prevPath, s, &s.isReadOnly)
// Persist changes on the file system.
fs.MustSyncPath(path)
// Open the last two tables.
currPath := filepath.Join(path, tableNames[len(tableNames)-1])
curr = mustOpenIndexDB(currPath, s, 0, &s.isReadOnly)
prevPath := filepath.Join(path, tableNames[len(tableNames)-2])
prev = mustOpenIndexDB(prevPath, s, 0, &s.isReadOnly)
return curr, prev
return next, curr, prev
}
var indexDBTableNameRegexp = regexp.MustCompile("^[0-9A-F]{16}$")


@ -90,23 +90,25 @@ func TestDateMetricIDCacheConcurrent(t *testing.T) {
func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
type dmk struct {
date uint64
metricID uint64
generation uint64
date uint64
metricID uint64
}
m := make(map[dmk]bool)
for i := 0; i < 1e5; i++ {
generation := uint64(i) % 2
date := uint64(i) % 3
metricID := uint64(i) % 1237
if !concurrent && c.Has(date, metricID) {
if !m[dmk{date, metricID}] {
return fmt.Errorf("c.Has(%d, %d) must return false, but returned true", date, metricID)
if !concurrent && c.Has(generation, date, metricID) {
if !m[dmk{generation, date, metricID}] {
return fmt.Errorf("c.Has(%d, %d, %d) must return false, but returned true", generation, date, metricID)
}
continue
}
c.Set(date, metricID)
m[dmk{date, metricID}] = true
if !concurrent && !c.Has(date, metricID) {
return fmt.Errorf("c.Has(%d, %d) must return true, but returned false", date, metricID)
c.Set(generation, date, metricID)
m[dmk{generation, date, metricID}] = true
if !concurrent && !c.Has(generation, date, metricID) {
return fmt.Errorf("c.Has(%d, %d, %d) must return true, but returned false", generation, date, metricID)
}
if i%11234 == 0 {
c.mu.Lock()
@ -114,25 +116,29 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
c.mu.Unlock()
}
if i%34323 == 0 {
c.Reset()
c.mu.Lock()
c.resetLocked()
c.mu.Unlock()
m = make(map[dmk]bool)
}
}
// Verify fast path after sync.
for i := 0; i < 1e5; i++ {
generation := uint64(i) % 2
date := uint64(i) % 3
metricID := uint64(i) % 123
c.Set(date, metricID)
c.Set(generation, date, metricID)
}
c.mu.Lock()
c.syncLocked()
c.mu.Unlock()
for i := 0; i < 1e5; i++ {
generation := uint64(i) % 2
date := uint64(i) % 3
metricID := uint64(i) % 123
if !concurrent && !c.Has(date, metricID) {
return fmt.Errorf("c.Has(%d, %d) must return true after sync", date, metricID)
if !concurrent && !c.Has(generation, date, metricID) {
return fmt.Errorf("c.Has(%d, %d, %d) must return true after sync", generation, date, metricID)
}
}
@ -140,7 +146,9 @@ func testDateMetricIDCache(c *dateMetricIDCache, concurrent bool) error {
if n := c.EntriesCount(); !concurrent && n < 123 {
return fmt.Errorf("c.EntriesCount must return at least 123; returned %d", n)
}
c.Reset()
c.mu.Lock()
c.resetLocked()
c.mu.Unlock()
if n := c.EntriesCount(); !concurrent && n > 0 {
return fmt.Errorf("c.EntriesCount must return 0 after reset; returned %d", n)
}
@ -493,45 +501,53 @@ func TestMetricRowMarshalUnmarshal(t *testing.T) {
}
}
func TestNextRetentionDuration(t *testing.T) {
validateRetention := func(retention int64) {
func TestNextRetentionDeadlineSeconds(t *testing.T) {
f := func(currentTime string, retention, offset time.Duration, deadlineExpected string) {
t.Helper()
validateRetentionAt := func(now time.Time, retention int64) {
nowMsecs := now.UnixMilli()
d := nextRetentionDurationAt(nowMsecs, retention)
if d <= 0 {
nextTime := now.Add(d)
retentionHuman := time.Duration(retention) * time.Millisecond
t.Errorf("unexpected retention duration for retention=%s; got %s(%s); must be %s + %s; offset: %s", retentionHuman, nextTime, d, now, retentionHuman, time.Duration(retentionTimezoneOffsetMsecs)*time.Millisecond)
}
now, err := time.Parse(time.RFC3339, currentTime)
if err != nil {
t.Fatalf("cannot parse currentTime=%q: %s", currentTime, err)
}
// UTC offsets are in range [-12 hours, +14 hours].
// Verify that any legit combination of retention timezone and local time
// will return valid retention duration.
// See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4207
for retentionOffset := -12; retentionOffset <= 14; retentionOffset++ {
SetRetentionTimezoneOffset(time.Duration(retentionOffset) * time.Hour)
validateRetentionAt(time.Now().UTC(), retention)
now := time.Date(2023, 4, 27, 23, 58, 0, 0, time.UTC)
validateRetentionAt(now, retention)
now = time.Date(2023, 4, 27, 0, 1, 0, 0, time.UTC)
validateRetentionAt(now, retention)
now = time.Date(2023, 4, 27, 0, 0, 0, 0, time.UTC)
validateRetentionAt(now, retention)
d := nextRetentionDeadlineSeconds(now.Unix(), int64(retention.Seconds()), int64(offset.Seconds()))
deadline := time.Unix(d, 0).UTC().Format(time.RFC3339)
if deadline != deadlineExpected {
t.Fatalf("unexpected deadline; got %s; want %s", deadline, deadlineExpected)
}
}
for retentionDays := 0.3; retentionDays < 3; retentionDays += 0.3 {
validateRetention(int64(retentionDays * msecPerDay))
}
f("2023-07-22T12:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
f("2023-07-22T03:44:35Z", 24*time.Hour, 0, "2023-07-22T04:00:00Z")
f("2023-07-22T04:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
f("2023-07-22T23:44:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
f("2023-07-23T03:59:35Z", 24*time.Hour, 0, "2023-07-23T04:00:00Z")
for retentionMonths := float64(0.1); retentionMonths < 120; retentionMonths += 0.3 {
validateRetention(int64(retentionMonths * msecsPerMonth))
}
f("2023-07-22T12:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
f("2023-07-22T01:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-22T02:00:00Z")
f("2023-07-22T02:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
f("2023-07-22T23:44:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
f("2023-07-23T01:59:35Z", 24*time.Hour, 2*time.Hour, "2023-07-23T02:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z")
f("2023-07-22T08:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-22T09:00:00Z")
f("2023-07-22T09:44:35Z", 24*time.Hour, -5*time.Hour, "2023-07-23T09:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z")
f("2023-07-22T15:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-22T16:00:00Z")
f("2023-07-22T16:44:35Z", 24*time.Hour, -12*time.Hour, "2023-07-23T16:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z")
f("2023-07-22T21:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-22T22:00:00Z")
f("2023-07-22T22:44:35Z", 24*time.Hour, -18*time.Hour, "2023-07-23T22:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z")
f("2023-07-22T09:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-22T10:00:00Z")
f("2023-07-22T10:44:35Z", 24*time.Hour, 18*time.Hour, "2023-07-23T10:00:00Z")
f("2023-07-22T12:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z")
f("2023-07-22T14:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-22T15:00:00Z")
f("2023-07-22T15:44:35Z", 24*time.Hour, 37*time.Hour, "2023-07-23T15:00:00Z")
}
func TestStorageOpenClose(t *testing.T) {